From 7d591dcd2592a92963a4fc994ae5727966b924e5 Mon Sep 17 00:00:00 2001 From: Damien Ciabrini Date: Fri, 22 May 2026 11:29:11 +0200 Subject: [PATCH] Let pods report their state to the operator Reverse the introspection approach for bootstrapping the cluster. Until now, the mariadb operator was in charge of running scripts inside pods to extract the state of the mysql database. Now the pods are responsible for reporting the state of their database to the operator, by updating their associated galera CR via calls to the API server. This allows asynchronous reporting from the operator's perspective, and ensures that a newly (re)started pod will always update the operator state, preventing stale data from lingering in the galera CR status. Jira: OSPRH-23075 --- go.mod | 2 +- internal/controller/galera_controller.go | 218 ++++++++++-------- internal/mariadb/volumes.go | 4 + .../galera/bin/detect_gcomm_and_start.sh | 20 +- templates/galera/bin/detect_last_commit.sh | 12 +- .../galera/bin/report_local_galera_state.py | 170 ++++++++++++++ 6 files changed, 325 insertions(+), 101 deletions(-) create mode 100755 templates/galera/bin/report_local_galera_state.py diff --git a/go.mod b/go.mod index 79d59c7f..d609994d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20260515134210-2e2a0d06648c github.com/openstack-k8s-operators/mariadb-operator/api v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.28.0 - golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 k8s.io/api v0.31.14 k8s.io/apimachinery v0.31.14 k8s.io/client-go v0.31.14 @@ -82,6 +81,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect diff --git a/internal/controller/galera_controller.go 
b/internal/controller/galera_controller.go index 199e56d9..3b8bbef2 100644 --- a/internal/controller/galera_controller.go +++ b/internal/controller/galera_controller.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "sort" "strconv" "strings" @@ -51,8 +52,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/kubectl/pkg/util/podutils" - "golang.org/x/exp/maps" - "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -110,12 +109,21 @@ func GetLog(ctx context.Context, controller string) logr.Logger { // // findBestCandidate returns the node with the lowest seqno -func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) { - sortednodes := maps.Keys(g.Status.Attributes) - sort.Strings(sortednodes) +func findBestCandidate(g *mariadbv1.Galera, pods []corev1.Pod, log logr.Logger) (node string, found bool) { + log.Info("Known attributes for bootstrapping the cluster", "attributes", g.Status.Attributes) + var sortedNodes []string + for _, pod := range pods { + if cidFound, cid := getGaleraContainerID(&pod); cidFound { + attr, attrFound := g.Status.Attributes[pod.Name] + if attrFound && attr.ContainerID == cid { + sortedNodes = append(sortedNodes, pod.Name) + } + } + } + sort.Strings(sortedNodes) bestnode := "" bestseqno := -1 - for _, node := range sortednodes { + for _, node := range sortedNodes { // On clean shutdown, galera sets the last // stopped node as 'safe to bootstrap', so use // this hint when we can @@ -131,7 +139,7 @@ func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) { } // if we pass here, a candidate is only valid if we // inspected all the expected replicas (e.g. 
typically 3) - if len(g.Status.Attributes) != int(*g.Spec.Replicas) { + if len(sortedNodes) != int(*g.Spec.Replicas) { return "", false } return bestnode, true //"galera-0" @@ -186,15 +194,42 @@ func getReadyPods(pods []corev1.Pod) (ret []corev1.Pod) { return } -// getRunningPodsMissingAttributes returns all the pods for which the operator +// getPodsNotUsingNewReportScript returns the pods which are not yet +// using the new galera state report script (pre-push behaviour ONLY) +func getPodsNotUsingNewReportScript(pods []corev1.Pod) (ret []corev1.Pod) { + for _, pod := range pods { + cmIdx := slices.IndexFunc(pod.Spec.Volumes, func(v corev1.Volume) bool { + return v.Name == "operator-scripts" + }) + cm := *pod.Spec.Volumes[cmIdx].ConfigMap + scriptIdx := slices.IndexFunc(cm.Items, func(k corev1.KeyToPath) bool { + return k.Key == "report_local_galera_state.py" + }) + if scriptIdx == -1 { + ret = append(ret, pod) + } + } + return +} + +// getWaitingPodsMissingAttributes returns all the pods for which the operator // has no seqno information, and which are ready for being inspected. // Note: a pod is considered 'ready for inspection' when its main container is // started and its inner process is currently waiting for a gcomm URI // (i.e. 
it is not running mysqld) -func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { +func getWaitingPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { for _, pod := range pods { if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) { _, attrFound := instance.Status.Attributes[pod.Name] + if attrFound { + cidFound, cid := getGaleraContainerID(&pod) + if !cidFound || cid != instance.Status.Attributes[pod.Name].ContainerID { + // The info we have for this pod came from an old container, + // so drop it and mark this pod as missing attributes + clearPodAttributes(ctx, instance, pod.Name) + attrFound = false + } + } if !attrFound && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) { ret = append(ret, pod) } @@ -203,12 +238,28 @@ func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, ins return } -// getRunningPodsMissingGcomm returns all the pods which are not running galera +// retrieveAttributesForPods fetches galera database status from pods which are +// not yet using the new galera state report script (pre-push behaviour ONLY) +func retrieveAttributesForPods(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) { + for _, pod := range pods { + name := pod.Name + util.LogForObject(h, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance) + warn, err := retrieveSequenceNumber(ctx, h, config, instance, &pod) + if len(warn) > 0 { + util.LogForObject(h, fmt.Sprintf("Warning: %q", warn), instance) + } + if err != nil { + util.LogForObject(h, fmt.Sprintf("Failed to retrieve seqno: %q", err), instance) + } + } +} + +// getPodsWaitingForGcomm returns all the pods which are not running galera // yet but are ready to join the cluster. 
-// Note: a pod is considered 'ready to join' when its main container is +// NOTE: a pod is considered 'ready to join' when its main container is // started and its inner process is currently waiting for a gcomm URI // (i.e. it is not running mysqld) -func getRunningPodsMissingGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { +func getPodsWaitingForGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { for _, pod := range pods { if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) { @@ -259,10 +310,18 @@ func injectGcommURI(ctx context.Context, h *helper.Helper, config *rest.Config, func(_ *bytes.Buffer, _ *bytes.Buffer) error { attr := instance.Status.Attributes[pod.Name] attr.Gcomm = uri - attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID instance.Status.Attributes[pod.Name] = attr return nil }) + // If we could not push the file in the pod, this might be due + // to a transient connection error. Return an error, so that + // this reconcile event gets reprocessed, to give us a chance + // to re-inject the URI into the pod. + // + // NOTE: if the error was due to a pod being restarted, the next + // processing of this reconcile event will automatically clean up + // the outdated attributes and wait for the newly restarted pod + // to publish up-to-date state before injecting any URI. 
return err } @@ -279,6 +338,8 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config * if stderr.Len() > 0 { errStr = strings.Split(strings.TrimSuffix(stderr.String(), "\n"), "\n") } + // Remember the container from which the probe was extracted + attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID instance.Status.Attributes[pod.Name] = attr return nil }) @@ -286,8 +347,10 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config * } // clearPodAttributes clears information known by the operator about a pod -func clearPodAttributes(instance *mariadbv1.Galera, podName string) { +func clearPodAttributes(ctx context.Context, instance *mariadbv1.Galera, podName string) { delete(instance.Status.Attributes, podName) + log := GetLog(ctx, "galera") + log.Info("Clear tracked attributes for pod", "pod", podName, "instance", instance) // If the pod was deemed safeToBootstrap, this state has to be reassessed if instance.Status.SafeToBootstrap == podName { instance.Status.SafeToBootstrap = "" @@ -297,7 +360,6 @@ func clearPodAttributes(instance *mariadbv1.Galera, podName string) { // clearOldPodsAttributesOnScaleDown removes known information from old pods // that no longer exist after a scale down of the galera CR func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1.Galera) { - log := GetLog(ctx, "galera") replicas := int(*instance.Spec.Replicas) // a pod's name is built as 'statefulsetname-n' @@ -305,32 +367,7 @@ func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1. parts := strings.Split(node, "-") index, _ := strconv.Atoi(parts[len(parts)-1]) if index >= replicas { - clearPodAttributes(instance, node) - log.Info("Remove old pod from status after scale-down", "instance", instance, "pod", node) - } - } -} - -// assertPodsAttributesValidity compares the current state of the pods that are starting galera -// against their known state in the CR's attributes. 
If a pod's attributes don't match its actual -// state (i.e. it failed to start galera), the attributes are cleared from the CR's status -func assertPodsAttributesValidity(helper *helper.Helper, instance *mariadbv1.Galera, pods []corev1.Pod) { - for _, pod := range pods { - _, found := instance.Status.Attributes[pod.Name] - if !found { - continue - } - // A node can have various attributes depending on its known state. - // A ContainerID attribute is only present if the node is being started. - attrCID := instance.Status.Attributes[pod.Name].ContainerID - containerFound, podCID := getGaleraContainerID(&pod) - if !containerFound || (attrCID != "" && attrCID != podCID) { - // This gcomm URI was pushed in a pod which was restarted - // before the attribute got cleared, which means the pod - // failed to start galera. Clear the attribute here, and - // reprobe the pod's state in the next reconcile loop - clearPodAttributes(instance, pod.Name) - util.LogForObject(helper, "Pod restarted while galera was starting", instance, "pod", pod.Name, "recorded ID", attrCID) + clearPodAttributes(ctx, instance, node) } } } @@ -502,6 +539,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res Resources: []string{"galeras"}, Verbs: []string{"get", "list"}, }, + { + APIGroups: []string{"mariadb.openstack.org"}, + Resources: []string{"galeras/status"}, + Verbs: []string{"get", "list", "update", "patch"}, + }, { APIGroups: []string{"mariadb.openstack.org"}, Resources: []string{"mariadbaccounts"}, @@ -867,8 +909,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return sfres, sferr } - // util.LogForObject(helper, fmt.Sprintf("DAM BEFORE %v - AFTER %v", helper.GetBefore(), helper.GetAfter()), instance) - statefulset := commonstatefulset.GetStatefulSet() // If a full cluster restart was requested, @@ -892,6 +932,12 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err 
} + // NOTE: Pre-push behaviour ONLY + // If we're doing a rolling update to the new way of probing galera, + // there might still be old pods whose probing must be managed by the operator. + // keep track of those pods for later + oldPods := getPodsNotUsingNewReportScript(podList.Items) + oldWaitingPods := getWaitingPodsMissingAttributes(ctx, oldPods, instance, helper, r.config) // // Reconstruct the state of the galera resource based on the replicaset and its pods // @@ -901,9 +947,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res clearOldPodsAttributesOnScaleDown(ctx, instance) } - // Ensure that all the ongoing galera start actions are still running - assertPodsAttributesValidity(helper, instance, podList.Items) - // Note: // . A pod is available in the statefulset if the pod's readiness // probe returns true (i.e. galera is running in the pod and clustered) @@ -920,20 +963,14 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res name := pod.Name if _, found := instance.Status.Attributes[name]; found { log.Info("Galera started", "pod", name) - clearPodAttributes(instance, name) + clearPodAttributes(ctx, instance, name) } } - runningPods := getRunningPodsMissingGcomm(ctx, podList.Items, instance, helper, r.config) - // Special case for 1-node deployment: if the statefulset reports 1 node is available - // but the pod shows up in runningPods (i.e. NotReady), do not consider it a joiner. - // Wait for the two statuses to re-sync after another k8s probe is run. - if *instance.Spec.Replicas == 1 && len(runningPods) == 1 { - log.Info("Galera node no longer running. Requeuing") - return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil - } - - // The other 'Running' pods can join the existing cluster. + // The remaining pods will become joiner nodes. Wait until they have + // reported their status to the operator, and then configure them + // as joiners. 
+ runningPods := getPodsWaitingForGcomm(ctx, podList.Items, instance, helper, r.config) for _, pod := range runningPods { name := pod.Name joinerURI := buildGcommURI(instance) @@ -942,51 +979,33 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res err := injectGcommURI(ctx, helper, r.config, instance, &pod, joinerURI) if err != nil { log.Error(err, "Failed to push gcomm URI", "pod", name) - // A failed injection likely means the pod's status has changed. - // drop it from status and reprobe it in another reconcile loop - clearPodAttributes(instance, name) + // Force an error here to retry this reconcile, and try + // another injection (see details in injectGcommURI()) return ctrl.Result{}, err } } } - // If the cluster is not running, probe the available pods for seqno - // to determine the bootstrap node. + // If the cluster is not running, wait until pods advertise their state + // in the Galera CR, and determine which node to bootstrap from // Note: // . a pod whose phase is Running but who is not Ready hasn't started // galera, it is waiting for the operator's instructions. - // We can record its galera's seqno in our status. - // . any other status means the the pod is starting/restarting. We can't - // exec into the pod yet, so we will probe it in another reconcile loop. 
if !instance.Status.Bootstrapped && !isBootstrapInProgress(instance) { var node string found := false - for _, pod := range getRunningPodsMissingAttributes(ctx, podList.Items, instance, helper, r.config) { - name := pod.Name - util.LogForObject(helper, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance) - warn, err := retrieveSequenceNumber(ctx, helper, r.config, instance, &pod) - if len(warn) > 0 { - util.LogForObject(helper, fmt.Sprintf("Warning: %q", warn), instance) - } - if err != nil { - log.Error(err, fmt.Sprintf("Failed to retrieve seqno for %s", name)) - return ctrl.Result{}, err - } - log.Info(fmt.Sprintf("Attributes retrieved for %s", name), - "UUID", instance.Status.Attributes[name].UUID, - "Seqno", instance.Status.Attributes[name].Seqno, - "SafeToBootstrap", instance.Status.Attributes[name].SafeToBootstrap, - ) - if instance.Status.Attributes[name].SafeToBootstrap { - node = name - found = true - break - } + + // NOTE: Pre-push behaviour ONLY + // If old pods are still running, the operator must probe their status. + if len(oldWaitingPods) > 0 { + retrieveAttributesForPods(ctx, oldWaitingPods, instance, helper, r.config) } // Check if we have enough info to bootstrap the cluster now - if !found && int(*instance.Spec.Replicas) > 0 { - node, found = findBestCandidate(instance) + // (either one of the nodes reported itself as `safe_to_bootstrap` + // or all the nodes have reported their state and we'll determine one candidate) + if int(*instance.Spec.Replicas) > 0 { + node, found = findBestCandidate(instance, podList.Items, log) } if found { pod := getPodFromName(podList.Items, node) @@ -995,22 +1014,25 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res err := injectGcommURI(ctx, helper, r.config, instance, pod, "gcomm://") if err != nil { log.Error(err, "Failed to push gcomm URI", "pod", node) - // A failed injection likely means the pod's status has changed. 
- // drop it from status and reprobe it in another reconcile loop - clearPodAttributes(instance, node) + // Force an error here to retry this reconcile, and try + // another injection (see details in injectGcommURI()) return ctrl.Result{}, err } } } - // The statefulset usually instantiates the pods instantly, and the galera - // operator doesn't receive individual events for pod's phase transition or - // readiness, as it is not controlling the pods (the statefulset is). - // So until all pods become available, we have to requeue this event to get - // a chance to react to all pod's transitions. + // Stop here if we don't have all galera pods running yet. Another + // reconciliation will be triggered if either: + // - a pod (re)starts and pushes its local state to the galera CR + // - a pod becomes ready, which bumps the AvailableReplicas in the StatefulSet if statefulset.Status.AvailableReplicas != statefulset.Status.Replicas { - log.Info("Requeuing until all replicas are available") - return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil + // NOTE: Pre-push behaviour ONLY + // If old pods are still running, requeue until all pods are available + if len(oldPods) > 0 { + log.Info("Requeuing until all replicas are available") + return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil + } + return ctrl.Result{}, nil } // We reached the end of the Reconcile, update the Ready condition based on diff --git a/internal/mariadb/volumes.go b/internal/mariadb/volumes.go index 2e9bb53e..6dd08275 100644 --- a/internal/mariadb/volumes.go +++ b/internal/mariadb/volumes.go @@ -96,6 +96,10 @@ func getGaleraVolumes(g *mariadbv1.Galera) []corev1.Volume { Key: "detect_last_commit.sh", Path: "detect_last_commit.sh", }, + { + Key: "report_local_galera_state.py", + Path: "report_local_galera_state.py", + }, { Key: "detect_gcomm_and_start.sh", Path: "detect_gcomm_and_start.sh", diff --git a/templates/galera/bin/detect_gcomm_and_start.sh 
b/templates/galera/bin/detect_gcomm_and_start.sh index 0c7c337b..a73d8e2d 100755 --- a/templates/galera/bin/detect_gcomm_and_start.sh +++ b/templates/galera/bin/detect_gcomm_and_start.sh @@ -5,7 +5,12 @@ set -eu # OSPRH-27031: Conditional sourcing for backwards compatibility with old pods # where script is updated but mysql_root_auth.sh is not yet available if [ -f /var/lib/operator-scripts/mysql_root_auth.sh ]; then - source /var/lib/operator-scripts/mysql_root_auth.sh + # When this container starts, retrieve up-to-date DB credentials + # before starting the local mysql server: + # - disable password check, as when this script runs, + # the mysql server has not started yet. + # - discard warning logged while fetching new credentials + MYSQL_ROOT_AUTH_BYPASS_CHECKS=true source /var/lib/operator-scripts/mysql_root_auth.sh 2>/dev/null else # Old pod restart scenario: script updated but mysql_root_auth.sh not available if [ -z "${DB_ROOT_PASSWORD}" ]; then @@ -19,6 +24,19 @@ URI_FILE=/var/lib/mysql/gcomm_uri rm -f /var/lib/mysql/mysql.sock rm -f $URI_FILE +# Discover the state of the local galera database and report it +# back to the galera CR so the mariadb-operator can decide how +# to start mysqld in this container. +if [ -f /var/lib/operator-scripts/report_local_galera_state.py ]; then + LOGFILE=$(my_print_defaults mysqld | grep log-error | cut -d= -f2) + if [ -f "${LOGFILE}" ]; then + REPORT_ARGS="--file ${LOGFILE}" + else + REPORT_ARGS="" + fi + python3 /var/lib/operator-scripts/report_local_galera_state.py $REPORT_ARGS --push +fi + echo "Waiting for gcomm URI to be configured for this POD" while [ ! 
-f $URI_FILE ]; do sleep 2 diff --git a/templates/galera/bin/detect_last_commit.sh b/templates/galera/bin/detect_last_commit.sh index 58af23c5..079d35b9 100755 --- a/templates/galera/bin/detect_last_commit.sh +++ b/templates/galera/bin/detect_last_commit.sh @@ -5,7 +5,12 @@ set -eu # OSPRH-27031: Conditional sourcing for backwards compatibility with old pods # where script is updated but mysql_root_auth.sh is not yet available if [ -f /var/lib/operator-scripts/mysql_root_auth.sh ]; then - source /var/lib/operator-scripts/mysql_root_auth.sh + # Make sure we are using up-to-date DB credentials for inspecting + # the state of the local mysql: + # - disable password check, as when this script runs, + # the mysql server has not started yet. + # - discard warning logged while fetching new credentials + MYSQL_ROOT_AUTH_BYPASS_CHECKS=true source /var/lib/operator-scripts/mysql_root_auth.sh 2>/dev/null else export MYSQL_PWD="${DB_ROOT_PASSWORD}" fi @@ -38,6 +43,11 @@ function json_summary { trap json_summary EXIT +if pgrep mysqld >/dev/null; then + echo "Cannot run this script while mysqld is running! Aborting" >&2 + exit 2 +fi + # codership/galera#354 # Some ungraceful shutdowns can leave an empty gvwstate.dat on # disk. 
This will prevent galera to join the cluster if it is diff --git a/templates/galera/bin/report_local_galera_state.py b/templates/galera/bin/report_local_galera_state.py new file mode 100755 index 00000000..32388b3d --- /dev/null +++ b/templates/galera/bin/report_local_galera_state.py @@ -0,0 +1,170 @@ +#!/usr/bin/python3 +# -*- python -*- + +import argparse +import json +import logging +import os +import re +import socket +import ssl +import subprocess +import sys +from time import sleep +from urllib.error import HTTPError +from urllib.request import Request, urlopen + +LOG_FORMAT = "%(asctime)s - %(filename)s - %(levelname)s: %(message)s" + + +class APIServer: + """REST call to the k8s API server and automatic json (un)marshalling""" + + def __init__(self, namespace: str, token: str, cert: str, timeout: int = 10, + api_server: str = "https://kubernetes.default.svc"): + self.api_server = api_server + self.namespace = namespace + self.token = token + self.cert = cert + self.timeout = timeout + + def get(self, url: str): + context = ssl.create_default_context(cafile=self.cert) + req = Request(self.api_server + url) + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", "Bearer " + self.token) + res = urlopen(req, timeout=self.timeout, context=context) + if res.status == 200: + content = res.read() + data = json.loads(content) + return data + else: + raise Exception("get error") + + def put(self, url: str, data: str): + jsondata = json.dumps(data).encode() + context = ssl.create_default_context(cafile=self.cert) + req = Request(self.api_server + url, data=jsondata, method='PUT') + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", "Bearer " + self.token) + res = urlopen(req, timeout=self.timeout, context=context) + return res + + +def retry(times, backoff=1.3): + """Primitive retry decorator with exponential backoff""" + + def decorator(func): + def wrapped_func(*args, **kwargs): + attempt = 0 + 
sleep_time = 1 + last_error = None + while attempt < times: + try: + return func(*args, **kwargs) + except HTTPError as e: + last_error = e + if e.code == 403: # forbidden + raise Exception("API access no longer authorized") + elif e.code == 409: # conflict + log.warning(f"object changed, retrying {func.__name__}()") + except KeyError as e: + last_error = e + log.warning(f"expected input not present ({e}), retrying {func.__name__}()") + # pass here if there was an caught exception + sleep(sleep_time) + sleep_time *= backoff + attempt += 1 + raise Exception(f"Error after retrying {func.__name__} {times} times: {last_error}") + return wrapped_func + return decorator + + +@retry(10) +def get_cid(api: APIServer, namespace: str, pod_name: str) -> str: + pod: dict = api.get(f"/api/v1/namespaces/{namespace}/pods/{pod_name}") + if 'status' in pod and 'containerStatuses' in pod['status']: + for cstatus in pod['status']['containerStatuses']: + if cstatus['name'] == 'galera': + # status of container should report 'running' for the containerID + # key to be present in the returned status, otherwise a retry is attempted + return cstatus['containerID'] + raise KeyError("ContainerID not found for 'galera'") + + +def inspect_galera_state(): + status = subprocess.run(['bash', '/var/lib/operator-scripts/detect_last_commit.sh'], capture_output=True) + if status.returncode == 2: + raise Exception("mysqld seems to be running, cannot inspect local state") + assert status.returncode == 0 + detected = json.loads(status.stdout) + return detected + + +@retry(10) +def push_state(api, namespace, galera_name, pod_name, detected): + galera_cr = api.get(f"/apis/mariadb.openstack.org/v1beta1/namespaces/{namespace}/galeras/{galera_name}/status") + attributes = galera_cr['status'].get('attributes', {}) + attributes[pod_name] = detected + galera_cr['status']['attributes'] = attributes + api.put(f"/apis/mariadb.openstack.org/v1beta1/namespaces/{namespace}/galeras/{galera_name}/status", galera_cr) + + 
+def report_state(push: bool): + pod_name = socket.gethostname() + galera_name = re.sub(r'-galera-[0-9]*', '', pod_name) + + svc_account = "/var/run/secrets/kubernetes.io/serviceaccount" + with open(os.path.join(svc_account, "namespace")) as f: + namespace = f.read() + with open(os.path.join(svc_account, "token")) as f: + token = f.read() + cert = os.path.join(svc_account, "ca.crt") + + api = APIServer(namespace, token, cert) + + try: + log.info("Retrieving the ID of the container we are running in") + cid = get_cid(api, namespace, pod_name) + log.info(f"found: {cid}") + + log.info(f"Extracting the Galera state from the local mariadb database on {pod_name}") + detected = inspect_galera_state() + log.info(f"found: {detected}") + detected['containerID'] = cid + + if push: + log.info(f"Saving the extracted Galera state into the status of Galera CR {galera_name} - {detected}") + push_state(api, namespace, galera_name, pod_name, detected) + else: + log.info(f"Local galera state detected: {detected}") + + except Exception as e: + log.error(f"Unexpected error while inspecting local galera state: {type(e).__name__}: {e}") + sys.exit(1) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Report the state of the local mysql database") + parser.add_argument("-p", "--push", + help="Push the computed state to the linked galera CR", + action=argparse.BooleanOptionalAction) + parser.add_argument("-f", "--file", + help="Also log output of this script to file", + ) + opts = parser.parse_args() + + log = logging.getLogger(sys.argv[0]) + log.setLevel(logging.DEBUG) + formatter = logging.Formatter(LOG_FORMAT) + + output = logging.StreamHandler(sys.stdout) + output.setFormatter(formatter) + log.addHandler(output) + + if opts.file: + file_output = logging.FileHandler(opts.file, mode='a') + file_output.setFormatter(formatter) + log.addHandler(file_output) + + report_state(opts.push)