Skip to content

Commit a09a5a9

Browse files
committed
🐛 fix: allow reconciliation of deadline-exceeded ClusterObjectSets
Previously, a watch predicate dropped update events for ClusterObjectSets with ProgressDeadlineExceeded, preventing them from being archived or cleaned up. Remove the predicate and replace the inline deadline logic in Reconcile with an enforceProgressDeadline() helper and a progressDeadline() function that computes the absolute deadline from spec and metadata. Add a deadline-aware rate limiter that caps exponential backoff at the deadline and allows one immediate requeue when it passes, without inspecting status conditions. Add an e2e scenario that forces ProgressDeadlineExceeded, archives the COS, and verifies resource cleanup.
1 parent dd57c28 commit a09a5a9

6 files changed

Lines changed: 411 additions & 46 deletions

File tree

internal/operator-controller/controllers/clusterobjectset_controller.go

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"k8s.io/apimachinery/pkg/runtime/schema"
2222
"k8s.io/apimachinery/pkg/types"
2323
"k8s.io/apimachinery/pkg/util/sets"
24+
"k8s.io/client-go/util/workqueue"
2425
"k8s.io/utils/clock"
2526
"pkg.package-operator.run/boxcutter"
2627
"pkg.package-operator.run/boxcutter/machinery"
@@ -30,8 +31,8 @@ import (
3031
ctrl "sigs.k8s.io/controller-runtime"
3132
"sigs.k8s.io/controller-runtime/pkg/builder"
3233
"sigs.k8s.io/controller-runtime/pkg/client"
34+
"sigs.k8s.io/controller-runtime/pkg/controller"
3335
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34-
"sigs.k8s.io/controller-runtime/pkg/event"
3536
"sigs.k8s.io/controller-runtime/pkg/handler"
3637
"sigs.k8s.io/controller-runtime/pkg/log"
3738
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -80,30 +81,8 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
8081

8182
reconciledRev := existingRev.DeepCopy()
8283
res, reconcileErr := c.reconcile(ctx, reconciledRev)
84+
res = c.enforceProgressDeadline(reconciledRev, res, reconcileErr)
8385

84-
if pd := existingRev.Spec.ProgressDeadlineMinutes; pd > 0 {
85-
cnd := meta.FindStatusCondition(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing)
86-
isStillProgressing := cnd != nil && cnd.Status == metav1.ConditionTrue && cnd.Reason != ocv1.ReasonSucceeded
87-
succeeded := meta.IsStatusConditionTrue(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded)
88-
// check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet
89-
if isStillProgressing && !succeeded {
90-
timeout := time.Duration(pd) * time.Minute
91-
if c.Clock.Since(existingRev.CreationTimestamp.Time) > timeout {
92-
// progress deadline reached, reset any errors and stop reconciling this revision
93-
markAsNotProgressing(reconciledRev, ocv1.ReasonProgressDeadlineExceeded, fmt.Sprintf("Revision has not rolled out for %d minute(s).", pd))
94-
reconcileErr = nil
95-
res = ctrl.Result{}
96-
} else if reconcileErr == nil {
97-
// We want to requeue so far in the future that the next reconciliation
98-
// can detect if the revision did not progress within the given timeout.
99-
// Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed.
100-
drift := 2 * time.Second
101-
requeueAfter := existingRev.CreationTimestamp.Time.Add(timeout).Add(drift).Sub(c.Clock.Now()).Round(time.Second)
102-
l.Info(fmt.Sprintf("ProgressDeadline not exceeded, requeue after ~%v to check again.", requeueAfter))
103-
res = ctrl.Result{RequeueAfter: requeueAfter}
104-
}
105-
}
106-
}
10786
// Do checks before any Update()s, as Update() may modify the resource structure!
10887
updateStatus := !equality.Semantic.DeepEqual(existingRev.Status, reconciledRev.Status)
10988

@@ -288,9 +267,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
288267
} else {
289268
markAsUnavailable(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
290269
}
291-
if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) == nil {
292-
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
293-
}
270+
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
294271
}
295272

296273
return ctrl.Result{}, nil
@@ -333,29 +310,19 @@ type Sourcoser interface {
333310
}
334311

335312
func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
336-
skipProgressDeadlineExceededPredicate := predicate.Funcs{
337-
UpdateFunc: func(e event.UpdateEvent) bool {
338-
rev, ok := e.ObjectNew.(*ocv1.ClusterObjectSet)
339-
if !ok {
340-
return true
341-
}
342-
// allow deletions to happen
343-
if !rev.DeletionTimestamp.IsZero() {
344-
return true
345-
}
346-
if cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing); cnd != nil && cnd.Status == metav1.ConditionFalse && cnd.Reason == ocv1.ReasonProgressDeadlineExceeded {
347-
return false
348-
}
349-
return true
350-
},
351-
}
352313
c.Clock = clock.RealClock{}
353314
return ctrl.NewControllerManagedBy(mgr).
315+
WithOptions(controller.Options{
316+
RateLimiter: newDeadlineAwareRateLimiter(
317+
workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](),
318+
mgr.GetClient(),
319+
c.Clock,
320+
),
321+
}).
354322
For(
355323
&ocv1.ClusterObjectSet{},
356324
builder.WithPredicates(
357325
predicate.ResourceVersionChangedPredicate{},
358-
skipProgressDeadlineExceededPredicate,
359326
),
360327
).
361328
WatchesRawSource(
@@ -638,6 +605,60 @@ func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string) {
638605
}
639606
}
640607

608+
// enforceProgressDeadline checks the progress deadline with a fresh clock
609+
// reading after the inner reconcile completes. If the deadline has passed
610+
// and probes have not all passed, it sets ProgressDeadlineExceeded. If the
611+
// deadline has not passed and there is no reconcile error, it caps RequeueAfter
612+
// to ensure a reconcile fires at the deadline.
613+
func (c *ClusterObjectSetReconciler) enforceProgressDeadline(cos *ocv1.ClusterObjectSet, res ctrl.Result, reconcileErr error) ctrl.Result {
614+
deadline, ok := progressDeadline(cos)
615+
if !ok {
616+
return res
617+
}
618+
remaining := deadline.Sub(c.Clock.Now())
619+
if remaining <= 0 {
620+
markAsNotProgressing(cos, ocv1.ReasonProgressDeadlineExceeded,
621+
fmt.Sprintf("Revision has not rolled out for %d minute(s).", cos.Spec.ProgressDeadlineMinutes))
622+
return res
623+
}
624+
if remaining > 0 && reconcileErr == nil && (res.RequeueAfter == 0 || remaining < res.RequeueAfter) {
625+
res.RequeueAfter = remaining
626+
}
627+
return res
628+
}
629+
630+
// progressDeadline returns the absolute time at which the progress deadline
631+
// expires. It derives the deadline from spec and metadata only, with one
632+
// exception: it checks the Succeeded status condition so that a revision
633+
// recovering from drift is not penalised by the original deadline.
634+
//
635+
// Succeeded is a latch: there is no way to deduce from current cluster state
636+
// alone that a COS succeeded in the past. If Succeeded is removed or set to
637+
// False, this function will return a deadline and the reconciler will set
638+
// ProgressDeadlineExceeded even though the revision previously succeeded.
639+
//
640+
// Returns (zero, false) when there is no active deadline:
641+
// - progressDeadlineMinutes is 0
642+
// - the revision has already succeeded
643+
// - the revision is archived (deadline is irrelevant)
644+
// - the revision is being deleted
645+
func progressDeadline(cos *ocv1.ClusterObjectSet) (time.Time, bool) {
646+
pd := cos.Spec.ProgressDeadlineMinutes
647+
if pd <= 0 {
648+
return time.Time{}, false
649+
}
650+
if meta.IsStatusConditionTrue(cos.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) {
651+
return time.Time{}, false
652+
}
653+
if cos.Spec.LifecycleState == ocv1.ClusterObjectSetLifecycleStateArchived {
654+
return time.Time{}, false
655+
}
656+
if !cos.DeletionTimestamp.IsZero() {
657+
return time.Time{}, false
658+
}
659+
return cos.CreationTimestamp.Time.Add(time.Duration(pd) * time.Minute), true
660+
}
661+
641662
func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string) {
642663
meta.SetStatusCondition(&cos.Status.Conditions, metav1.Condition{
643664
Type: ocv1.ClusterObjectSetTypeProgressing,

internal/operator-controller/controllers/clusterobjectset_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) {
988988
revisionResult: &mockRevisionResult{
989989
inTransition: true,
990990
},
991-
reconcileResult: ctrl.Result{RequeueAfter: 62 * time.Second},
991+
reconcileResult: ctrl.Result{RequeueAfter: 60 * time.Second},
992992
validate: func(t *testing.T, c client.Client) {
993993
rev := &ocv1.ClusterObjectSet{}
994994
err := c.Get(t.Context(), client.ObjectKey{
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//go:build !standard
2+
3+
package controllers
4+
5+
import (
6+
"context"
7+
"sync"
8+
"time"
9+
10+
"k8s.io/client-go/util/workqueue"
11+
"k8s.io/utils/clock"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
15+
ocv1 "github.com/operator-framework/operator-controller/api/v1"
16+
)
17+
18+
// deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff
19+
// duration to the time remaining until the COS progress deadline, ensuring
20+
// that ProgressDeadlineExceeded is set promptly even during exponential backoff.
21+
//
22+
// After the deadline passes, it allows one immediate requeue (returning 0) so
23+
// the reconciler can set the ProgressDeadlineExceeded condition, then falls
24+
// back to the delegate's normal backoff. This avoids both tight-looping and
25+
// coupling to the COS's status conditions.
26+
type deadlineAwareRateLimiter struct {
27+
delegate workqueue.TypedRateLimiter[ctrl.Request]
28+
client client.Reader
29+
clock clock.Clock
30+
pastDeadline sync.Map
31+
}
32+
33+
func newDeadlineAwareRateLimiter(
34+
delegate workqueue.TypedRateLimiter[ctrl.Request],
35+
c client.Reader,
36+
clk clock.Clock,
37+
) *deadlineAwareRateLimiter {
38+
return &deadlineAwareRateLimiter{delegate: delegate, client: c, clock: clk}
39+
}
40+
41+
func (r *deadlineAwareRateLimiter) When(item ctrl.Request) time.Duration {
42+
backoff := r.delegate.When(item)
43+
44+
cos := &ocv1.ClusterObjectSet{}
45+
if err := r.client.Get(context.Background(), item.NamespacedName, cos); err != nil {
46+
return backoff
47+
}
48+
49+
deadline, hasDeadline := progressDeadline(cos)
50+
if !hasDeadline {
51+
return backoff
52+
}
53+
54+
remaining := deadline.Sub(r.clock.Now())
55+
if remaining > 0 {
56+
if remaining < backoff {
57+
return remaining
58+
}
59+
return backoff
60+
}
61+
62+
// Deadline has passed — allow one immediate requeue, then delegate.
63+
if _, already := r.pastDeadline.LoadOrStore(item, struct{}{}); !already {
64+
return 0
65+
}
66+
return backoff
67+
}
68+
69+
func (r *deadlineAwareRateLimiter) Forget(item ctrl.Request) {
70+
r.delegate.Forget(item)
71+
r.pastDeadline.Delete(item)
72+
}
73+
74+
func (r *deadlineAwareRateLimiter) NumRequeues(item ctrl.Request) int {
75+
return r.delegate.NumRequeues(item)
76+
}

0 commit comments

Comments
 (0)