Skip to content

Commit cba244e

Browse files
committed
pkg: Use risk.Source framework to feed Upgradeable
This simplifies the complex cvo package by shifting some logic into small, narrowly scoped helper packages. That makes for easier unit testing of that logic, and less complication when reading the remaining cvo package code. Using the risk.Source framework with the new aggregate implementation allows Upgradeable risks to feed into the conditionalUpdateRisks framework. This should help reduce the confusion some users experience when they see Upgradeable=False and think it applies to all updates (when in reality it applies to major and minor bumps, but does not apply to patch updates, especially since 6f8f984 (OTA-1860: Stop blocking patch updates when cluster version overrides are set, 2026-02-10, #1314)). It will also allow structured access to individual risks that are reported via this pipeline, while Upgradeable had an aggregated message string only consumable by humans. I'm also removing internal.UpgradeableAdminAckRequired and similar from ClusterVersion status.conditions. While these were visible to end users, they were mostly a mechanism for passing data from one part of the CVO to another. There are no known consumers outside of the CVO. With the refactor providing clearer abstraction layers, I could drop the entire upgradeableCheckIntervals structure. Major touches in the history of that structure were bd05174 (pkg/cvo/upgradeable.go: Sync Upgradeable status of the CVO more often, 2022-08-02, #808) and cc94c95 (pkg/cvo/upgradeable: refactor throttling, 2023-01-11, #882). The new approach is event-driven via the informer handlers for almost all of the new risk.Source implementations. The one source that still relies on polling is the deletion source, where it's watching a CVO-side global variable, and not a Kube-side informer. It will still sync whenever ClusterVersion status is updated though, which should be often enough for this usually-transient situation.
If folks want lower latency, follow-up work could add informer-style handler callbacks to the deletion global.
1 parent f2ac936 commit cba244e

20 files changed

Lines changed: 1991 additions & 2003 deletions

pkg/cvo/cvo.go

Lines changed: 21 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,13 @@ import (
5454
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
5555
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
5656
"github.com/openshift/cluster-version-operator/pkg/risk"
57+
"github.com/openshift/cluster-version-operator/pkg/risk/adminack"
58+
"github.com/openshift/cluster-version-operator/pkg/risk/aggregate"
5759
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
60+
deletionrisk "github.com/openshift/cluster-version-operator/pkg/risk/deletion"
61+
"github.com/openshift/cluster-version-operator/pkg/risk/overrides"
62+
updatingrisk "github.com/openshift/cluster-version-operator/pkg/risk/updating"
63+
upgradeablerisk "github.com/openshift/cluster-version-operator/pkg/risk/upgradeable"
5864
)
5965

6066
const (
@@ -130,21 +136,13 @@ type Operator struct {
130136
queue workqueue.TypedRateLimitingInterface[any]
131137
// availableUpdatesQueue tracks checking for updates from the update server.
132138
availableUpdatesQueue workqueue.TypedRateLimitingInterface[any]
133-
// upgradeableQueue tracks checking for upgradeable.
134-
upgradeableQueue workqueue.TypedRateLimitingInterface[any]
135139

136140
// statusLock guards access to modifying available updates
137141
statusLock sync.Mutex
138142
availableUpdates *availableUpdates
139143

140-
// upgradeableStatusLock guards access to modifying Upgradeable conditions
141-
upgradeableStatusLock sync.Mutex
142-
upgradeable *upgradeable
143-
upgradeableChecks []upgradeableCheck
144-
145-
// upgradeableCheckIntervals drives minimal intervals between Upgradeable status
146-
// synchronization
147-
upgradeableCheckIntervals upgradeableCheckIntervals
144+
// upgradeable tracks the risks that feed the ClusterVersion Upgradeable condition.
145+
upgradeable risk.Source
148146

149147
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
150148
conditionRegistry clusterconditions.ConditionRegistry
@@ -257,7 +255,6 @@ func New(
257255

258256
statusInterval: 15 * time.Second,
259257
minimumUpdateCheckInterval: minimumInterval,
260-
upgradeableCheckIntervals: defaultUpgradeableCheckIntervals(),
261258
payloadDir: overridePayloadDir,
262259
updateService: updateService,
263260

@@ -267,13 +264,11 @@ func New(
267264
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
268265
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
269266
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
270-
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),
271267

272268
hypershift: hypershift,
273269
exclude: exclude,
274270
clusterProfile: clusterProfile,
275271
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
276-
risks: alert.New("Alert", promqlTarget),
277272
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,
278273

279274
requiredFeatureSet: featureSet,
@@ -286,12 +281,6 @@ func New(
286281
if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
287282
return nil, err
288283
}
289-
if _, err := cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()); err != nil {
290-
return nil, err
291-
}
292-
if _, err := cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()); err != nil {
293-
return nil, err
294-
}
295284
if _, err := coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()); err != nil {
296285
return nil, err
297286
}
@@ -313,7 +302,19 @@ func New(
313302
optr.cacheSynced = append(optr.cacheSynced, featureGateInformer.Informer().HasSynced)
314303

315304
// make sure this is initialized after all the listers are initialized
316-
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
305+
riskSourceCallback := func() { optr.availableUpdatesQueue.Add(optr.queueKey()) }
306+
optr.upgradeable = aggregate.New(
307+
updatingrisk.New("ClusterVersionUpdating", optr.name, cvInformer, riskSourceCallback),
308+
overrides.New("ClusterVersionOverrides", optr.name, cvInformer, riskSourceCallback),
309+
deletionrisk.New("ResourceDeletionInProgress", optr.currentVersion),
310+
adminack.New("AdminAck", optr.currentVersion, cmConfigManagedInformer, cmConfigInformer, riskSourceCallback),
311+
upgradeablerisk.New("ClusterOperatorUpgradeable", optr.currentVersion, coInformer, riskSourceCallback),
312+
)
313+
314+
optr.risks = aggregate.New(
315+
alert.New("Alert", promqlTarget),
316+
optr.upgradeable,
317+
)
317318

318319
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
319320

@@ -468,7 +469,6 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
468469
func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context) error {
469470
defer optr.queue.ShutDown()
470471
defer optr.availableUpdatesQueue.ShutDown()
471-
defer optr.upgradeableQueue.ShutDown()
472472
defer optr.configuration.Queue().ShutDown()
473473
stopCh := runContext.Done()
474474

@@ -525,25 +525,12 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
525525
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
526526
}
527527

528-
resultChannelCount++
529-
go func() {
530-
defer utilruntime.HandleCrash()
531-
wait.UntilWithContext(runContext, func(runContext context.Context) {
532-
optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSyncFunc(false))
533-
}, time.Second)
534-
resultChannel <- asyncResult{name: "upgradeable"}
535-
}()
536-
537528
resultChannelCount++
538529
go func() {
539530
defer utilruntime.HandleCrash()
540531
wait.UntilWithContext(runContext, func(runContext context.Context) {
541532
// run the worker, then when the queue is closed sync one final time to flush any pending status
542533
optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
543-
// This is to ensure upgradeableCondition to be synced and thus to avoid the race caused by the throttle
544-
if err := optr.upgradeableSyncFunc(true)(shutdownContext, optr.queueKey()); err != nil {
545-
utilruntime.HandleError(fmt.Errorf("unable to perform final upgradeable sync: %v", err))
546-
}
547534
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
548535
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
549536
}
@@ -593,7 +580,6 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
593580
shutdown = true
594581
optr.queue.ShutDown()
595582
optr.availableUpdatesQueue.ShutDown()
596-
optr.upgradeableQueue.ShutDown()
597583
optr.configuration.Queue().ShutDown()
598584
}
599585
}
@@ -613,12 +599,10 @@ func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
613599
AddFunc: func(_ interface{}) {
614600
optr.queue.Add(workQueueKey)
615601
optr.availableUpdatesQueue.Add(workQueueKey)
616-
optr.upgradeableQueue.Add(workQueueKey)
617602
},
618603
UpdateFunc: func(_, _ interface{}) {
619604
optr.queue.Add(workQueueKey)
620605
optr.availableUpdatesQueue.Add(workQueueKey)
621-
optr.upgradeableQueue.Add(workQueueKey)
622606
},
623607
DeleteFunc: func(_ interface{}) {
624608
optr.queue.Add(workQueueKey)
@@ -829,31 +813,6 @@ func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) erro
829813
return optr.syncAvailableUpdates(ctx, config)
830814
}
831815

832-
// upgradeableSyncFunc returns a function that is triggered on cluster version change (and periodic requeues) to
833-
// sync upgradeableCondition. It only modifies cluster version.
834-
func (optr *Operator) upgradeableSyncFunc(ignoreThrottlePeriod bool) func(_ context.Context, key string) error {
835-
return func(_ context.Context, key string) error {
836-
startTime := time.Now()
837-
klog.V(2).Infof("Started syncing upgradeable %q", key)
838-
defer func() {
839-
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
840-
}()
841-
842-
config, err := optr.cvLister.Get(optr.name)
843-
if apierrors.IsNotFound(err) {
844-
return nil
845-
}
846-
if err != nil {
847-
return err
848-
}
849-
if errs := validation.ValidateClusterVersion(config, optr.shouldReconcileAcceptRisks()); len(errs) > 0 {
850-
return nil
851-
}
852-
853-
return optr.syncUpgradeable(config, ignoreThrottlePeriod)
854-
}
855-
}
856-
857816
// isOlderThanLastUpdate returns true if the cluster version is older than
858817
// the last update we saw.
859818
func (optr *Operator) isOlderThanLastUpdate(config *configv1.ClusterVersion) bool {

0 commit comments

Comments
 (0)