Skip to content

Commit a36b962

Browse files
committed
Added async restore in place
1 parent afa2858 commit a36b962

File tree

3 files changed

+201
-88
lines changed

3 files changed

+201
-88
lines changed

pkg/cluster/cluster.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cluster
33
// Postgres CustomResourceDefinition object i.e. Spilo
44

55
import (
6+
"context"
67
"database/sql"
78
"encoding/json"
89
"fmt"
@@ -431,6 +432,30 @@ func (c *Cluster) Create() (err error) {
431432
c.logger.Errorf("could not list resources: %v", err)
432433
}
433434

435+
436+
if err := c.updatePITRResources(); err != nil {
437+
return fmt.Errorf("could not update pitr resources: %v", err)
438+
}
439+
return nil
440+
}
441+
442+
// update the label to finished for PITR for the given config map
443+
func (c *Cluster) updatePITRResources() error {
444+
cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name)
445+
cmNamespace := c.Namespace
446+
patchPayload := map[string]any{
447+
"metadata": map[string]any{
448+
"labels": map[string]string{
449+
PitrStateLabelKey: PitrStateLabelValueFinished,
450+
},
451+
},
452+
}
453+
454+
data, _ := json.Marshal(patchPayload)
455+
if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil {
456+
c.logger.Errorf("restore-in-place: error updating config map label to final state: %v", err)
457+
return err
458+
}
434459
return nil
435460
}
436461

@@ -1200,6 +1225,15 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
12001225
return false
12011226
}
12021227

1228+
const (
1229+
PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state"
1230+
PitrStateLabelValuePending = "pending"
1231+
PitrStateLabelValueInProgress = "in-progress"
1232+
PitrStateLabelValueFinished = "finished"
1233+
PitrConfigMapNameTemplate = "pitr-state-%s"
1234+
PitrSpecDataKey = "spec"
1235+
)
1236+
12031237
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
12041238
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
12051239
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
@@ -1211,7 +1245,23 @@ func (c *Cluster) Delete() error {
12111245
defer c.mu.Unlock()
12121246
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
12131247

1214-
isRestoreInPlace := c.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place"
1248+
1249+
cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name)
1250+
1251+
isRestoreInPlace := false
1252+
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), cmName, metav1.GetOptions{})
1253+
if err != nil {
1254+
c.logger.Debugf("restore-in-place: Error while fetching config map: %s before deletion", cmName)
1255+
}
1256+
1257+
if cm != nil {
1258+
if val, ok := cm.Labels[PitrStateLabelKey]; ok {
1259+
if val == PitrStateLabelValuePending {
1260+
isRestoreInPlace = true
1261+
}
1262+
}
1263+
}
1264+
12151265
c.logger.Debugf("restore-in-place: Deleting the cluster, verifying whether resotore-in-place is true or not: %+v\n", isRestoreInPlace)
12161266
if err := c.deleteStreams(); err != nil {
12171267
anyErrors = true

pkg/cluster/resources.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
batchv1 "k8s.io/api/batch/v1"
1111
v1 "k8s.io/api/core/v1"
1212
policyv1 "k8s.io/api/policy/v1"
13+
"k8s.io/apimachinery/pkg/api/errors"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/apimachinery/pkg/types"
1516

@@ -301,6 +302,21 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
301302
c.setProcessName("creating %v service", role)
302303

303304
serviceSpec := c.generateService(role, &c.Spec)
305+
306+
// check if the service already exists in case of pitr
307+
svc, err := c.KubeClient.Services(serviceSpec.Namespace).Get(context.TODO(), serviceSpec.Name, metav1.GetOptions{})
308+
309+
// service already exists
310+
if err == nil {
311+
c.Services[role] = svc
312+
return svc, nil
313+
}
314+
315+
if !errors.IsNotFound(err) {
316+
return nil, err
317+
}
318+
319+
// at last create the service
304320
service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
305321
if err != nil {
306322
return nil, err

pkg/controller/postgresql.go

Lines changed: 134 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1717
"k8s.io/apimachinery/pkg/api/errors"
1818
"k8s.io/apimachinery/pkg/types"
19-
"k8s.io/apimachinery/pkg/util/wait"
2019
"k8s.io/client-go/tools/cache"
2120

2221
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -27,6 +26,11 @@ import (
2726
"github.com/zalando/postgres-operator/pkg/util/ringlog"
2827
)
2928

29+
const (
30+
restoreAnnotationKey = "postgres-operator.zalando.org/action"
31+
restoreAnnotationValue = "restore-in-place"
32+
)
33+
3034
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
3135
defer wg.Done()
3236
ticker := time.NewTicker(c.opConfig.ResyncPeriod)
@@ -37,6 +41,9 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
3741
if err := c.clusterListAndSync(); err != nil {
3842
c.logger.Errorf("could not list clusters: %v", err)
3943
}
44+
if err := c.processPendingRestores(); err != nil {
45+
c.logger.Errorf("could not process pending restores: %v", err)
46+
}
4047
case <-stopCh:
4148
return
4249
}
@@ -542,7 +549,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
542549
pgNew := c.postgresqlCheck(cur)
543550
if pgOld != nil && pgNew != nil {
544551

545-
if pgNew.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place" {
552+
if pgNew.Annotations[restoreAnnotationKey] == restoreAnnotationValue {
546553
c.logger.Debugf("restore-in-place: postgresqlUpdate called for cluster %q", pgNew.Name)
547554
c.handlerRestoreInPlace(pgOld, pgNew)
548555
return
@@ -604,117 +611,157 @@ func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) err
604611
return nil
605612
}
606613

607-
// waitForOldResourcesTermination waits until the postgresql CR and its StatefulSet are terminated
608-
func (c *Controller) waitForOldResourcesTermination(pgOld *acidv1.Postgresql, statefulSetName string) error {
609-
c.logger.Debugf("restore-in-place: Waiting for old CR %q and StatefulSet %q to be fully terminated", pgOld.Name, statefulSetName)
610-
611-
err := wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) {
612-
// Check for CR
613-
_, crErr := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Get(ctx, pgOld.Name, metav1.GetOptions{})
614-
crGone := errors.IsNotFound(crErr)
615-
if crErr != nil && !crGone {
616-
c.logger.Errorf("restore-in-place: Error while waiting for CR deletion: %v", crErr)
617-
return false, crErr // A real error occurred
618-
}
619-
620-
// Check for StatefulSet
621-
_, stsErr := c.KubeClient.StatefulSets(pgOld.Namespace).Get(ctx, statefulSetName, metav1.GetOptions{})
622-
stsGone := errors.IsNotFound(stsErr)
623-
if stsErr != nil && !stsGone {
624-
c.logger.Errorf("restore-in-place: Error while waiting for StatefulSet deletion: %v", stsErr)
625-
return false, stsErr // A real error occurred
626-
}
627-
628-
if crGone && stsGone {
629-
c.logger.Debugf("restore-in-place: Both old CR and StatefulSet are fully terminated.")
630-
return true, nil
631-
}
632-
633-
if !crGone {
634-
c.logger.Infof("restore-in-place: still waiting for postgresql CR %q to be deleted", pgOld.Name)
635-
}
636-
if !stsGone {
637-
c.logger.Infof("restore-in-place: still waiting for StatefulSet %q to be deleted", statefulSetName)
638-
}
639-
640-
return false, nil // Not done yet, continue polling.
641-
})
642-
643-
if err != nil {
644-
return fmt.Errorf("error while waiting for old resources to be deleted: %v", err)
645-
}
646614

647-
c.logger.Debugf("restore-in-place: Finished waiting for old resource deletion.")
648-
return nil
649-
}
650615

651-
// handlerRestoreInPlace is to handle the resotre in place, it does few operatons
652-
// 1. Verifies the parameters required for restoring in place
653-
// 2. Removes the old CR if it exists, wait for it, if not present check the err that it is a k8sNotfound error and continue
654-
// 3. Wait for the successful removal of statefulsets, if not present check the err that it is a k8sNotfound error and continue
655-
// 4. Create a new CR with the latest details, while keeping few metadata about restore
616+
// handlerRestoreInPlace starts an asynchronous point-in-time-restore.
617+
// It creates a ConfigMap to store the state and then deletes the old Postgresql CR.
656618
func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) {
657-
c.logger.Infof("restore-in-place: starting restore-in-place for cluster %q", pgNew.Name)
619+
c.logger.Infof("restore-in-place: starting asynchronous restore-in-place for cluster %q", pgNew.Name)
658620

659621
if err := c.validateRestoreInPlace(pgOld, pgNew); err != nil {
660622
c.logger.Errorf("restore-in-place: validation failed for cluster %q: %v", pgNew.Name, err)
661623
return
662624
}
663625

626+
// Prepare new spec for the restored cluster
627+
c.logger.Debugf("restore-in-place: preparing new postgresql spec for cluster %q", pgNew.Name)
664628
newPgSpec := pgNew.DeepCopy()
665-
delete(newPgSpec.Annotations, "postgres-operator.zalando.org/action")
629+
delete(newPgSpec.Annotations, restoreAnnotationKey)
666630
newPgSpec.ResourceVersion = ""
667631
newPgSpec.UID = ""
668-
c.logger.Debugf("restore-in-place: newPgSpec after removing annotation: %+v", newPgSpec)
669632

670-
statefulSetName := pgOld.Name // Capture StatefulSet name, it's the same as the cluster name
633+
specData, err := json.Marshal(newPgSpec)
634+
if err != nil {
635+
c.logger.Errorf("restore-in-place: could not marshal new postgresql spec for cluster %q: %v", newPgSpec.Name, err)
636+
return
637+
}
638+
639+
// Create or update ConfigMap to store restore state
640+
cmName := fmt.Sprintf(cluster.PitrConfigMapNameTemplate, newPgSpec.Name)
641+
c.logger.Debugf("restore-in-place: creating or updating state ConfigMap %q for cluster %q", cmName, newPgSpec.Name)
642+
cm := &v1.ConfigMap{
643+
ObjectMeta: metav1.ObjectMeta{
644+
Name: cmName,
645+
Namespace: newPgSpec.Namespace,
646+
Labels: map[string]string{
647+
cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending,
648+
},
649+
},
650+
Data: map[string]string{
651+
cluster.PitrSpecDataKey: string(specData),
652+
},
653+
}
654+
655+
// Check if ConfigMap already exists
656+
_, err = c.KubeClient.ConfigMaps(cm.Namespace).Get(context.TODO(), cm.Name, metav1.GetOptions{})
657+
if err != nil {
658+
if errors.IsNotFound(err) {
659+
_, err = c.KubeClient.ConfigMaps(cm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{})
660+
}
661+
} else {
662+
// If for some reason CM exists, update it
663+
_, err = c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{})
664+
}
665+
666+
if err != nil {
667+
c.logger.Errorf("restore-in-place: could not create or update state ConfigMap %q for cluster %q: %v", cmName, newPgSpec.Name, err)
668+
return
669+
}
670+
c.logger.Infof("restore-in-place: state ConfigMap %q created for cluster %q", cmName, newPgSpec.Name)
671671

672-
// Initiate CR deletion first, as requested
673-
c.logger.Debugf("restore-in-place: Attempting direct API deletion of postgresql CR %q", pgOld.Name)
674-
err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{})
672+
// Delete old postgresql CR to trigger cleanup and UID change
673+
c.logger.Debugf("restore-in-place: attempting deletion of postgresql CR %q", pgOld.Name)
674+
err = c.KubeClient.Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{})
675675
if err != nil && !errors.IsNotFound(err) {
676-
c.logger.Errorf("restore-in-place: could not delete postgresql CR via API: %v", err)
677-
return // Stop if there's a critical error deleting the CR
676+
c.logger.Errorf("restore-in-place: could not delete postgresql CR %q: %v", pgOld.Name, err)
677+
// Consider deleting the ConfigMap here to allow a retry
678+
return
679+
}
680+
c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name)
681+
}
682+
683+
// processPendingRestores handles the re-creation part of the asynchronous point-in-time-restore.
684+
// It is called periodically and checks for ConfigMaps that signal a pending or in-progress restore.
685+
func (c *Controller) processPendingRestores() error {
686+
c.logger.Debug("restore-in-place: checking for pending restores")
687+
688+
namespace := c.opConfig.WatchedNamespace
689+
if namespace == "" {
690+
namespace = v1.NamespaceAll
678691
}
679-
c.logger.Debugf("restore-in-place: Direct API deletion of postgresql CR for %q initiated (or CR was already not found).", pgOld.Name)
680692

681-
// Then, initiate cluster sub-resource deletion if the cluster object is in memory
682-
clusterName := util.NameFromMeta(pgOld.ObjectMeta)
683-
c.clustersMu.RLock()
684-
cl, clusterFound := c.clusters[clusterName]
685-
c.clustersMu.RUnlock()
693+
// Process "pending" restores: wait for deletion and move to "in-progress"
694+
pendingOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValuePending)}
695+
pendingCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), pendingOpts)
696+
if err != nil {
697+
return fmt.Errorf("restore-in-place: could not list pending restore ConfigMaps: %v", err)
698+
}
699+
if len(pendingCmList.Items) > 0 {
700+
c.logger.Debugf("restore-in-place: found %d pending restore(s) to process", len(pendingCmList.Items))
701+
}
686702

687-
if clusterFound {
688-
c.logger.Debugf("restore-in-place: Cluster object found in memory. Calling cluster.Delete() for %q", clusterName)
689-
if cl.Annotations == nil {
690-
cl.Annotations = make(map[string]string)
703+
for _, cm := range pendingCmList.Items {
704+
c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name)
705+
clusterName := strings.TrimPrefix(cm.Name, "pitr-state-")
706+
707+
_, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
708+
if err == nil {
709+
c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName)
710+
continue
691711
}
692-
cl.Annotations["postgres-operator.zalando.org/action"] = "restore-in-place" // User requested to keep this
693-
if err := cl.Delete(); err != nil {
694-
// Log error but continue to ensure we wait for termination
695-
c.logger.Errorf("restore-in-place: error during cluster.Delete() for %q: %v. Proceeding to wait for termination.", clusterName, err)
712+
if !errors.IsNotFound(err) {
713+
c.logger.Errorf("restore-in-place: could not check for existence of Postgresql CR %q: %v", clusterName, err)
714+
continue
696715
}
697-
c.logger.Debugf("restore-in-place: cluster.Delete() returned for %q", clusterName)
698-
} else {
699-
c.logger.Warningf("restore-in-place: cluster %q not found in controller's map. Relying on CR deletion to trigger cleanup.", clusterName)
700-
}
701716

702-
if err := c.waitForOldResourcesTermination(pgOld, statefulSetName); err != nil {
703-
c.logger.Errorf("restore-in-place: %v", err)
704-
return
717+
c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName)
718+
cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress
719+
if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil {
720+
c.logger.Errorf("restore-in-place: could not update ConfigMap %q to 'in-progress': %v", cm.Name, err)
721+
}
705722
}
706723

707-
// Create a new CR with the latest details
708-
c.logger.Debugf("restore-in-place: Creating new postgresql CR %q", newPgSpec.Name)
709-
_, err = c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(newPgSpec.Namespace).Create(context.TODO(), newPgSpec, metav1.CreateOptions{})
724+
// Process "in-progress" restores: re-create the CR and clean up
725+
inProgressOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValueInProgress)}
726+
inProgressCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), inProgressOpts)
710727
if err != nil {
711-
c.logger.Errorf("restore-in-place: could not create postgresql CR for restore-in-place: %v", err)
712-
// If the new CR cannot be created, the user needs to intervene.
713-
return
728+
return fmt.Errorf("restore-in-place: could not list in-progress restore ConfigMaps: %v", err)
729+
}
730+
if len(inProgressCmList.Items) > 0 {
731+
c.logger.Debugf("restore-in-place: found %d in-progress restore(s) to process", len(inProgressCmList.Items))
732+
}
733+
734+
for _, cm := range inProgressCmList.Items {
735+
c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name)
736+
737+
c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name)
738+
var newPgSpec acidv1.Postgresql
739+
if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil {
740+
c.logger.Errorf("restore-in-place: could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err)
741+
continue
742+
}
743+
744+
c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name)
745+
_, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{})
746+
if err != nil {
747+
if errors.IsAlreadyExists(err) {
748+
c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name)
749+
// fallthrough to delete
750+
} else {
751+
c.logger.Errorf("restore-in-place: could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err)
752+
continue // Retry on next cycle
753+
}
754+
} else {
755+
c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name)
756+
}
757+
758+
// c.logger.Debugf("restore-in-place: deleting successfully used restore ConfigMap %q", cm.Name)
759+
// if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}); err != nil {
760+
// c.logger.Errorf("restore-in-place: could not delete state ConfigMap %q: %v", cm.Name, err)
761+
// }
714762
}
715-
c.logger.Debugf("restore-in-place: New postgresql CR %q created", newPgSpec.Name)
716763

717-
c.logger.Infof("restore-in-place: for cluster %q triggered successfully", pgNew.Name)
764+
return nil
718765
}
719766

720767
/*

0 commit comments

Comments
 (0)