@@ -27,11 +27,12 @@ import (
2727 machinerytypes "pkg.package-operator.run/boxcutter/machinery/types"
2828 "pkg.package-operator.run/boxcutter/ownerhandling"
2929 "pkg.package-operator.run/boxcutter/probing"
30+ "k8s.io/client-go/util/workqueue"
3031 ctrl "sigs.k8s.io/controller-runtime"
3132 "sigs.k8s.io/controller-runtime/pkg/builder"
3233 "sigs.k8s.io/controller-runtime/pkg/client"
34+ "sigs.k8s.io/controller-runtime/pkg/controller"
3335 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34- "sigs.k8s.io/controller-runtime/pkg/event"
3536 "sigs.k8s.io/controller-runtime/pkg/handler"
3637 "sigs.k8s.io/controller-runtime/pkg/log"
3738 "sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -81,28 +82,13 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
8182 reconciledRev := existingRev .DeepCopy ()
8283 res , reconcileErr := c .reconcile (ctx , reconciledRev )
8384
84- if pd := existingRev .Spec .ProgressDeadlineMinutes ; pd > 0 {
85- cnd := meta .FindStatusCondition (reconciledRev .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing )
86- isStillProgressing := cnd != nil && cnd .Status == metav1 .ConditionTrue && cnd .Reason != ocv1 .ReasonSucceeded
87- succeeded := meta .IsStatusConditionTrue (reconciledRev .Status .Conditions , ocv1 .ClusterObjectSetTypeSucceeded )
88- // check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet
89- if isStillProgressing && ! succeeded {
90- timeout := time .Duration (pd ) * time .Minute
91- if c .Clock .Since (existingRev .CreationTimestamp .Time ) > timeout {
92- // progress deadline reached, reset any errors and stop reconciling this revision
93- markAsNotProgressing (reconciledRev , ocv1 .ReasonProgressDeadlineExceeded , fmt .Sprintf ("Revision has not rolled out for %d minute(s)." , pd ))
94- reconcileErr = nil
95- res = ctrl.Result {}
96- } else if reconcileErr == nil {
97- // We want to requeue so far in the future that the next reconciliation
98- // can detect if the revision did not progress within the given timeout.
99- // Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed.
100- drift := 2 * time .Second
101- requeueAfter := existingRev .CreationTimestamp .Time .Add (timeout ).Add (drift ).Sub (c .Clock .Now ()).Round (time .Second )
102- l .Info (fmt .Sprintf ("ProgressDeadline not exceeded, requeue after ~%v to check again." , requeueAfter ))
103- res = ctrl.Result {RequeueAfter : requeueAfter }
104- }
105- }
85+ if remaining , hasDeadline := c .durationUntilDeadline (existingRev ); hasDeadline && remaining > 0 && reconcileErr == nil {
86+ // Requeue slightly after the deadline so the next reconciliation
87+ // can detect that the revision did not progress in time.
88+ drift := 2 * time .Second
89+ requeueAfter := (remaining + drift ).Round (time .Second )
90+ l .Info (fmt .Sprintf ("ProgressDeadline not exceeded, requeue after ~%v to check again." , requeueAfter ))
91+ res = ctrl.Result {RequeueAfter : requeueAfter }
10692 }
10793 // Do checks before any Update()s, as Update() may modify the resource structure!
10894 updateStatus := ! equality .Semantic .DeepEqual (existingRev .Status , reconciledRev .Status )
@@ -144,15 +130,18 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
144130 return c .delete (ctx , cos )
145131 }
146132
133+ remaining , hasDeadline := c .durationUntilDeadline (cos )
134+ isDeadlineExceeded := hasDeadline && remaining <= 0
135+
147136 phases , opts , err := c .buildBoxcutterPhases (ctx , cos )
148137 if err != nil {
149- setRetryingConditions (cos , err .Error ())
138+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
150139 return ctrl.Result {}, fmt .Errorf ("converting to boxcutter revision: %v" , err )
151140 }
152141
153142 revisionEngine , err := c .RevisionEngineFactory .CreateRevisionEngine (ctx , cos )
154143 if err != nil {
155- setRetryingConditions (cos , err .Error ())
144+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
156145 return ctrl.Result {}, fmt .Errorf ("failed to create revision engine: %v" , err )
157146 }
158147
@@ -178,7 +167,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
178167
179168 if err := c .establishWatch (ctx , cos , revision ); err != nil {
180169 werr := fmt .Errorf ("establish watch: %v" , err )
181- setRetryingConditions (cos , werr .Error ())
170+ setRetryingConditions (cos , werr .Error (), isDeadlineExceeded )
182171 return ctrl.Result {}, werr
183172 }
184173
@@ -188,22 +177,22 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
188177 // Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity.
189178 l .V (1 ).Info ("reconcile report" , "report" , rres .String ())
190179 }
191- setRetryingConditions (cos , err .Error ())
180+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
192181 return ctrl.Result {}, fmt .Errorf ("revision reconcile: %v" , err )
193182 }
194183
195184 // Retry failing preflight checks with a flat 10s retry.
196185 // TODO: report status, backoff?
197186 if verr := rres .GetValidationError (); verr != nil {
198187 l .Error (fmt .Errorf ("%w" , verr ), "preflight validation failed, retrying after 10s" )
199- setRetryingConditions (cos , fmt .Sprintf ("revision validation error: %s" , verr ))
188+ setRetryingConditions (cos , fmt .Sprintf ("revision validation error: %s" , verr ), isDeadlineExceeded )
200189 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
201190 }
202191
203192 for i , pres := range rres .GetPhases () {
204193 if verr := pres .GetValidationError (); verr != nil {
205194 l .Error (fmt .Errorf ("%w" , verr ), "phase preflight validation failed, retrying after 10s" , "phase" , i )
206- setRetryingConditions (cos , fmt .Sprintf ("phase %d validation error: %s" , i , verr ))
195+ setRetryingConditions (cos , fmt .Sprintf ("phase %d validation error: %s" , i , verr ), isDeadlineExceeded )
207196 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
208197 }
209198
@@ -216,14 +205,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
216205
217206 if len (collidingObjs ) > 0 {
218207 l .Error (fmt .Errorf ("object collision detected" ), "object collision, retrying after 10s" , "phase" , i , "collisions" , collidingObjs )
219- setRetryingConditions (cos , fmt .Sprintf ("revision object collisions in phase %d\n %s" , i , strings .Join (collidingObjs , "\n \n " )))
208+ setRetryingConditions (cos , fmt .Sprintf ("revision object collisions in phase %d\n %s" , i , strings .Join (collidingObjs , "\n \n " )), isDeadlineExceeded )
220209 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
221210 }
222211 }
223212
224213 revVersion := cos .GetAnnotations ()[labels .BundleVersionKey ]
225214 if rres .InTransition () {
226- markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
215+ markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ), isDeadlineExceeded )
227216 }
228217
229218 //nolint:nestif
@@ -243,7 +232,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
243232 }
244233 }
245234
246- markAsProgressing (cos , ocv1 .ReasonSucceeded , fmt .Sprintf ("Revision %s has rolled out." , revVersion ))
235+ markAsProgressing (cos , ocv1 .ReasonSucceeded , fmt .Sprintf ("Revision %s has rolled out." , revVersion ), isDeadlineExceeded )
247236 markAsAvailable (cos , ocv1 .ClusterObjectSetReasonProbesSucceeded , "Objects are available and pass all probes." )
248237
249238 // We'll probably only want to remove this once we are done updating the ClusterExtension conditions
@@ -289,7 +278,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
289278 markAsUnavailable (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
290279 }
291280 if meta .FindStatusCondition (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing ) == nil {
292- markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
281+ markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ), isDeadlineExceeded )
293282 }
294283 }
295284
@@ -311,11 +300,11 @@ func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine
311300 tdres , err := revisionEngine .Teardown (ctx , revision )
312301 if err != nil {
313302 err = fmt .Errorf ("error archiving revision: %v" , err )
314- setRetryingConditions (cos , err .Error ())
303+ setRetryingConditions (cos , err .Error (), false )
315304 return ctrl.Result {}, err
316305 }
317306 if tdres != nil && ! tdres .IsComplete () {
318- setRetryingConditions (cos , "removing revision resources that are not owned by another revision" )
307+ setRetryingConditions (cos , "removing revision resources that are not owned by another revision" , false )
319308 return ctrl.Result {RequeueAfter : 5 * time .Second }, nil
320309 }
321310 // Ensure conditions are set before removing the finalizer when archiving
@@ -333,29 +322,19 @@ type Sourcoser interface {
333322}
334323
335324func (c * ClusterObjectSetReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
336- skipProgressDeadlineExceededPredicate := predicate.Funcs {
337- UpdateFunc : func (e event.UpdateEvent ) bool {
338- rev , ok := e .ObjectNew .(* ocv1.ClusterObjectSet )
339- if ! ok {
340- return true
341- }
342- // allow deletions to happen
343- if ! rev .DeletionTimestamp .IsZero () {
344- return true
345- }
346- if cnd := meta .FindStatusCondition (rev .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing ); cnd != nil && cnd .Status == metav1 .ConditionFalse && cnd .Reason == ocv1 .ReasonProgressDeadlineExceeded {
347- return false
348- }
349- return true
350- },
351- }
352325 c .Clock = clock.RealClock {}
353326 return ctrl .NewControllerManagedBy (mgr ).
327+ WithOptions (controller.Options {
328+ RateLimiter : newDeadlineAwareRateLimiter (
329+ workqueue .DefaultTypedControllerRateLimiter [ctrl.Request ](),
330+ mgr .GetClient (),
331+ c .Clock ,
332+ ),
333+ }).
354334 For (
355335 & ocv1.ClusterObjectSet {},
356336 builder .WithPredicates (
357337 predicate.ResourceVersionChangedPredicate {},
358- skipProgressDeadlineExceededPredicate ,
359338 ),
360339 ).
361340 WatchesRawSource (
@@ -367,6 +346,51 @@ func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
367346 Complete (c )
368347}
369348
349+ // deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff
350+ // duration to the time remaining until the COS progress deadline (+2s), ensuring
351+ // that ProgressDeadlineExceeded is set promptly even during exponential backoff.
352+ type deadlineAwareRateLimiter struct {
353+ delegate workqueue.TypedRateLimiter [ctrl.Request ]
354+ client client.Reader
355+ clock clock.Clock
356+ }
357+
358+ func newDeadlineAwareRateLimiter (
359+ delegate workqueue.TypedRateLimiter [ctrl.Request ],
360+ c client.Reader ,
361+ clk clock.Clock ,
362+ ) * deadlineAwareRateLimiter {
363+ return & deadlineAwareRateLimiter {delegate : delegate , client : c , clock : clk }
364+ }
365+
366+ func (r * deadlineAwareRateLimiter ) When (item ctrl.Request ) time.Duration {
367+ backoff := r .delegate .When (item )
368+
369+ cos := & ocv1.ClusterObjectSet {}
370+ if err := r .client .Get (context .Background (), item .NamespacedName , cos ); err != nil {
371+ return backoff
372+ }
373+
374+ remaining , hasDeadline := durationUntilDeadline (r .clock , cos )
375+ if ! hasDeadline {
376+ return backoff
377+ }
378+
379+ deadline := remaining + 2 * time .Second
380+ if deadline > 0 && deadline < backoff {
381+ return deadline
382+ }
383+ return backoff
384+ }
385+
386+ func (r * deadlineAwareRateLimiter ) Forget (item ctrl.Request ) {
387+ r .delegate .Forget (item )
388+ }
389+
390+ func (r * deadlineAwareRateLimiter ) NumRequeues (item ctrl.Request ) int {
391+ return r .delegate .NumRequeues (item )
392+ }
393+
370394func (c * ClusterObjectSetReconciler ) establishWatch (ctx context.Context , cos * ocv1.ClusterObjectSet , revision boxcutter.RevisionBuilder ) error {
371395 gvks := sets .New [schema.GroupVersionKind ]()
372396 for _ , phase := range revision .GetPhases () {
@@ -631,14 +655,61 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing.
631655 return userProbes , nil
632656}
633657
634- func setRetryingConditions (cos * ocv1.ClusterObjectSet , message string ) {
635- markAsProgressing (cos , ocv1 .ClusterObjectSetReasonRetrying , message )
658+ func setRetryingConditions (cos * ocv1.ClusterObjectSet , message string , isDeadlineExceeded bool ) {
659+ markAsProgressing (cos , ocv1 .ClusterObjectSetReasonRetrying , message , isDeadlineExceeded )
636660 if meta .FindStatusCondition (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeAvailable ) != nil {
637661 markAsAvailableUnknown (cos , ocv1 .ClusterObjectSetReasonReconciling , message )
638662 }
639663}
640664
641- func markAsProgressing (cos * ocv1.ClusterObjectSet , reason , message string ) {
665+ // durationUntilDeadline returns how much time is left before the progress deadline
666+ // is exceeded. A negative duration means the deadline has already passed. If there
667+ // is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded),
668+ // it returns -1 and false.
669+ func (c * ClusterObjectSetReconciler ) durationUntilDeadline (cos * ocv1.ClusterObjectSet ) (time.Duration , bool ) {
670+ return durationUntilDeadline (c .Clock , cos )
671+ }
672+
673+ // durationUntilDeadline returns how much time is left before the progress deadline
674+ // is exceeded. A negative duration means the deadline has already passed. If there
675+ // is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded),
676+ // it returns -1 and false.
677+ func durationUntilDeadline (clk clock.Clock , cos * ocv1.ClusterObjectSet ) (time.Duration , bool ) {
678+ pd := cos .Spec .ProgressDeadlineMinutes
679+ if pd <= 0 {
680+ return - 1 , false
681+ }
682+ // Succeeded is a latch — once set, it's never cleared. A revision that
683+ // has already succeeded should not be blocked by the deadline, even if
684+ // it temporarily goes back to InTransition (e.g., recovery after drift).
685+ if meta .IsStatusConditionTrue (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeSucceeded ) {
686+ return - 1 , false
687+ }
688+ timeout := time .Duration (pd ) * time .Minute
689+ return timeout - clk .Since (cos .CreationTimestamp .Time ), true
690+ }
691+
692+ // markAsProgressing sets the Progressing condition to True with the given reason.
693+ //
694+ // For non-terminal reasons (RollingOut, Retrying), if isDeadlineExceeded is true,
695+ // the condition is set to Progressing=False/ProgressDeadlineExceeded instead. This
696+ // prevents a reconcile loop where RollingOut and ProgressDeadlineExceeded overwrite
697+ // each other on every cycle.
698+ //
699+ // Terminal reasons (Succeeded) are always applied. Unregistered reasons panic.
700+ func markAsProgressing (cos * ocv1.ClusterObjectSet , reason , message string , isDeadlineExceeded bool ) {
701+ switch reason {
702+ case ocv1 .ReasonSucceeded :
703+ // Terminal — always apply.
704+ case ocv1 .ReasonRollingOut , ocv1 .ClusterObjectSetReasonRetrying :
705+ if isDeadlineExceeded {
706+ markAsNotProgressing (cos , ocv1 .ReasonProgressDeadlineExceeded ,
707+ fmt .Sprintf ("Revision has not rolled out for %d minute(s)." , cos .Spec .ProgressDeadlineMinutes ))
708+ return
709+ }
710+ default :
711+ panic (fmt .Sprintf ("unregistered progressing reason: %q" , reason ))
712+ }
642713 meta .SetStatusCondition (& cos .Status .Conditions , metav1.Condition {
643714 Type : ocv1 .ClusterObjectSetTypeProgressing ,
644715 Status : metav1 .ConditionTrue ,
0 commit comments