Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20260515134210-2e2a0d06648c
github.com/openstack-k8s-operators/mariadb-operator/api v0.0.0-00010101000000-000000000000
go.uber.org/zap v1.28.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
k8s.io/api v0.31.14
k8s.io/apimachinery v0.31.14
k8s.io/client-go v0.31.14
Expand Down Expand Up @@ -82,6 +81,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
Expand Down
218 changes: 120 additions & 98 deletions internal/controller/galera_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -51,8 +52,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/util/podutils"

"golang.org/x/exp/maps"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -110,12 +109,21 @@ func GetLog(ctx context.Context, controller string) logr.Logger {
//

// findBestCandidate returns the node with the lowest seqno
func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
sortednodes := maps.Keys(g.Status.Attributes)
sort.Strings(sortednodes)
func findBestCandidate(g *mariadbv1.Galera, pods []corev1.Pod, log logr.Logger) (node string, found bool) {
log.Info("Known attributes for bootstrapping the cluster", "attributes", g.Status.Attributes)
var sortedNodes []string
for _, pod := range pods {
if cidFound, cid := getGaleraContainerID(&pod); cidFound {
attr, attrFound := g.Status.Attributes[pod.Name]
if attrFound && attr.ContainerID == cid {
sortedNodes = append(sortedNodes, pod.Name)
}
}
}
sort.Strings(sortedNodes)
bestnode := ""
bestseqno := -1
for _, node := range sortednodes {
for _, node := range sortedNodes {
// On clean shutdown, galera sets the last
// stopped node as 'safe to bootstrap', so use
// this hint when we can
Expand All @@ -131,7 +139,7 @@ func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
}
// if we pass here, a candidate is only valid if we
// inspected all the expected replicas (e.g. typically 3)
if len(g.Status.Attributes) != int(*g.Spec.Replicas) {
if len(sortedNodes) != int(*g.Spec.Replicas) {
return "", false
}
return bestnode, true //"galera-0"
Expand Down Expand Up @@ -186,15 +194,42 @@ func getReadyPods(pods []corev1.Pod) (ret []corev1.Pod) {
return
}

// getRunningPodsMissingAttributes returns all the pods for which the operator
// getWaitingPodsNotUsingNewReportScript filters out pods which are not yet
// using the new galera state report script (pre-push behaviour ONLY)
func getPodsNotUsingNewReportScript(pods []corev1.Pod) (ret []corev1.Pod) {
for _, pod := range pods {
cmIdx := slices.IndexFunc(pod.Spec.Volumes, func(v corev1.Volume) bool {
return v.Name == "operator-scripts"
})
cm := *pod.Spec.Volumes[cmIdx].ConfigMap
scriptIdx := slices.IndexFunc(cm.Items, func(k corev1.KeyToPath) bool {
return k.Key == "report_local_galera_state.py"
})
if scriptIdx == -1 {
ret = append(ret, pod)
}
}
return
}

// getWaitingPodsMissingAttributes returns all the pods for which the operator
// has no seqno information, and which are ready for being inspected.
// Note: a pod is considered 'ready for inspection' when its main container is
// started and its inner process is currently waiting for a gcomm URI
// (i.e. it is not running mysqld)
func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
func getWaitingPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) {
_, attrFound := instance.Status.Attributes[pod.Name]
if attrFound {
cidFound, cid := getGaleraContainerID(&pod)
if !cidFound || cid != instance.Status.Attributes[pod.Name].ContainerID {
// The info we have for this pod came from an old container,
// so drop it and mark this pod as missing attributes
clearPodAttributes(ctx, instance, pod.Name)
attrFound = false
}
}
if !attrFound && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
ret = append(ret, pod)
}
Expand All @@ -203,12 +238,28 @@ func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, ins
return
}

// getRunningPodsMissingGcomm returns all the pods which are not running galera
// retrieveAttributesForPods fetches galera database status from pods which are not
// not yet using the new galera state report script (pre-push behaviour ONLY)
func retrieveAttributesForPods(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) {
for _, pod := range pods {
name := pod.Name
util.LogForObject(h, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
warn, err := retrieveSequenceNumber(ctx, h, config, instance, &pod)
if len(warn) > 0 {
util.LogForObject(h, fmt.Sprintf("Warning: %q", warn), instance)
}
if err != nil {
util.LogForObject(h, fmt.Sprintf("Failed to retrieve seqno: %q", err), instance)
}
}
}

// getPodsWaitingForGcomm returns all the pods which are not running galera
// yet but are ready to join the cluster.
// Note: a pod is considered 'ready to join' when its main container is
// NOTE: a pod is considered 'ready to join' when its main container is
// started and its inner process is currently waiting for a gcomm URI
// (i.e. it is not running mysqld)
func getRunningPodsMissingGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
func getPodsWaitingForGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) &&
isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
Expand Down Expand Up @@ -259,10 +310,18 @@ func injectGcommURI(ctx context.Context, h *helper.Helper, config *rest.Config,
func(_ *bytes.Buffer, _ *bytes.Buffer) error {
attr := instance.Status.Attributes[pod.Name]
attr.Gcomm = uri
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
instance.Status.Attributes[pod.Name] = attr
return nil
})
// If we could not push the file in the pod, this might be due
// to a transient connection error. Return an error, so that
// this reconcile event gets reprocessed, to give us a chance
// to re-inject the URI into the pod.
//
// NOTE: if the error was due to a pod being restarted, the next
// processing of this reconcile event will automatically clean up
// the outdated attributes and wait for the newly restarted pod
// to publish up-to-date state before injecting any URI.
return err
}

Expand All @@ -279,15 +338,19 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config *
if stderr.Len() > 0 {
errStr = strings.Split(strings.TrimSuffix(stderr.String(), "\n"), "\n")
}
// Remember the container from which the probe was extracted
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
instance.Status.Attributes[pod.Name] = attr
return nil
})
return
}

// clearPodAttributes clears information known by the operator about a pod
func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
func clearPodAttributes(ctx context.Context, instance *mariadbv1.Galera, podName string) {
delete(instance.Status.Attributes, podName)
log := GetLog(ctx, "galera")
log.Info("Clear tracked attributes for pod", "pod", podName, "instance", instance)
// If the pod was deemed safeToBootstrap, this state has to be reassessed
if instance.Status.SafeToBootstrap == podName {
instance.Status.SafeToBootstrap = ""
Expand All @@ -297,40 +360,14 @@ func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
// clearOldPodsAttributesOnScaleDown removes known information from old pods
// that no longer exist after a scale down of the galera CR
func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1.Galera) {
log := GetLog(ctx, "galera")
replicas := int(*instance.Spec.Replicas)

// a pod's name is built as 'statefulsetname-n'
for node := range instance.Status.Attributes {
parts := strings.Split(node, "-")
index, _ := strconv.Atoi(parts[len(parts)-1])
if index >= replicas {
clearPodAttributes(instance, node)
log.Info("Remove old pod from status after scale-down", "instance", instance, "pod", node)
}
}
}

// assertPodsAttributesValidity compares the current state of the pods that are starting galera
// against their known state in the CR's attributes. If a pod's attributes don't match its actual
// state (i.e. it failed to start galera), the attributes are cleared from the CR's status
func assertPodsAttributesValidity(helper *helper.Helper, instance *mariadbv1.Galera, pods []corev1.Pod) {
for _, pod := range pods {
_, found := instance.Status.Attributes[pod.Name]
if !found {
continue
}
// A node can have various attributes depending on its known state.
// A ContainerID attribute is only present if the node is being started.
attrCID := instance.Status.Attributes[pod.Name].ContainerID
containerFound, podCID := getGaleraContainerID(&pod)
if !containerFound || (attrCID != "" && attrCID != podCID) {
// This gcomm URI was pushed in a pod which was restarted
// before the attribute got cleared, which means the pod
// failed to start galera. Clear the attribute here, and
// reprobe the pod's state in the next reconcile loop
clearPodAttributes(instance, pod.Name)
util.LogForObject(helper, "Pod restarted while galera was starting", instance, "pod", pod.Name, "recorded ID", attrCID)
clearPodAttributes(ctx, instance, node)
}
}
}
Expand Down Expand Up @@ -502,6 +539,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
Resources: []string{"galeras"},
Verbs: []string{"get", "list"},
},
{
APIGroups: []string{"mariadb.openstack.org"},
Resources: []string{"galeras/status"},
Verbs: []string{"get", "list", "update", "patch"},
},
{
APIGroups: []string{"mariadb.openstack.org"},
Resources: []string{"mariadbaccounts"},
Expand Down Expand Up @@ -867,8 +909,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return sfres, sferr
}

// util.LogForObject(helper, fmt.Sprintf("DAM BEFORE %v - AFTER %v", helper.GetBefore(), helper.GetAfter()), instance)

statefulset := commonstatefulset.GetStatefulSet()

// If a full cluster restart was requested,
Expand All @@ -892,6 +932,12 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return ctrl.Result{}, err
}

// NOTE: Pre-push behaviour ONLY
// If we're doing a rolling update to the new way of probing galera,
// there might still be old pods whose probing must be managed by the operator.
// keep track of those pods for later
oldPods := getPodsNotUsingNewReportScript(podList.Items)
oldWaitingPods := getWaitingPodsMissingAttributes(ctx, oldPods, instance, helper, r.config)
//
// Reconstruct the state of the galera resource based on the replicaset and its pods
//
Expand All @@ -901,9 +947,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
clearOldPodsAttributesOnScaleDown(ctx, instance)
}

// Ensure that all the ongoing galera start actions are still running
assertPodsAttributesValidity(helper, instance, podList.Items)

// Note:
// . A pod is available in the statefulset if the pod's readiness
// probe returns true (i.e. galera is running in the pod and clustered)
Expand All @@ -920,20 +963,14 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
name := pod.Name
if _, found := instance.Status.Attributes[name]; found {
log.Info("Galera started", "pod", name)
clearPodAttributes(instance, name)
clearPodAttributes(ctx, instance, name)
}
}

runningPods := getRunningPodsMissingGcomm(ctx, podList.Items, instance, helper, r.config)
// Special case for 1-node deployment: if the statefulset reports 1 node is available
// but the pod shows up in runningPods (i.e. NotReady), do not consider it a joiner.
// Wait for the two statuses to re-sync after another k8s probe is run.
if *instance.Spec.Replicas == 1 && len(runningPods) == 1 {
log.Info("Galera node no longer running. Requeuing")
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
}

// The other 'Running' pods can join the existing cluster.
// The remaining pods will become joiner nodes. Wait until they have
// reported their status to the operator, and then configure them
// as joiners.
runningPods := getPodsWaitingForGcomm(ctx, podList.Items, instance, helper, r.config)
for _, pod := range runningPods {
name := pod.Name
joinerURI := buildGcommURI(instance)
Expand All @@ -942,51 +979,33 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
err := injectGcommURI(ctx, helper, r.config, instance, &pod, joinerURI)
if err != nil {
log.Error(err, "Failed to push gcomm URI", "pod", name)
// A failed injection likely means the pod's status has changed.
// drop it from status and reprobe it in another reconcile loop
clearPodAttributes(instance, name)
// Force an error here to retry this reconcile, and try
// another injection (see details in injectGcommURI())
return ctrl.Result{}, err
}
}
}

// If the cluster is not running, probe the available pods for seqno
// to determine the bootstrap node.
// If the cluster is not running, wait until pods advertise their state
// in the Galera CR, and determine which node to bootstrap from
// Note:
// . a pod whose phase is Running but who is not Ready hasn't started
// galera, it is waiting for the operator's instructions.
// We can record its galera's seqno in our status.
// . any other status means the the pod is starting/restarting. We can't
// exec into the pod yet, so we will probe it in another reconcile loop.
if !instance.Status.Bootstrapped && !isBootstrapInProgress(instance) {
var node string
found := false
for _, pod := range getRunningPodsMissingAttributes(ctx, podList.Items, instance, helper, r.config) {
name := pod.Name
util.LogForObject(helper, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
warn, err := retrieveSequenceNumber(ctx, helper, r.config, instance, &pod)
if len(warn) > 0 {
util.LogForObject(helper, fmt.Sprintf("Warning: %q", warn), instance)
}
if err != nil {
log.Error(err, fmt.Sprintf("Failed to retrieve seqno for %s", name))
return ctrl.Result{}, err
}
log.Info(fmt.Sprintf("Attributes retrieved for %s", name),
"UUID", instance.Status.Attributes[name].UUID,
"Seqno", instance.Status.Attributes[name].Seqno,
"SafeToBootstrap", instance.Status.Attributes[name].SafeToBootstrap,
)
if instance.Status.Attributes[name].SafeToBootstrap {
node = name
found = true
break
}

// NOTE: Pre-push behaviour ONLY
// If old pods are still running, the operator must probe their status.
if len(oldWaitingPods) > 0 {
retrieveAttributesForPods(ctx, oldWaitingPods, instance, helper, r.config)
}

// Check if we have enough info to bootstrap the cluster now
if !found && int(*instance.Spec.Replicas) > 0 {
node, found = findBestCandidate(instance)
// (either one of the node reported itself as `safe_to_bootstrap`
// or all the nodes have reported their state and we'll determine one candidate)
if int(*instance.Spec.Replicas) > 0 {
node, found = findBestCandidate(instance, podList.Items, log)
}
if found {
pod := getPodFromName(podList.Items, node)
Expand All @@ -995,22 +1014,25 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
err := injectGcommURI(ctx, helper, r.config, instance, pod, "gcomm://")
if err != nil {
log.Error(err, "Failed to push gcomm URI", "pod", node)
// A failed injection likely means the pod's status has changed.
// drop it from status and reprobe it in another reconcile loop
clearPodAttributes(instance, node)
// Force an error here to retry this reconcile, and try
// another injection (see details in injectGcommURI())
return ctrl.Result{}, err
}
}
}

// The statefulset usually instantiates the pods instantly, and the galera
// operator doesn't receive individual events for pod's phase transition or
// readiness, as it is not controlling the pods (the statefulset is).
// So until all pods become available, we have to requeue this event to get
// a chance to react to all pod's transitions.
// Stop here if we don't have all galera pods running yet. Another
// reconciliation will be triggered if either:
// - a pod (re)starts and push its local state to the galera CR
// - A pod becomes ready, which bumps the AvailableReplicas in the StatefulSet
if statefulset.Status.AvailableReplicas != statefulset.Status.Replicas {
log.Info("Requeuing until all replicas are available")
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
// NOTE: Pre-push behaviour ONLY
// If old pods are still running, requeue until all pods are available
if len(oldPods) > 0 {
log.Info("Requeuing until all replicas are available")
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
}
return ctrl.Result{}, nil
}

// We reached the end of the Reconcile, update the Ready condition based on
Expand Down
Loading
Loading