@@ -3,13 +3,17 @@ package snapshots
33import (
44 "context"
55 "fmt"
6+ "strings"
67 "time"
78
9+ merr "github.com/hashicorp/go-multierror"
810 volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
911 "github.com/pkg/errors"
1012 corev1 "k8s.io/api/core/v1"
13+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
1114 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1215 "k8s.io/apimachinery/pkg/labels"
16+ "k8s.io/apimachinery/pkg/types"
1317 "k8s.io/client-go/util/retry"
1418 "k8s.io/utils/ptr"
1519 "sigs.k8s.io/controller-runtime/pkg/client"
@@ -20,6 +24,7 @@ import (
2024 "github.com/percona/percona-postgresql-operator/v2/internal/feature"
2125 "github.com/percona/percona-postgresql-operator/v2/internal/logging"
2226 "github.com/percona/percona-postgresql-operator/v2/internal/naming"
27+ "github.com/percona/percona-postgresql-operator/v2/percona/k8s"
2328 pNaming "github.com/percona/percona-postgresql-operator/v2/percona/naming"
2429 v2 "github.com/percona/percona-postgresql-operator/v2/pkg/apis/pgv2.percona.com/v2"
2530)
@@ -30,6 +35,8 @@ const (
3035 defaultSnapshotErrorDeadline = 5 * time .Minute
3136)
3237
38+ var errVolumeSnapshotFailed = errors .New ("volume snapshot failed" )
39+
3340type snapshotExecutor interface {
3441 // Prepare the cluster for performing a snapshot.
3542 // Returns the name of the instance whose PVCs will be snapshotted.
@@ -149,6 +156,15 @@ func (r *snapshotReconciler) reconcileNew(ctx context.Context) (reconcile.Result
149156 return reconcile.Result {RequeueAfter : time .Second * 5 }, nil
150157 }
151158
159+ acquired , err := r .tryAcquireLease (ctx )
160+ if err != nil {
161+ return reconcile.Result {}, errors .Wrap (err , "failed to acquire lease" )
162+ }
163+ if ! acquired {
164+ r .log .Info ("Backup lock is held by another backup" )
165+ return reconcile.Result {RequeueAfter : time .Second * 5 }, nil
166+ }
167+
152168 if updErr := r .backup .UpdateStatus (ctx , r .cl , func (bcp * v2.PerconaPGBackup ) {
153169 bcp .Status .State = v2 .BackupStarting
154170 bcp .Status .BackupType = v2 .PGBackupTypeSnapshot
@@ -181,21 +197,42 @@ func (r *snapshotReconciler) reconcileRunning(ctx context.Context) (reconcile.Re
181197 return reconcile.Result {}, errors .Wrap (err , "failed to get target PVCs" )
182198 }
183199
200+ // Gather VolumeSnapshot errors from all and report it at once in the status
201+ // while failing the backup.
202+ var snapshotErrors error
203+
184204 dataOk , err := r .reconcileDataSnapshot (ctx , dataPVC )
185- if err != nil {
205+ if errors .Is (err , errVolumeSnapshotFailed ) {
206+ snapshotErrors = merr .Append (snapshotErrors , err )
207+ } else if err != nil {
186208 return reconcile.Result {}, errors .Wrap (err , "failed to reconcile data snapshot" )
187209 }
188210
189211 walOk , err := r .reconcileWALSnapshot (ctx , walPVC )
190- if err != nil {
212+ if errors .Is (err , errVolumeSnapshotFailed ) {
213+ snapshotErrors = merr .Append (snapshotErrors , err )
214+ } else if err != nil {
191215 return reconcile.Result {}, errors .Wrap (err , "failed to reconcile WAL snapshot" )
192216 }
193217
194218 tablespaceOk , err := r .reconcileTablespaceSnapshot (ctx , tablespacePVCs )
195- if err != nil {
219+ if errors .Is (err , errVolumeSnapshotFailed ) {
220+ snapshotErrors = merr .Append (snapshotErrors , err )
221+ } else if err != nil {
196222 return reconcile.Result {}, errors .Wrap (err , "failed to reconcile tablespace snapshot" )
197223 }
198224
225+ if snapshotErrors != nil {
226+ if updErr := r .backup .UpdateStatus (ctx , r .cl , func (bcp * v2.PerconaPGBackup ) {
227+ bcp .Status .State = v2 .BackupFailed
228+ bcp .Status .Error = fmt .Sprintf ("one or more snapshots failed: %s" , snapshotErrors )
229+ bcp .Status .CompletedAt = ptr .To (metav1 .Now ())
230+ }); updErr != nil {
231+ return reconcile.Result {}, errors .Wrap (updErr , "failed to update backup status" )
232+ }
233+ return reconcile.Result {}, errors .Wrap (errVolumeSnapshotFailed , snapshotErrors .Error ())
234+ }
235+
199236 if ! dataOk || ! walOk || ! tablespaceOk {
200237 return reconcile.Result {RequeueAfter : time .Second * 5 }, nil
201238 }
@@ -248,7 +285,7 @@ func (r *snapshotReconciler) reconcileSnapshot(ctx context.Context, volumeSnapsh
248285 return false , nil
249286 }
250287
251- err := errors .New ( message )
288+ err := errors .Wrap ( errVolumeSnapshotFailed , fmt . Sprintf ( "VolumeSnapshot %s failed: %s" , volumeSnapshot . GetName (), message ) )
252289
253290 log .Error (err , "Volume snapshot failed" )
254291 return false , err
@@ -480,6 +517,11 @@ func (r *snapshotReconciler) complete(ctx context.Context) error {
480517 return errors .Wrap (err , "finalize failed" )
481518 }
482519
520+ // release lease
521+ if err := k8s .ReleaseLease (ctx , r .cl , r .backupLeaseName (), r .backupLeaseHolder (), r .cluster .GetNamespace ()); err != nil {
522+ return errors .Wrap (err , "failed to release lease" )
523+ }
524+
483525 // remove finalizer
484526 if err := retry .RetryOnConflict (retry .DefaultBackoff , func () error {
485527 bcp := r .backup .DeepCopy ()
@@ -494,3 +536,55 @@ func (r *snapshotReconciler) complete(ctx context.Context) error {
494536 }
495537 return nil
496538}
539+
540+ func (r * snapshotReconciler ) backupLeaseName () string {
541+ return "pg-" + r .cluster .GetName () + "-backup-lock"
542+ }
543+
544+ func (r * snapshotReconciler ) backupLeaseHolder () string {
545+ return fmt .Sprintf ("%s|%s" , r .backup .GetName (), r .backup .GetUID ())
546+ }
547+
548+ func (r * snapshotReconciler ) parseBackupLeaseHolder (holder string ) (string , types.UID ) {
549+ parts := strings .Split (holder , "|" )
550+ if len (parts ) != 2 {
551+ return "" , ""
552+ }
553+ return parts [0 ], types .UID (parts [1 ])
554+ }
555+
556+ func (r * snapshotReconciler ) tryAcquireLease (ctx context.Context ) (bool , error ) {
557+ leaseName := r .backupLeaseName ()
558+ leaseHolderID := r .backupLeaseHolder ()
559+
560+ checkStale := func (ctx context.Context , currentHolder string ) (bool , error ) {
561+ backupName , backupUID := r .parseBackupLeaseHolder (currentHolder )
562+ if backupName == "" || backupUID == "" {
563+ r .log .Info ("Backup lease holder is malformed, acquiring lease anyway" )
564+ return true , nil
565+ }
566+
567+ backup := & v2.PerconaPGBackup {}
568+ if err := r .cl .Get (ctx , client.ObjectKey {Name : backupName , Namespace : r .cluster .GetNamespace ()}, backup ); k8serrors .IsNotFound (err ) {
569+ return true , nil
570+ } else if err != nil {
571+ return false , errors .Wrap (err , "failed to get backup" )
572+ } else if backup .GetUID () != backupUID {
573+ // We found a backup with the same name, but different UID.
574+ // So this isn't the same backup that was holding the lease.
575+ return true , nil
576+ }
577+
578+ // We found the backup that holds the lease. Check if it has completed fully before we acquire the lease.
579+ return (backup .Status .State == v2 .BackupFailed || backup .Status .State == v2 .BackupSucceeded ) &&
580+ ! controllerutil .ContainsFinalizer (backup , pNaming .FinalizerSnapshotInProgress ), nil
581+ }
582+
583+ if err := k8s .AcquireLease (ctx , r .cl , leaseName , leaseHolderID , r .cluster .GetNamespace (), checkStale ); err != nil {
584+ if errors .Is (err , k8s .ErrLeaseAlreadyHeld ) || k8serrors .IsAlreadyExists (err ) || k8serrors .IsConflict (err ) {
585+ return false , nil
586+ }
587+ return false , errors .Wrap (err , "failed to acquire lease" )
588+ }
589+ return true , nil
590+ }
0 commit comments