diff --git a/go.mod b/go.mod index 79d59c7f..d609994d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20260515134210-2e2a0d06648c github.com/openstack-k8s-operators/mariadb-operator/api v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.28.0 - golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 k8s.io/api v0.31.14 k8s.io/apimachinery v0.31.14 k8s.io/client-go v0.31.14 @@ -82,6 +81,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect diff --git a/internal/controller/galera_controller.go b/internal/controller/galera_controller.go index 199e56d9..3b8bbef2 100644 --- a/internal/controller/galera_controller.go +++ b/internal/controller/galera_controller.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "sort" "strconv" "strings" @@ -51,8 +52,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/kubectl/pkg/util/podutils" - "golang.org/x/exp/maps" - "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -110,12 +109,21 @@ func GetLog(ctx context.Context, controller string) logr.Logger { // // findBestCandidate returns the node with the lowest seqno -func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) { - sortednodes := maps.Keys(g.Status.Attributes) - sort.Strings(sortednodes) +func findBestCandidate(g *mariadbv1.Galera, pods []corev1.Pod, log logr.Logger) (node string, found bool) { + log.Info("Known attributes for bootstrapping the cluster", "attributes", g.Status.Attributes) + var sortedNodes []string + for _, pod := range pods { + if cidFound, cid := getGaleraContainerID(&pod); cidFound { + attr, attrFound := g.Status.Attributes[pod.Name] + if attrFound && attr.ContainerID == cid { + sortedNodes = append(sortedNodes, pod.Name) + } + } + } + sort.Strings(sortedNodes) bestnode := "" bestseqno := -1 - for _, node := range sortednodes { + for _, node := range sortedNodes { // On clean shutdown, galera sets the last // stopped node as 'safe to bootstrap', so use // this hint when we can @@ -131,7 +139,7 @@ func findBestCandidate(g *mariadbv1.Galera) (node string, found bool) { } // if we pass here, a candidate is only valid if we // inspected all the expected replicas (e.g. typically 3) - if len(g.Status.Attributes) != int(*g.Spec.Replicas) { + if len(sortedNodes) != int(*g.Spec.Replicas) { return "", false } return bestnode, true //"galera-0" @@ -186,15 +194,42 @@ func getReadyPods(pods []corev1.Pod) (ret []corev1.Pod) { return } -// getRunningPodsMissingAttributes returns all the pods for which the operator +// getWaitingPodsNotUsingNewReportScript filters out pods which are not yet +// using the new galera state report script (pre-push behaviour ONLY) +func getPodsNotUsingNewReportScript(pods []corev1.Pod) (ret []corev1.Pod) { + for _, pod := range pods { + cmIdx := slices.IndexFunc(pod.Spec.Volumes, func(v corev1.Volume) bool { + return v.Name == "operator-scripts" + }) + cm := *pod.Spec.Volumes[cmIdx].ConfigMap + scriptIdx := slices.IndexFunc(cm.Items, func(k corev1.KeyToPath) bool { + return k.Key == "report_local_galera_state.py" + }) + if scriptIdx == -1 { + ret = append(ret, pod) + } + } + return +} + +// getWaitingPodsMissingAttributes returns all the pods for which the operator // has no seqno information, and which are ready for being inspected. // Note: a pod is considered 'ready for inspection' when its main container is // started and its inner process is currently waiting for a gcomm URI // (i.e. it is not running mysqld) -func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { +func getWaitingPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { for _, pod := range pods { if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) { _, attrFound := instance.Status.Attributes[pod.Name] + if attrFound { + cidFound, cid := getGaleraContainerID(&pod) + if !cidFound || cid != instance.Status.Attributes[pod.Name].ContainerID { + // The info we have for this pod came from an old container, + // so drop it and mark this pod as missing attributes + clearPodAttributes(ctx, instance, pod.Name) + attrFound = false + } + } if !attrFound && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) { ret = append(ret, pod) } @@ -203,12 +238,28 @@ func getRunningPodsMissingAttributes(ctx context.Context, pods []corev1.Pod, ins return } -// getRunningPodsMissingGcomm returns all the pods which are not running galera +// retrieveAttributesForPods fetches galera database status from pods which are not +// not yet using the new galera state report script (pre-push behaviour ONLY) +func retrieveAttributesForPods(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) { + for _, pod := range pods { + name := pod.Name + util.LogForObject(h, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance) + warn, err := retrieveSequenceNumber(ctx, h, config, instance, &pod) + if len(warn) > 0 { + util.LogForObject(h, fmt.Sprintf("Warning: %q", warn), instance) + } + if err != nil { + util.LogForObject(h, fmt.Sprintf("Failed to retrieve seqno: %q", err), instance) + } + } +} + +// getPodsWaitingForGcomm returns all the pods which are not running galera // yet but are ready to join the cluster. -// Note: a pod is considered 'ready to join' when its main container is +// NOTE: a pod is considered 'ready to join' when its main container is // started and its inner process is currently waiting for a gcomm URI // (i.e. it is not running mysqld) -func getRunningPodsMissingGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { +func getPodsWaitingForGcomm(ctx context.Context, pods []corev1.Pod, instance *mariadbv1.Galera, h *helper.Helper, config *rest.Config) (ret []corev1.Pod) { for _, pod := range pods { if pod.Status.Phase == corev1.PodRunning && !podutils.IsPodReady(&pod) && isGaleraContainerStartedAndWaiting(ctx, &pod, instance, h, config) { @@ -259,10 +310,18 @@ func injectGcommURI(ctx context.Context, h *helper.Helper, config *rest.Config, func(_ *bytes.Buffer, _ *bytes.Buffer) error { attr := instance.Status.Attributes[pod.Name] attr.Gcomm = uri - attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID instance.Status.Attributes[pod.Name] = attr return nil }) + // If we could not push the file in the pod, this might be due + // to a transient connection error. Return an error, so that + // this reconcile event gets reprocessed, to give us a chance + // to re-inject the URI into the pod. + // + // NOTE: if the error was due to a pod being restarted, the next + // processing of this reconcile event will automatically clean up + // the outdated attributes and wait for the newly restarted pod + // to publish up-to-date state before injecting any URI. return err } @@ -279,6 +338,8 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config * if stderr.Len() > 0 { errStr = strings.Split(strings.TrimSuffix(stderr.String(), "\n"), "\n") } + // Remember the container from which the probe was extracted + attr.ContainerID = pod.Status.ContainerStatuses[0].ContainerID instance.Status.Attributes[pod.Name] = attr return nil }) @@ -286,8 +347,10 @@ func retrieveSequenceNumber(ctx context.Context, helper *helper.Helper, config * } // clearPodAttributes clears information known by the operator about a pod -func clearPodAttributes(instance *mariadbv1.Galera, podName string) { +func clearPodAttributes(ctx context.Context, instance *mariadbv1.Galera, podName string) { delete(instance.Status.Attributes, podName) + log := GetLog(ctx, "galera") + log.Info("Clear tracked attributes for pod", "pod", podName, "instance", instance) // If the pod was deemed safeToBootstrap, this state has to be reassessed if instance.Status.SafeToBootstrap == podName { instance.Status.SafeToBootstrap = "" @@ -297,7 +360,6 @@ func clearPodAttributes(instance *mariadbv1.Galera, podName string) { // clearOldPodsAttributesOnScaleDown removes known information from old pods // that no longer exist after a scale down of the galera CR func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1.Galera) { - log := GetLog(ctx, "galera") replicas := int(*instance.Spec.Replicas) // a pod's name is built as 'statefulsetname-n' @@ -305,32 +367,7 @@ func clearOldPodsAttributesOnScaleDown(ctx context.Context, instance *mariadbv1. parts := strings.Split(node, "-") index, _ := strconv.Atoi(parts[len(parts)-1]) if index >= replicas { - clearPodAttributes(instance, node) - log.Info("Remove old pod from status after scale-down", "instance", instance, "pod", node) - } - } -} - -// assertPodsAttributesValidity compares the current state of the pods that are starting galera -// against their known state in the CR's attributes. If a pod's attributes don't match its actual -// state (i.e. it failed to start galera), the attributes are cleared from the CR's status -func assertPodsAttributesValidity(helper *helper.Helper, instance *mariadbv1.Galera, pods []corev1.Pod) { - for _, pod := range pods { - _, found := instance.Status.Attributes[pod.Name] - if !found { - continue - } - // A node can have various attributes depending on its known state. - // A ContainerID attribute is only present if the node is being started. - attrCID := instance.Status.Attributes[pod.Name].ContainerID - containerFound, podCID := getGaleraContainerID(&pod) - if !containerFound || (attrCID != "" && attrCID != podCID) { - // This gcomm URI was pushed in a pod which was restarted - // before the attribute got cleared, which means the pod - // failed to start galera. Clear the attribute here, and - // reprobe the pod's state in the next reconcile loop - clearPodAttributes(instance, pod.Name) - util.LogForObject(helper, "Pod restarted while galera was starting", instance, "pod", pod.Name, "recorded ID", attrCID) + clearPodAttributes(ctx, instance, node) } } } @@ -502,6 +539,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res Resources: []string{"galeras"}, Verbs: []string{"get", "list"}, }, + { + APIGroups: []string{"mariadb.openstack.org"}, + Resources: []string{"galeras/status"}, + Verbs: []string{"get", "list", "update", "patch"}, + }, { APIGroups: []string{"mariadb.openstack.org"}, Resources: []string{"mariadbaccounts"}, @@ -867,8 +909,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return sfres, sferr } - // util.LogForObject(helper, fmt.Sprintf("DAM BEFORE %v - AFTER %v", helper.GetBefore(), helper.GetAfter()), instance) - statefulset := commonstatefulset.GetStatefulSet() // If a full cluster restart was requested, @@ -892,6 +932,12 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err } + // NOTE: Pre-push behaviour ONLY + // If we're doing a rolling update to the new way of probing galera, + // there might still be old pods whose probing must be managed by the operator. + // keep track of those pods for later + oldPods := getPodsNotUsingNewReportScript(podList.Items) + oldWaitingPods := getWaitingPodsMissingAttributes(ctx, oldPods, instance, helper, r.config) // // Reconstruct the state of the galera resource based on the replicaset and its pods // @@ -901,9 +947,6 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res clearOldPodsAttributesOnScaleDown(ctx, instance) } - // Ensure that all the ongoing galera start actions are still running - assertPodsAttributesValidity(helper, instance, podList.Items) - // Note: // . A pod is available in the statefulset if the pod's readiness // probe returns true (i.e. galera is running in the pod and clustered) @@ -920,20 +963,14 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res name := pod.Name if _, found := instance.Status.Attributes[name]; found { log.Info("Galera started", "pod", name) - clearPodAttributes(instance, name) + clearPodAttributes(ctx, instance, name) } } - runningPods := getRunningPodsMissingGcomm(ctx, podList.Items, instance, helper, r.config) - // Special case for 1-node deployment: if the statefulset reports 1 node is available - // but the pod shows up in runningPods (i.e. NotReady), do not consider it a joiner. - // Wait for the two statuses to re-sync after another k8s probe is run. - if *instance.Spec.Replicas == 1 && len(runningPods) == 1 { - log.Info("Galera node no longer running. Requeuing") - return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil - } - - // The other 'Running' pods can join the existing cluster. + // The remaining pods will become joiner nodes. Wait until they have + // reported their status to the operator, and then configure them + // as joiners. + runningPods := getPodsWaitingForGcomm(ctx, podList.Items, instance, helper, r.config) for _, pod := range runningPods { name := pod.Name joinerURI := buildGcommURI(instance) @@ -942,51 +979,33 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res err := injectGcommURI(ctx, helper, r.config, instance, &pod, joinerURI) if err != nil { log.Error(err, "Failed to push gcomm URI", "pod", name) - // A failed injection likely means the pod's status has changed. - // drop it from status and reprobe it in another reconcile loop - clearPodAttributes(instance, name) + // Force an error here to retry this reconcile, and try + // another injection (see details in injectGcommURI()) return ctrl.Result{}, err } } } - // If the cluster is not running, probe the available pods for seqno - // to determine the bootstrap node. + // If the cluster is not running, wait until pods advertise their state + // in the Galera CR, and determine which node to bootstrap from // Note: // . a pod whose phase is Running but who is not Ready hasn't started // galera, it is waiting for the operator's instructions. - // We can record its galera's seqno in our status. - // . any other status means the the pod is starting/restarting. We can't - // exec into the pod yet, so we will probe it in another reconcile loop. if !instance.Status.Bootstrapped && !isBootstrapInProgress(instance) { var node string found := false - for _, pod := range getRunningPodsMissingAttributes(ctx, podList.Items, instance, helper, r.config) { - name := pod.Name - util.LogForObject(helper, fmt.Sprintf("Pod %s running, retrieve seqno", name), instance) - warn, err := retrieveSequenceNumber(ctx, helper, r.config, instance, &pod) - if len(warn) > 0 { - util.LogForObject(helper, fmt.Sprintf("Warning: %q", warn), instance) - } - if err != nil { - log.Error(err, fmt.Sprintf("Failed to retrieve seqno for %s", name)) - return ctrl.Result{}, err - } - log.Info(fmt.Sprintf("Attributes retrieved for %s", name), - "UUID", instance.Status.Attributes[name].UUID, - "Seqno", instance.Status.Attributes[name].Seqno, - "SafeToBootstrap", instance.Status.Attributes[name].SafeToBootstrap, - ) - if instance.Status.Attributes[name].SafeToBootstrap { - node = name - found = true - break - } + + // NOTE: Pre-push behaviour ONLY + // If old pods are still running, the operator must probe their status. + if len(oldWaitingPods) > 0 { + retrieveAttributesForPods(ctx, oldWaitingPods, instance, helper, r.config) } // Check if we have enough info to bootstrap the cluster now - if !found && int(*instance.Spec.Replicas) > 0 { - node, found = findBestCandidate(instance) + // (either one of the node reported itself as `safe_to_bootstrap` + // or all the nodes have reported their state and we'll determine one candidate) + if int(*instance.Spec.Replicas) > 0 { + node, found = findBestCandidate(instance, podList.Items, log) } if found { pod := getPodFromName(podList.Items, node) @@ -995,22 +1014,25 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res err := injectGcommURI(ctx, helper, r.config, instance, pod, "gcomm://") if err != nil { log.Error(err, "Failed to push gcomm URI", "pod", node) - // A failed injection likely means the pod's status has changed. - // drop it from status and reprobe it in another reconcile loop - clearPodAttributes(instance, node) + // Force an error here to retry this reconcile, and try + // another injection (see details in injectGcommURI()) return ctrl.Result{}, err } } } - // The statefulset usually instantiates the pods instantly, and the galera - // operator doesn't receive individual events for pod's phase transition or - // readiness, as it is not controlling the pods (the statefulset is). - // So until all pods become available, we have to requeue this event to get - // a chance to react to all pod's transitions. + // Stop here if we don't have all galera pods running yet. Another + // reconciliation will be triggered if either: + // - a pod (re)starts and push its local state to the galera CR + // - A pod becomes ready, which bumps the AvailableReplicas in the StatefulSet if statefulset.Status.AvailableReplicas != statefulset.Status.Replicas { - log.Info("Requeuing until all replicas are available") - return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil + // NOTE: Pre-push behaviour ONLY + // If old pods are still running, requeue until all pods are available + if len(oldPods) > 0 { + log.Info("Requeuing until all replicas are available") + return ctrl.Result{RequeueAfter: time.Duration(3) * time.Second}, nil + } + return ctrl.Result{}, nil } // We reached the end of the Reconcile, update the Ready condition based on diff --git a/internal/mariadb/volumes.go b/internal/mariadb/volumes.go index 2e9bb53e..6dd08275 100644 --- a/internal/mariadb/volumes.go +++ b/internal/mariadb/volumes.go @@ -96,6 +96,10 @@ func getGaleraVolumes(g *mariadbv1.Galera) []corev1.Volume { Key: "detect_last_commit.sh", Path: "detect_last_commit.sh", }, + { + Key: "report_local_galera_state.py", + Path: "report_local_galera_state.py", + }, { Key: "detect_gcomm_and_start.sh", Path: "detect_gcomm_and_start.sh", diff --git a/templates/galera/bin/detect_gcomm_and_start.sh b/templates/galera/bin/detect_gcomm_and_start.sh index 0c7c337b..a73d8e2d 100755 --- a/templates/galera/bin/detect_gcomm_and_start.sh +++ b/templates/galera/bin/detect_gcomm_and_start.sh @@ -5,7 +5,12 @@ set -eu # OSPRH-27031: Conditional sourcing for backwards compatibility with old pods # where script is updated but mysql_root_auth.sh is not yet available if [ -f /var/lib/operator-scripts/mysql_root_auth.sh ]; then - source /var/lib/operator-scripts/mysql_root_auth.sh + # When this container starts, retrieve up-to-date DB credentials + # before starting the local mysql server: + # - disable password check, as when this script runs, + # the mysql server has not started yet. + # - discard warning logged while fetching new credentials + MYSQL_ROOT_AUTH_BYPASS_CHECKS=true source /var/lib/operator-scripts/mysql_root_auth.sh 2>/dev/null else # Old pod restart scenario: script updated but mysql_root_auth.sh not available if [ -z "${DB_ROOT_PASSWORD}" ]; then @@ -19,6 +24,19 @@ URI_FILE=/var/lib/mysql/gcomm_uri rm -f /var/lib/mysql/mysql.sock rm -f $URI_FILE +# Discover the state of the local galera database and report it +# back to the galera CR so the mariadb-operator can decide how +# to start mysqld in this container. +if [ -f /var/lib/operator-scripts/report_local_galera_state.py ]; then + LOGFILE=$(my_print_defaults mysqld | grep log-error | cut -d= -f2) + if [ -f "${LOGFILE}" ]; then + REPORT_ARGS="--file ${LOGFILE}" + else + REPORT_ARGS="" + fi + python3 /var/lib/operator-scripts/report_local_galera_state.py $REPORT_ARGS --push +fi + echo "Waiting for gcomm URI to be configured for this POD" while [ ! -f $URI_FILE ]; do sleep 2 diff --git a/templates/galera/bin/detect_last_commit.sh b/templates/galera/bin/detect_last_commit.sh index 58af23c5..079d35b9 100755 --- a/templates/galera/bin/detect_last_commit.sh +++ b/templates/galera/bin/detect_last_commit.sh @@ -5,7 +5,12 @@ set -eu # OSPRH-27031: Conditional sourcing for backwards compatibility with old pods # where script is updated but mysql_root_auth.sh is not yet available if [ -f /var/lib/operator-scripts/mysql_root_auth.sh ]; then - source /var/lib/operator-scripts/mysql_root_auth.sh + # Make sure we are using up-to-date DB credentials for inspecting + # the state of the local mysql: + # - disable password check, as when this script runs, + # the mysql server has not started yet. + # - discard warning logged while fetching new credentials + MYSQL_ROOT_AUTH_BYPASS_CHECKS=true source /var/lib/operator-scripts/mysql_root_auth.sh 2>/dev/null else export MYSQL_PWD="${DB_ROOT_PASSWORD}" fi @@ -38,6 +43,11 @@ function json_summary { trap json_summary EXIT +if pgrep mysqld >/dev/null; then + echo "Cannot run this script while mysqld is running! Aborting" >&2 + exit 2 +fi + # codership/galera#354 # Some ungraceful shutdowns can leave an empty gvwstate.dat on # disk. This will prevent galera to join the cluster if it is diff --git a/templates/galera/bin/report_local_galera_state.py b/templates/galera/bin/report_local_galera_state.py new file mode 100755 index 00000000..32388b3d --- /dev/null +++ b/templates/galera/bin/report_local_galera_state.py @@ -0,0 +1,170 @@ +#!/usr/bin/python3 +# -*- python -*- + +import argparse +import json +import logging +import os +import re +import socket +import ssl +import subprocess +import sys +from time import sleep +from urllib.error import HTTPError +from urllib.request import Request, urlopen + +LOG_FORMAT = "%(asctime)s - %(filename)s - %(levelname)s: %(message)s" + + +class APIServer: + """REST call to the k8s API server and automatic json (un)marshalling""" + + def __init__(self, namespace: str, token: str, cert: str, timeout: int = 10, + api_server: str = "https://kubernetes.default.svc"): + self.api_server = api_server + self.namespace = namespace + self.token = token + self.cert = cert + self.timeout = timeout + + def get(self, url: str): + context = ssl.create_default_context(cafile=self.cert) + req = Request(self.api_server + url) + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", "Bearer " + self.token) + res = urlopen(req, timeout=self.timeout, context=context) + if res.status == 200: + content = res.read() + data = json.loads(content) + return data + else: + raise Exception("get error") + + def put(self, url: str, data: str): + jsondata = json.dumps(data).encode() + context = ssl.create_default_context(cafile=self.cert) + req = Request(self.api_server + url, data=jsondata, method='PUT') + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", "Bearer " + self.token) + res = urlopen(req, timeout=self.timeout, context=context) + return res + + +def retry(times, backoff=1.3): + """Primitive retry decorator with exponential backoff""" + + def decorator(func): + def wrapped_func(*args, **kwargs): + attempt = 0 + sleep_time = 1 + last_error = None + while attempt < times: + try: + return func(*args, **kwargs) + except HTTPError as e: + last_error = e + if e.code == 403: # forbidden + raise Exception("API access no longer authorized") + elif e.code == 409: # conflict + log.warning(f"object changed, retrying {func.__name__}()") + except KeyError as e: + last_error = e + log.warning(f"expected input not present ({e}), retrying {func.__name__}()") + # pass here if there was an caught exception + sleep(sleep_time) + sleep_time *= backoff + attempt += 1 + raise Exception(f"Error after retrying {func.__name__} {times} times: {last_error}") + return wrapped_func + return decorator + + +@retry(10) +def get_cid(api: APIServer, namespace: str, pod_name: str) -> str: + pod: dict = api.get(f"/api/v1/namespaces/{namespace}/pods/{pod_name}") + if 'status' in pod and 'containerStatuses' in pod['status']: + for cstatus in pod['status']['containerStatuses']: + if cstatus['name'] == 'galera': + # status of container should report 'running' for the containerID + # key to be present in the returned status, otherwise a retry is attempted + return cstatus['containerID'] + raise KeyError("ContainerID not found for 'galera'") + + +def inspect_galera_state(): + status = subprocess.run(['bash', '/var/lib/operator-scripts/detect_last_commit.sh'], capture_output=True) + if status.returncode == 2: + raise Exception("mysqld seems to be running, cannot inspect local state") + assert status.returncode == 0 + detected = json.loads(status.stdout) + return detected + + +@retry(10) +def push_state(api, namespace, galera_name, pod_name, detected): + galera_cr = api.get(f"/apis/mariadb.openstack.org/v1beta1/namespaces/{namespace}/galeras/{galera_name}/status") + attributes = galera_cr['status'].get('attributes', {}) + attributes[pod_name] = detected + galera_cr['status']['attributes'] = attributes + api.put(f"/apis/mariadb.openstack.org/v1beta1/namespaces/{namespace}/galeras/{galera_name}/status", galera_cr) + + +def report_state(push: bool): + pod_name = socket.gethostname() + galera_name = re.sub(r'-galera-[0-9]*', '', pod_name) + + svc_account = "/var/run/secrets/kubernetes.io/serviceaccount" + with open(os.path.join(svc_account, "namespace")) as f: + namespace = f.read() + with open(os.path.join(svc_account, "token")) as f: + token = f.read() + cert = os.path.join(svc_account, "ca.crt") + + api = APIServer(namespace, token, cert) + + try: + log.info("Retrieving the ID of the container we are running in") + cid = get_cid(api, namespace, pod_name) + log.info(f"found: {cid}") + + log.info(f"Extracting the Galera state from the local mariadb database on {pod_name}") + detected = inspect_galera_state() + log.info(f"found: {detected}") + detected['containerID'] = cid + + if push: + log.info(f"Saving the extracted Galera state into the status of Galera CR {galera_name} - {detected}") + push_state(api, namespace, galera_name, pod_name, detected) + else: + log.info(f"Local galera state detected: {detected}") + + except Exception as e: + log.error(f"Unexpected error while inspecting local galera state: {type(e).__name__}: {e}") + sys.exit(1) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Report the state of the local mysql database") + parser.add_argument("-p", "--push", + help="Push the computed state to the linked galera CR", + action=argparse.BooleanOptionalAction) + parser.add_argument("-f", "--file", + help="Also log output of this script to file", + ) + opts = parser.parse_args() + + log = logging.getLogger(sys.argv[0]) + log.setLevel(logging.DEBUG) + formatter = logging.Formatter(LOG_FORMAT) + + output = logging.StreamHandler(sys.stdout) + output.setFormatter(formatter) + log.addHandler(output) + + if opts.file: + file_output = logging.FileHandler(opts.file, mode='a') + file_output.setFormatter(formatter) + log.addHandler(file_output) + + report_state(opts.push)