Skip to content

Commit fd1ff0c

Browse files
committed
Refactored restore-in-place async flow, not removing the config map for now
1 parent a36b962 commit fd1ff0c

File tree

2 files changed

+85
-67
lines changed

2 files changed

+85
-67
lines changed

pkg/cluster/cluster.go

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -433,27 +433,27 @@ func (c *Cluster) Create() (err error) {
433433
}
434434

435435

436-
if err := c.updatePITRResources(); err != nil {
436+
if err := c.updatePITRResources(PitrStateLabelValueFinished); err != nil {
437437
return fmt.Errorf("could not update pitr resources: %v", err)
438438
}
439439
return nil
440440
}
441441

442442
// update the label to finished for PITR for the given config map
443-
func (c *Cluster) updatePITRResources() error {
443+
func (c *Cluster) updatePITRResources(state string) error {
444444
cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name)
445445
cmNamespace := c.Namespace
446446
patchPayload := map[string]any{
447447
"metadata": map[string]any{
448448
"labels": map[string]string{
449-
PitrStateLabelKey: PitrStateLabelValueFinished,
449+
PitrStateLabelKey: state,
450450
},
451451
},
452452
}
453453

454454
data, _ := json.Marshal(patchPayload)
455455
if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil {
456-
c.logger.Errorf("restore-in-place: error updating config map label to final state: %v", err)
456+
c.logger.Errorf("restore-in-place: error updating config map label to state: %v", err)
457457
return err
458458
}
459459
return nil
@@ -1226,42 +1226,44 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
12261226
}
12271227

12281228
const (
1229-
PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state"
1229+
PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state"
12301230
PitrStateLabelValuePending = "pending"
12311231
PitrStateLabelValueInProgress = "in-progress"
1232-
PitrStateLabelValueFinished = "finished"
1233-
PitrConfigMapNameTemplate = "pitr-state-%s"
1234-
PitrSpecDataKey = "spec"
1232+
PitrStateLabelValueFinished = "finished"
1233+
PitrConfigMapNameTemplate = "pitr-state-%s"
1234+
PitrSpecDataKey = "spec"
12351235
)
12361236

1237-
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
1238-
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
1239-
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
1240-
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
1241-
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
1242-
func (c *Cluster) Delete() error {
1243-
var anyErrors = false
1244-
c.mu.Lock()
1245-
defer c.mu.Unlock()
1246-
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
1247-
1248-
1237+
func (c *Cluster) isRestoreInPlace() bool {
12491238
cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name)
1250-
1251-
isRestoreInPlace := false
12521239
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), cmName, metav1.GetOptions{})
12531240
if err != nil {
12541241
c.logger.Debugf("restore-in-place: Error while fetching config map: %s before deletion", cmName)
1242+
return false
12551243
}
12561244

12571245
if cm != nil {
12581246
if val, ok := cm.Labels[PitrStateLabelKey]; ok {
12591247
if val == PitrStateLabelValuePending {
1260-
isRestoreInPlace = true
1248+
return true
12611249
}
12621250
}
12631251
}
1252+
return false
1253+
}
1254+
1255+
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
1256+
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
1257+
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
1258+
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
1259+
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
1260+
func (c *Cluster) Delete() error {
1261+
var anyErrors = false
1262+
c.mu.Lock()
1263+
defer c.mu.Unlock()
1264+
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
12641265

1266+
isRestoreInPlace := c.isRestoreInPlace()
12651267
c.logger.Debugf("restore-in-place: Deleting the cluster, verifying whether resotore-in-place is true or not: %+v\n", isRestoreInPlace)
12661268
if err := c.deleteStreams(); err != nil {
12671269
anyErrors = true

pkg/controller/postgresql.go

Lines changed: 60 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -680,17 +680,25 @@ func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) {
680680
c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name)
681681
}
682682

683-
// processPendingRestores handles the re-creation part of the asynchronous point-in-time-restore.
684-
// It is called periodically and checks for ConfigMaps that signal a pending or in-progress restore.
685683
func (c *Controller) processPendingRestores() error {
686684
c.logger.Debug("restore-in-place: checking for pending restores")
687-
688685
namespace := c.opConfig.WatchedNamespace
689686
if namespace == "" {
690687
namespace = v1.NamespaceAll
691688
}
692689

693-
// Process "pending" restores: wait for deletion and move to "in-progress"
690+
if err := c.processPendingCm(namespace); err != nil {
691+
return err
692+
}
693+
694+
if err := c.processInProgressCm(namespace); err != nil {
695+
return err
696+
}
697+
698+
return nil
699+
}
700+
701+
func (c *Controller) processPendingCm(namespace string) error {
694702
pendingOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValuePending)}
695703
pendingCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), pendingOpts)
696704
if err != nil {
@@ -701,27 +709,35 @@ func (c *Controller) processPendingRestores() error {
701709
}
702710

703711
for _, cm := range pendingCmList.Items {
704-
c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name)
705-
clusterName := strings.TrimPrefix(cm.Name, "pitr-state-")
706-
707-
_, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
708-
if err == nil {
709-
c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName)
710-
continue
711-
}
712-
if !errors.IsNotFound(err) {
713-
c.logger.Errorf("restore-in-place: could not check for existence of Postgresql CR %q: %v", clusterName, err)
714-
continue
712+
if err := c.processSinglePendingCm(cm); err != nil {
713+
c.logger.Errorf("restore-in-place: could not process pending restore for config map %s: %v", cm.Name, err)
715714
}
715+
}
716+
return nil
717+
}
716718

717-
c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName)
718-
cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress
719-
if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil {
720-
c.logger.Errorf("restore-in-place: could not update ConfigMap %q to 'in-progress': %v", cm.Name, err)
721-
}
719+
func (c *Controller) processSinglePendingCm(cm v1.ConfigMap) error {
720+
c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name)
721+
clusterName := strings.TrimPrefix(cm.Name, "pitr-state-")
722+
723+
_, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
724+
if err == nil {
725+
c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName)
726+
return nil
722727
}
728+
if !errors.IsNotFound(err) {
729+
return fmt.Errorf("could not check for existence of Postgresql CR %q: %v", clusterName, err)
730+
}
731+
732+
c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName)
733+
cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress
734+
if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil {
735+
return fmt.Errorf("could not update ConfigMap %q to 'in-progress': %v", cm.Name, err)
736+
}
737+
return nil
738+
}
723739

724-
// Process "in-progress" restores: re-create the CR and clean up
740+
func (c *Controller) processInProgressCm(namespace string) error {
725741
inProgressOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValueInProgress)}
726742
inProgressCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), inProgressOpts)
727743
if err != nil {
@@ -732,33 +748,33 @@ func (c *Controller) processPendingRestores() error {
732748
}
733749

734750
for _, cm := range inProgressCmList.Items {
735-
c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name)
736-
737-
c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name)
738-
var newPgSpec acidv1.Postgresql
739-
if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil {
740-
c.logger.Errorf("restore-in-place: could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err)
741-
continue
751+
if err := c.processSingleInProgressCm(cm); err != nil {
752+
c.logger.Errorf("restore-in-place: could not process in-progress restore for config map %s: %v", cm.Name, err)
742753
}
754+
}
755+
return nil
756+
}
743757

744-
c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name)
745-
_, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{})
746-
if err != nil {
747-
if errors.IsAlreadyExists(err) {
748-
c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name)
749-
// fallthrough to delete
750-
} else {
751-
c.logger.Errorf("restore-in-place: could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err)
752-
continue // Retry on next cycle
753-
}
758+
func (c *Controller) processSingleInProgressCm(cm v1.ConfigMap) error {
759+
c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name)
760+
761+
c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name)
762+
var newPgSpec acidv1.Postgresql
763+
if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil {
764+
return fmt.Errorf("could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err)
765+
}
766+
767+
c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name)
768+
_, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{})
769+
if err != nil {
770+
if errors.IsAlreadyExists(err) {
771+
c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name)
772+
// fallthrough to delete
754773
} else {
755-
c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name)
774+
return fmt.Errorf("could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err)
756775
}
757-
758-
// c.logger.Debugf("restore-in-place: deleting successfully used restore ConfigMap %q", cm.Name)
759-
// if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}); err != nil {
760-
// c.logger.Errorf("restore-in-place: could not delete state ConfigMap %q: %v", cm.Name, err)
761-
// }
776+
} else {
777+
c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name)
762778
}
763779

764780
return nil

0 commit comments

Comments
 (0)