Skip to content

Commit a569a68

Browse files
Merge pull request #472 from dciabrin/seqno-push
Let pods report their state to the operator
2 parents 20e7938 + 7d591dc commit a569a68

6 files changed

Lines changed: 325 additions & 101 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ require (
1111
github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20260515134210-2e2a0d06648c
1212
github.com/openstack-k8s-operators/mariadb-operator/api v0.0.0-00010101000000-000000000000
1313
go.uber.org/zap v1.28.0
14-
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
1514
k8s.io/api v0.31.14
1615
k8s.io/apimachinery v0.31.14
1716
k8s.io/client-go v0.31.14
@@ -82,6 +81,7 @@ require (
8281
go.uber.org/multierr v1.11.0 // indirect
8382
go.yaml.in/yaml/v2 v2.4.2 // indirect
8483
go.yaml.in/yaml/v3 v3.0.4 // indirect
84+
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
8585
golang.org/x/mod v0.32.0 // indirect
8686
golang.org/x/net v0.49.0 // indirect
8787
golang.org/x/oauth2 v0.30.0 // indirect

internal/controller/galera_controller.go

Lines changed: 120 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"encoding/json"
2424
"errors"
2525
"fmt"
26+
"slices"
2627
"sort"
2728
"strconv"
2829
"strings"
@@ -51,8 +52,6 @@ import (
5152
"k8s.io/client-go/rest"
5253
"k8s.io/kubectl/pkg/util/podutils"
5354

54-
"golang.org/x/exp/maps"
55-
5655
"github.com/go-logr/logr"
5756
"k8s.io/apimachinery/pkg/runtime"
5857
ctrl "sigs.k8s.io/controller-runtime"
@@ -110,12 +109,21 @@ func GetLog(ctx context.Context, controller string) logr.Logger {
110109
//
111110

112111
// findBestCandidate returns the node with the lowest seqno
113-
func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
114-
sortednodes := maps.Keys(g.Status.Attributes)
115-
sort.Strings(sortednodes)
112+
func findBestCandidate(g *mariadbv1.Galera, pods []corev1.Pod, log logr.Logger) (node string, found bool) {
113+
log.Info("Known attributes for bootstrapping the cluster", "attributes", g.Status.Attributes)
114+
var sortedNodes []string
115+
for _, pod := range pods {
116+
if cidFound, cid := getGaleraContainerID(&pod); cidFound {
117+
attr, attrFound := g.Status.Attributes[pod.Name]
118+
if attrFound && attr.ContainerID == cid {
119+
sortedNodes = append(sortedNodes, pod.Name)
120+
}
121+
}
122+
}
123+
sort.Strings(sortedNodes)
116124
bestnode := ""
117125
bestseqno := -1
118-
for _, node := range sortednodes {
126+
for _, node := range sortedNodes {
119127
// On clean shutdown, galera sets the last
120128
// stopped node as 'safe to bootstrap', so use
121129
// this hint when we can
@@ -131,7 +139,7 @@ func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) {
131139
}
132140
// if we pass here, a candidate is only valid if we
133141
// inspected all the expected replicas (e.g. typically 3)
134-
if len(g.Status.Attributes) != int(*g.Spec.Replicas) {
142+
if len(sortedNodes) != int(*g.Spec.Replicas) {
135143
return "", false
136144
}
137145
return bestnode, true //"galera-0"
@@ -186,15 +194,42 @@ func getReadyPods(pods []corev1.Pod) (ret []corev1.Pod) {
186194
return
187195
}
188196

189-
// getRunningPodsMissingAttributes returns all the pods for which the operator
197+
// getWaitingPodsNotUsingNewReportScript filters out pods which are not yet
198+
// using the new galera state report script (pre-push behaviour ONLY)
199+
func getPodsNotUsingNewReportScript(pods []corev1.Pod) (ret []corev1.Pod) {
200+
for _, pod := range pods {
201+
cmIdx := slices.IndexFunc(pod.Spec.Volumes, func(v corev1.Volume) bool {
202+
return v.Name == "operator-scripts"
203+
})
204+
cm := *pod.Spec.Volumes[cmIdx].ConfigMap
205+
scriptIdx := slices.IndexFunc(cm.Items, func(k corev1.KeyToPath) bool {
206+
return k.Key == "report_local_galera_state.py"
207+
})
208+
if scriptIdx == -1 {
209+
ret = append(ret, pod)
210+
}
211+
}
212+
return
213+
}
214+
215+
// getWaitingPodsMissingAttributes returns all the pods for which the operator
190216
// has no seqno information, and which are ready for being inspected.
191217
// Note: a pod is considered 'ready for inspection' when its main container is
192218
// started and its inner process is currently waiting for a gcomm URI
193219
// (i.e. it is not running mysqld)
194-
func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
220+
func getWaitingPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
195221
for _, pod := range pods {
196222
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) {
197223
_, attrFound := instance.Status.Attributes[pod.Name]
224+
if attrFound {
225+
cidFound, cid := getGaleraContainerID(&pod)
226+
if !cidFound || cid != instance.Status.Attributes[pod.Name].ContainerID {
227+
// The info we have for this pod came from an old container,
228+
// so drop it and mark this pod as missing attributes
229+
clearPodAttributes(ctx, instance, pod.Name)
230+
attrFound = false
231+
}
232+
}
198233
if !attrFound && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
199234
ret = append(ret, pod)
200235
}
@@ -203,12 +238,28 @@ func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, ins
203238
return
204239
}
205240

206-
// getRunningPodsMissingGcomm returns all the pods which are not running galera
241+
// retrieveAttributesForPods fetches galera database status from pods which are not
242+
// not yet using the new galera state report script (pre-push behaviour ONLY)
243+
func retrieveAttributesForPods(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) {
244+
for _, pod := range pods {
245+
name := pod.Name
246+
util.LogForObject(h, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
247+
warn, err := retrieveSequenceNumber(ctx, h, config, instance, &pod)
248+
if len(warn) > 0 {
249+
util.LogForObject(h, fmt.Sprintf("Warning: %q", warn), instance)
250+
}
251+
if err != nil {
252+
util.LogForObject(h, fmt.Sprintf("Failed to retrieve seqno: %q", err), instance)
253+
}
254+
}
255+
}
256+
257+
// getPodsWaitingForGcomm returns all the pods which are not running galera
207258
// yet but are ready to join the cluster.
208-
// Note: a pod is considered 'ready to join' when its main container is
259+
// NOTE: a pod is considered 'ready to join' when its main container is
209260
// started and its inner process is currently waiting for a gcomm URI
210261
// (i.e. it is not running mysqld)
211-
func getRunningPodsMissingGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
262+
func getPodsWaitingForGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) {
212263
for _, pod := range pods {
213264
if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) &&
214265
isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) {
@@ -259,10 +310,18 @@ func injectGcommURI(ctx context.Context, h *helper.Helper, config *rest.Config,
259310
func(_ *bytes.Buffer, _ *bytes.Buffer) error {
260311
attr := instance.Status.Attributes[pod.Name]
261312
attr.Gcomm = uri
262-
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
263313
instance.Status.Attributes[pod.Name] = attr
264314
return nil
265315
})
316+
// If we could not push the file in the pod, this might be due
317+
// to a transient connection error. Return an error, so that
318+
// this reconcile event gets reprocessed, to give us a chance
319+
// to re-inject the URI into the pod.
320+
//
321+
// NOTE: if the error was due to a pod being restarted, the next
322+
// processing of this reconcile event will automatically clean up
323+
// the outdated attributes and wait for the newly restarted pod
324+
// to publish up-to-date state before injecting any URI.
266325
return err
267326
}
268327

@@ -279,15 +338,19 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config *
279338
if stderr.Len() > 0 {
280339
errStr = strings.Split(strings.TrimSuffix(stderr.String(), "\n"), "\n")
281340
}
341+
// Remember the container from which the probe was extracted
342+
attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID
282343
instance.Status.Attributes[pod.Name] = attr
283344
return nil
284345
})
285346
return
286347
}
287348

288349
// clearPodAttributes clears information known by the operator about a pod
289-
func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
350+
func clearPodAttributes(ctx context.Context, instance *mariadbv1.Galera, podName string) {
290351
delete(instance.Status.Attributes, podName)
352+
log := GetLog(ctx, "galera")
353+
log.Info("Clear tracked attributes for pod", "pod", podName, "instance", instance)
291354
// If the pod was deemed safeToBootstrap, this state has to be reassessed
292355
if instance.Status.SafeToBootstrap == podName {
293356
instance.Status.SafeToBootstrap = ""
@@ -297,40 +360,14 @@ func clearPodAttributes(instance *mariadbv1.Galera, podName string) {
297360
// clearOldPodsAttributesOnScaleDown removes known information from old pods
298361
// that no longer exist after a scale down of the galera CR
299362
func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1.Galera) {
300-
log := GetLog(ctx, "galera")
301363
replicas := int(*instance.Spec.Replicas)
302364

303365
// a pod's name is built as 'statefulsetname-n'
304366
for node := range instance.Status.Attributes {
305367
parts := strings.Split(node, "-")
306368
index, _ := strconv.Atoi(parts[len(parts)-1])
307369
if index >= replicas {
308-
clearPodAttributes(instance, node)
309-
log.Info("Remove old pod from status after scale-down", "instance", instance, "pod", node)
310-
}
311-
}
312-
}
313-
314-
// assertPodsAttributesValidity compares the current state of the pods that are starting galera
315-
// against their known state in the CR's attributes. If a pod's attributes don't match its actual
316-
// state (i.e. it failed to start galera), the attributes are cleared from the CR's status
317-
func assertPodsAttributesValidity(helper *helper.Helper, instance *mariadbv1.Galera, pods []corev1.Pod) {
318-
for _, pod := range pods {
319-
_, found := instance.Status.Attributes[pod.Name]
320-
if !found {
321-
continue
322-
}
323-
// A node can have various attributes depending on its known state.
324-
// A ContainerID attribute is only present if the node is being started.
325-
attrCID := instance.Status.Attributes[pod.Name].ContainerID
326-
containerFound, podCID := getGaleraContainerID(&pod)
327-
if !containerFound || (attrCID != "" && attrCID != podCID) {
328-
// This gcomm URI was pushed in a pod which was restarted
329-
// before the attribute got cleared, which means the pod
330-
// failed to start galera. Clear the attribute here, and
331-
// reprobe the pod's state in the next reconcile loop
332-
clearPodAttributes(instance, pod.Name)
333-
util.LogForObject(helper, "Pod restarted while galera was starting", instance, "pod", pod.Name, "recorded ID", attrCID)
370+
clearPodAttributes(ctx, instance, node)
334371
}
335372
}
336373
}
@@ -502,6 +539,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
502539
Resources: []string{"galeras"},
503540
Verbs: []string{"get", "list"},
504541
},
542+
{
543+
APIGroups: []string{"mariadb.openstack.org"},
544+
Resources: []string{"galeras/status"},
545+
Verbs: []string{"get", "list", "update", "patch"},
546+
},
505547
{
506548
APIGroups: []string{"mariadb.openstack.org"},
507549
Resources: []string{"mariadbaccounts"},
@@ -867,8 +909,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
867909
return sfres, sferr
868910
}
869911

870-
// util.LogForObject(helper, fmt.Sprintf("DAM BEFORE %v - AFTER %v", helper.GetBefore(), helper.GetAfter()), instance)
871-
872912
statefulset := commonstatefulset.GetStatefulSet()
873913

874914
// If a full cluster restart was requested,
@@ -892,6 +932,12 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
892932
return ctrl.Result{}, err
893933
}
894934

935+
// NOTE: Pre-push behaviour ONLY
936+
// If we're doing a rolling update to the new way of probing galera,
937+
// there might still be old pods whose probing must be managed by the operator.
938+
// keep track of those pods for later
939+
oldPods := getPodsNotUsingNewReportScript(podList.Items)
940+
oldWaitingPods := getWaitingPodsMissingAttributes(ctx, oldPods, instance, helper, r.config)
895941
//
896942
// Reconstruct the state of the galera resource based on the replicaset and its pods
897943
//
@@ -901,9 +947,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
901947
clearOldPodsAttributesOnScaleDown(ctx, instance)
902948
}
903949

904-
// Ensure that all the ongoing galera start actions are still running
905-
assertPodsAttributesValidity(helper, instance, podList.Items)
906-
907950
// Note:
908951
// . A pod is available in the statefulset if the pod's readiness
909952
// probe returns true (i.e. galera is running in the pod and clustered)
@@ -920,20 +963,14 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
920963
name := pod.Name
921964
if _, found := instance.Status.Attributes[name]; found {
922965
log.Info("Galera started", "pod", name)
923-
clearPodAttributes(instance, name)
966+
clearPodAttributes(ctx, instance, name)
924967
}
925968
}
926969

927-
runningPods := getRunningPodsMissingGcomm(ctx, podList.Items, instance, helper, r.config)
928-
// Special case for 1-node deployment: if the statefulset reports 1 node is available
929-
// but the pod shows up in runningPods (i.e. NotReady), do not consider it a joiner.
930-
// Wait for the two statuses to re-sync after another k8s probe is run.
931-
if *instance.Spec.Replicas == 1 && len(runningPods) == 1 {
932-
log.Info("Galera node no longer running. Requeuing")
933-
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
934-
}
935-
936-
// The other 'Running' pods can join the existing cluster.
970+
// The remaining pods will become joiner nodes. Wait until they have
971+
// reported their status to the operator, and then configure them
972+
// as joiners.
973+
runningPods := getPodsWaitingForGcomm(ctx, podList.Items, instance, helper, r.config)
937974
for _, pod := range runningPods {
938975
name := pod.Name
939976
joinerURI := buildGcommURI(instance)
@@ -942,51 +979,33 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
942979
err := injectGcommURI(ctx, helper, r.config, instance, &pod, joinerURI)
943980
if err != nil {
944981
log.Error(err, "Failed to push gcomm URI", "pod", name)
945-
// A failed injection likely means the pod's status has changed.
946-
// drop it from status and reprobe it in another reconcile loop
947-
clearPodAttributes(instance, name)
982+
// Force an error here to retry this reconcile, and try
983+
// another injection (see details in injectGcommURI())
948984
return ctrl.Result{}, err
949985
}
950986
}
951987
}
952988

953-
// If the cluster is not running, probe the available pods for seqno
954-
// to determine the bootstrap node.
989+
// If the cluster is not running, wait until pods advertise their state
990+
// in the Galera CR, and determine which node to bootstrap from
955991
// Note:
956992
// . a pod whose phase is Running but who is not Ready hasn't started
957993
// galera, it is waiting for the operator's instructions.
958-
// We can record its galera's seqno in our status.
959-
// . any other status means the the pod is starting/restarting. We can't
960-
// exec into the pod yet, so we will probe it in another reconcile loop.
961994
if !instance.Status.Bootstrapped && !isBootstrapInProgress(instance) {
962995
var node string
963996
found := false
964-
for _, pod := range getRunningPodsMissingAttributes(ctx, podList.Items, instance, helper, r.config) {
965-
name := pod.Name
966-
util.LogForObject(helper, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance)
967-
warn, err := retrieveSequenceNumber(ctx, helper, r.config, instance, &pod)
968-
if len(warn) > 0 {
969-
util.LogForObject(helper, fmt.Sprintf("Warning: %q", warn), instance)
970-
}
971-
if err != nil {
972-
log.Error(err, fmt.Sprintf("Failed to retrieve seqno for %s", name))
973-
return ctrl.Result{}, err
974-
}
975-
log.Info(fmt.Sprintf("Attributes retrieved for %s", name),
976-
"UUID", instance.Status.Attributes[name].UUID,
977-
"Seqno", instance.Status.Attributes[name].Seqno,
978-
"SafeToBootstrap", instance.Status.Attributes[name].SafeToBootstrap,
979-
)
980-
if instance.Status.Attributes[name].SafeToBootstrap {
981-
node = name
982-
found = true
983-
break
984-
}
997+
998+
// NOTE: Pre-push behaviour ONLY
999+
// If old pods are still running, the operator must probe their status.
1000+
if len(oldWaitingPods) > 0 {
1001+
retrieveAttributesForPods(ctx, oldWaitingPods, instance, helper, r.config)
9851002
}
9861003

9871004
// Check if we have enough info to bootstrap the cluster now
988-
if !found && int(*instance.Spec.Replicas) > 0 {
989-
node, found = findBestCandidate(instance)
1005+
// (either one of the node reported itself as `safe_to_bootstrap`
1006+
// or all the nodes have reported their state and we'll determine one candidate)
1007+
if int(*instance.Spec.Replicas) > 0 {
1008+
node, found = findBestCandidate(instance, podList.Items, log)
9901009
}
9911010
if found {
9921011
pod := getPodFromName(podList.Items, node)
@@ -995,22 +1014,25 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
9951014
err := injectGcommURI(ctx, helper, r.config, instance, pod, "gcomm://")
9961015
if err != nil {
9971016
log.Error(err, "Failed to push gcomm URI", "pod", node)
998-
// A failed injection likely means the pod's status has changed.
999-
// drop it from status and reprobe it in another reconcile loop
1000-
clearPodAttributes(instance, node)
1017+
// Force an error here to retry this reconcile, and try
1018+
// another injection (see details in injectGcommURI())
10011019
return ctrl.Result{}, err
10021020
}
10031021
}
10041022
}
10051023

1006-
// The statefulset usually instantiates the pods instantly, and the galera
1007-
// operator doesn't receive individual events for pod's phase transition or
1008-
// readiness, as it is not controlling the pods (the statefulset is).
1009-
// So until all pods become available, we have to requeue this event to get
1010-
// a chance to react to all pod's transitions.
1024+
// Stop here if we don't have all galera pods running yet. Another
1025+
// reconciliation will be triggered if either:
1026+
// - a pod (re)starts and push its local state to the galera CR
1027+
// - A pod becomes ready, which bumps the AvailableReplicas in the StatefulSet
10111028
if statefulset.Status.AvailableReplicas != statefulset.Status.Replicas {
1012-
log.Info("Requeuing until all replicas are available")
1013-
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
1029+
// NOTE: Pre-push behaviour ONLY
1030+
// If old pods are still running, requeue until all pods are available
1031+
if len(oldPods) > 0 {
1032+
log.Info("Requeuing until all replicas are available")
1033+
return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil
1034+
}
1035+
return ctrl.Result{}, nil
10141036
}
10151037

10161038
// We reached the end of the Reconcile, update the Ready condition based on

0 commit comments

Comments
 (0)