Skip to content

Commit 718cfeb

Browse files
committed
Let pods report their state to the operator
Reverse the introspection approach for bootstrapping the cluster. Until now, the mariadb operator was in charge of running scripts inside pods to extract the state of the mysql database. Now the pods are responsible for reporting the state of their database to the operator, by updating their associated galera CR via calls to the API server. This allows asynchronous reporting from the operator's perspective, and ensures that a newly (re)started pod will always update the operator state, preventing stale data from lingering in the galera CR status. Jira: OSPRH-23075
1 parent a039cf4 commit 718cfeb

6 files changed

Lines changed: 325 additions & 101 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ require (
1111
github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20260515134210-2e2a0d06648c
1212
github.com/openstack-k8s-operators/mariadb-operator/api v0.0.0-00010101000000-000000000000
1313
go.uber.org/zap v1.28.0
14-
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
1514
k8s.io/api v0.31.14
1615
k8s.io/apimachinery v0.31.14
1716
k8s.io/client-go v0.31.14
@@ -82,6 +81,7 @@ require (
8281
go.uber.org/multierr v1.11.0 // indirect
8382
go.yaml.in/yaml/v2 v2.4.2 // indirect
8483
go.yaml.in/yaml/v3 v3.0.4 // indirect
84+
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
8585
golang.org/x/mod v0.32.0 // indirect
8686
golang.org/x/net v0.49.0 // indirect
8787
golang.org/x/oauth2 v0.30.0 // indirect

internal/controller/galera_controller.go

Lines changed: 120 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"encoding/json"
2424
"errors"
2525
"fmt"
26+
"slices"
2627
"sort"
2728
"strconv"
2829
"strings"
@@ -51,8 +52,6 @@ import (
5152
"k8s.io/client-go/rest"
5253
"k8s.io/kubectl/pkg/util/podutils"
5354

54-
"golang.org/x/exp/maps"
55-
5655
"github.com/go-logr/logr"
5756
"k8s.io/apimachinery/pkg/runtime"
5857
ctrl "sigs.k8s.io/controller-runtime"
@@ -110,12 +109,21 @@ func GetLog(ctx context.Context, controller string) logr.Logger {
110109
//
111110

112111
// findBestCandidate returns the node with the lowest seqno
113-
func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
114-
sortednodes := maps.Keys(g.Status.Attributes)
115-
sort.Strings(sortednodes)
112+
func findBestCandidate(g *mariadbv1.Galera, pods []corev1.Pod, log logr.Logger) (node string, found bool) {
113+
log.Info("Known attributes for bootstrapping the cluster", "attributes", g.Status.Attributes)
114+
var sortedNodes []string
115+
for _, pod := range pods {
116+
if cidFound, cid := getGaleraContainerID(&pod); cidFound {
117+
attr, attrFound := g.Status.Attributes[pod.Name]
118+
if attrFound && attr.ContainerID == cid {
119+
sortedNodes = append(sortedNodes, pod.Name)
120+
}
121+
}
122+
}
123+
sort.Strings(sortedNodes)
116124
bestnode := ""
117125
bestseqno := -1
118-
for _, node := range sortednodes {
126+
for _, node := range sortedNodes {
119127
// On clean shutdown, galera sets the last
120128
// stopped node as 'safe to bootstrap', so use
121129
// this hint when we can
@@ -131,7 +139,7 @@ func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
131139
}
132140
// if we pass here, a candidate is only valid if we
133141
// inspected all the expected replicas (e.g. typically 3)
134-
if len(g.Status.Attributes) != int(*g.Spec.Replicas) {
142+
if len(sortedNodes) != int(*g.Spec.Replicas) {
135143
return "", false
136144
}
137145
return bestnode, true //"galera-0"
@@ -186,15 +194,42 @@ func getReadyPods(pods []corev1.Pod) (ret []corev1.Pod) {
186194
return
187195
}
188196

189-
// getRunningPodsMissingAttributes returns all the pods for which the operator
197+
// getWaitingPodsNotUsingNewReportScript filters out pods which are not yet
198+
// using the new galera state report script (pre-push behaviour ONLY)
199+
func getPodsNotUsingNewReportScript(pods []corev1.Pod) (ret []corev1.Pod) {
200+
for _, pod := range pods {
201+
cmIdx := slices.IndexFunc(pod.Spec.Volumes, func(v corev1.Volume) bool {
202+
return v.Name == "operator-scripts"
203+
})
204+
cm := *pod.Spec.Volumes[cmIdx].ConfigMap
205+
scriptIdx := slices.IndexFunc(cm.Items, func(k corev1.KeyToPath) bool {
206+
return k.Key == "report_local_galera_state.py"
207+
})
208+
if scriptIdx == -1 {
209+
ret = append(ret, pod)
210+
}
211+
}
212+
return
213+
}
214+
215+
// getWaitingPodsMissingAttributes returns all the pods for which the operator
190216
// has no seqno information, and which are ready for being inspected.
191217
// Note: a pod is considered 'ready for inspection' when its main container is
192218
// started and its inner process is currently waiting for a gcomm URI
193219
// (i.e. it is not running mysqld)
194-
func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
220+
func getWaitingPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
195221
for _, pod := range pods {
196222
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) {
197223
_, attrFound := instance.Status.Attributes[pod.Name]
224+
if attrFound {
225+
cidFound, cid := getGaleraContainerID(&pod)
226+
if !cidFound || cid != instance.Status.Attributes[pod.Name].ContainerID {
227+
// The info we have for this pod came from an old container,
228+
// so drop it and mark this pod as missing attributes
229+
clearPodAttributes(ctx, instance, pod.Name)
230+
attrFound = false
231+
}
232+
}
198233
if !attrFound && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
199234
ret = append(ret, pod)
200235
}
@@ -203,12 +238,28 @@ func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, ins
203238
return
204239
}
205240

206-
// getRunningPodsMissingGcomm returns all the pods which are not running galera
241+
// retrieveAttributesForPods fetches galera database status from pods which are not
242+
// not yet using the new galera state report script (pre-push behaviour ONLY)
243+
func retrieveAttributesForPods(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) {
244+
for _, pod := range pods {
245+
name := pod.Name
246+
util.LogForObject(h, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
247+
warn, err := retrieveSequenceNumber(ctx, h, config, instance, &pod)
248+
if len(warn) > 0 {
249+
util.LogForObject(h, fmt.Sprintf("Warning: %q", warn), instance)
250+
}
251+
if err != nil {
252+
util.LogForObject(h, fmt.Sprintf("Failed to retrieve seqno: %q", err), instance)
253+
}
254+
}
255+
}
256+
257+
// getPodsWaitingForGcomm returns all the pods which are not running galera
207258
// yet but are ready to join the cluster.
208-
// Note: a pod is considered 'ready to join' when its main container is
259+
// NOTE: a pod is considered 'ready to join' when its main container is
209260
// started and its inner process is currently waiting for a gcomm URI
210261
// (i.e. it is not running mysqld)
211-
func getRunningPodsMissingGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
262+
func getPodsWaitingForGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
212263
for _, pod := range pods {
213264
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) &&
214265
isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
@@ -259,10 +310,18 @@ func injectGcommURI(ctx context.Context, h *helper.Helper, config *rest.Config,
259310
func(_ *bytes.Buffer, _ *bytes.Buffer) error {
260311
attr := instance.Status.Attributes[pod.Name]
261312
attr.Gcomm = uri
262-
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
263313
instance.Status.Attributes[pod.Name] = attr
264314
return nil
265315
})
316+
// If we could not push the file in the pod, this might be due
317+
// to a transient connection error. Return an error, so that
318+
// this reconcile event gets reprocessed, to give us a chance
319+
// to re-inject the URI into the pod.
320+
//
321+
// NOTE: if the error was due to a pod being restarted, the next
322+
// processing of this reconcile event will automatically clean up
323+
// the outdated attributes and wait for the newly restarted pod
324+
// to publish up-to-date state before injecting any URI.
266325
return err
267326
}
268327

@@ -279,15 +338,19 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config *
279338
if stderr.Len() > 0 {
280339
errStr = strings.Split(strings.TrimSuffix(stderr.String(), "\n"), "\n")
281340
}
341+
// Remember the container from which the probe was extracted
342+
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
282343
instance.Status.Attributes[pod.Name] = attr
283344
return nil
284345
})
285346
return
286347
}
287348

288349
// clearPodAttributes clears information known by the operator about a pod
289-
func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
350+
func clearPodAttributes(ctx context.Context, instance *mariadbv1.Galera, podName string) {
290351
delete(instance.Status.Attributes, podName)
352+
log := GetLog(ctx, "galera")
353+
log.Info("Clear tracked attributes for pod", "pod", podName, "instance", instance)
291354
// If the pod was deemed safeToBootstrap, this state has to be reassessed
292355
if instance.Status.SafeToBootstrap == podName {
293356
instance.Status.SafeToBootstrap = ""
@@ -297,40 +360,14 @@ func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
297360
// clearOldPodsAttributesOnScaleDown removes known information from old pods
298361
// that no longer exist after a scale down of the galera CR
299362
func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1.Galera) {
300-
log := GetLog(ctx, "galera")
301363
replicas := int(*instance.Spec.Replicas)
302364

303365
// a pod's name is built as 'statefulsetname-n'
304366
for node := range instance.Status.Attributes {
305367
parts := strings.Split(node, "-")
306368
index, _ := strconv.Atoi(parts[len(parts)-1])
307369
if index >= replicas {
308-
clearPodAttributes(instance, node)
309-
log.Info("Remove old pod from status after scale-down", "instance", instance, "pod", node)
310-
}
311-
}
312-
}
313-
314-
// assertPodsAttributesValidity compares the current state of the pods that are starting galera
315-
// against their known state in the CR's attributes. If a pod's attributes don't match its actual
316-
// state (i.e. it failed to start galera), the attributes are cleared from the CR's status
317-
func assertPodsAttributesValidity(helper *helper.Helper, instance *mariadbv1.Galera, pods []corev1.Pod) {
318-
for _, pod := range pods {
319-
_, found := instance.Status.Attributes[pod.Name]
320-
if !found {
321-
continue
322-
}
323-
// A node can have various attributes depending on its known state.
324-
// A ContainerID attribute is only present if the node is being started.
325-
attrCID := instance.Status.Attributes[pod.Name].ContainerID
326-
containerFound, podCID := getGaleraContainerID(&pod)
327-
if !containerFound || (attrCID != "" && attrCID != podCID) {
328-
// This gcomm URI was pushed in a pod which was restarted
329-
// before the attribute got cleared, which means the pod
330-
// failed to start galera. Clear the attribute here, and
331-
// reprobe the pod's state in the next reconcile loop
332-
clearPodAttributes(instance, pod.Name)
333-
util.LogForObject(helper, "Pod restarted while galera was starting", instance, "pod", pod.Name, "recorded ID", attrCID)
370+
clearPodAttributes(ctx, instance, node)
334371
}
335372
}
336373
}
@@ -502,6 +539,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
502539
Resources: []string{"galeras"},
503540
Verbs: []string{"get", "list"},
504541
},
542+
{
543+
APIGroups: []string{"mariadb.openstack.org"},
544+
Resources: []string{"galeras/status"},
545+
Verbs: []string{"get", "list", "update", "patch"},
546+
},
505547
{
506548
APIGroups: []string{"mariadb.openstack.org"},
507549
Resources: []string{"mariadbaccounts"},
@@ -867,8 +909,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
867909
return sfres, sferr
868910
}
869911

870-
// util.LogForObject(helper, fmt.Sprintf("DAM BEFORE %v - AFTER %v", helper.GetBefore(), helper.GetAfter()), instance)
871-
872912
statefulset := commonstatefulset.GetStatefulSet()
873913

874914
// If a full cluster restart was requested,
@@ -892,6 +932,12 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
892932
return ctrl.Result{}, err
893933
}
894934

935+
// NOTE: Pre-push behaviour ONLY
936+
// If we're doing a rolling update to the new way of probing galera,
937+
// there might still be old pods whose probing must be managed by the operator.
938+
// keep track of those pods for later
939+
oldPods := getPodsNotUsingNewReportScript(podList.Items)
940+
oldWaitingPods := getWaitingPodsMissingAttributes(ctx, oldPods, instance, helper, r.config)
895941
//
896942
// Reconstruct the state of the galera resource based on the replicaset and its pods
897943
//
@@ -901,9 +947,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
901947
clearOldPodsAttributesOnScaleDown(ctx, instance)
902948
}
903949

904-
// Ensure that all the ongoing galera start actions are still running
905-
assertPodsAttributesValidity(helper, instance, podList.Items)
906-
907950
// Note:
908951
// . A pod is available in the statefulset if the pod's readiness
909952
// probe returns true (i.e. galera is running in the pod and clustered)
@@ -920,20 +963,14 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
920963
name := pod.Name
921964
if _, found := instance.Status.Attributes[name]; found {
922965
log.Info("Galera started", "pod", name)
923-
clearPodAttributes(instance, name)
966+
clearPodAttributes(ctx, instance, name)
924967
}
925968
}
926969

927-
runningPods := getRunningPodsMissingGcomm(ctx, podList.Items, instance, helper, r.config)
928-
// Special case for 1-node deployment: if the statefulset reports 1 node is available
929-
// but the pod shows up in runningPods (i.e. NotReady), do not consider it a joiner.
930-
// Wait for the two statuses to re-sync after another k8s probe is run.
931-
if *instance.Spec.Replicas == 1 && len(runningPods) == 1 {
932-
log.Info("Galera node no longer running. Requeuing")
933-
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
934-
}
935-
936-
// The other 'Running' pods can join the existing cluster.
970+
// The remaining pods will become joiner nodes. Wait until they have
971+
// reported their status to the operator, and then configure them
972+
// as joiners.
973+
runningPods := getPodsWaitingForGcomm(ctx, podList.Items, instance, helper, r.config)
937974
for _, pod := range runningPods {
938975
name := pod.Name
939976
joinerURI := buildGcommURI(instance)
@@ -942,51 +979,33 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
942979
err := injectGcommURI(ctx, helper, r.config, instance, &pod, joinerURI)
943980
if err != nil {
944981
log.Error(err, "Failed to push gcomm URI", "pod", name)
945-
// A failed injection likely means the pod's status has changed.
946-
// drop it from status and reprobe it in another reconcile loop
947-
clearPodAttributes(instance, name)
982+
// Force an error here to retry this reconcile, and try
983+
// another injection (see details in injectGcommURI())
948984
return ctrl.Result{}, err
949985
}
950986
}
951987
}
952988

953-
// If the cluster is not running, probe the available pods for seqno
954-
// to determine the bootstrap node.
989+
// If the cluster is not running, wait until pods advertise their state
990+
// in the Galera CR, and determine which node to bootstrap from
955991
// Note:
956992
// . a pod whose phase is Running but who is not Ready hasn't started
957993
// galera, it is waiting for the operator's instructions.
958-
// We can record its galera's seqno in our status.
959-
// . any other status means the the pod is starting/restarting. We can't
960-
// exec into the pod yet, so we will probe it in another reconcile loop.
961994
if !instance.Status.Bootstrapped && !isBootstrapInProgress(instance) {
962995
var node string
963996
found := false
964-
for _, pod := range getRunningPodsMissingAttributes(ctx, podList.Items, instance, helper, r.config) {
965-
name := pod.Name
966-
util.LogForObject(helper, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
967-
warn, err := retrieveSequenceNumber(ctx, helper, r.config, instance, &pod)
968-
if len(warn) > 0 {
969-
util.LogForObject(helper, fmt.Sprintf("Warning: %q", warn), instance)
970-
}
971-
if err != nil {
972-
log.Error(err, fmt.Sprintf("Failed to retrieve seqno for %s", name))
973-
return ctrl.Result{}, err
974-
}
975-
log.Info(fmt.Sprintf("Attributes retrieved for %s", name),
976-
"UUID", instance.Status.Attributes[name].UUID,
977-
"Seqno", instance.Status.Attributes[name].Seqno,
978-
"SafeToBootstrap", instance.Status.Attributes[name].SafeToBootstrap,
979-
)
980-
if instance.Status.Attributes[name].SafeToBootstrap {
981-
node = name
982-
found = true
983-
break
984-
}
997+
998+
// NOTE: Pre-push behaviour ONLY
999+
// If old pods are still running, the operator must probe their status.
1000+
if len(oldWaitingPods) > 0 {
1001+
retrieveAttributesForPods(ctx, oldWaitingPods, instance, helper, r.config)
9851002
}
9861003

9871004
// Check if we have enough info to bootstrap the cluster now
988-
if !found && int(*instance.Spec.Replicas) > 0 {
989-
node, found = findBestCandidate(instance)
1005+
// (either one of the nodes reported itself as `safe_to_bootstrap`,
1006+
// or all the nodes have reported their state and we'll determine one candidate)
1007+
if int(*instance.Spec.Replicas) > 0 {
1008+
node, found = findBestCandidate(instance, podList.Items, log)
9901009
}
9911010
if found {
9921011
pod := getPodFromName(podList.Items, node)
@@ -995,22 +1014,25 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
9951014
err := injectGcommURI(ctx, helper, r.config, instance, pod, "gcomm://")
9961015
if err != nil {
9971016
log.Error(err, "Failed to push gcomm URI", "pod", node)
998-
// A failed injection likely means the pod's status has changed.
999-
// drop it from status and reprobe it in another reconcile loop
1000-
clearPodAttributes(instance, node)
1017+
// Force an error here to retry this reconcile, and try
1018+
// another injection (see details in injectGcommURI())
10011019
return ctrl.Result{}, err
10021020
}
10031021
}
10041022
}
10051023

1006-
// The statefulset usually instantiates the pods instantly, and the galera
1007-
// operator doesn't receive individual events for pod's phase transition or
1008-
// readiness, as it is not controlling the pods (the statefulset is).
1009-
// So until all pods become available, we have to requeue this event to get
1010-
// a chance to react to all pod's transitions.
1024+
// Stop here if we don't have all galera pods running yet. Another
1025+
// reconciliation will be triggered if either:
1026+
// - a pod (re)starts and pushes its local state to the galera CR
1027+
// - A pod becomes ready, which bumps the AvailableReplicas in the StatefulSet
10111028
if statefulset.Status.AvailableReplicas != statefulset.Status.Replicas {
1012-
log.Info("Requeuing until all replicas are available")
1013-
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
1029+
// NOTE: Pre-push behaviour ONLY
1030+
// If old pods are still running, requeue until all pods are available
1031+
if len(oldPods) > 0 {
1032+
log.Info("Requeuing until all replicas are available")
1033+
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
1034+
}
1035+
return ctrl.Result{}, nil
10141036
}
10151037

10161038
// We reached the end of the Reconcile, update the Ready condition based on

0 commit comments

Comments
 (0)