Skip to content

Commit ddf0131

Browse files
committed
pkg: Use risk.Source framework to feed Upgradeable
This simplifies the complex cvo package by shifting some logic into small, narrowly scoped helper packages. That makes for easier unit testing of that logic, and less complication when reading the remaining cvo package code. Using the risk.Source framework with the new aggregate implementation allows Upgradeable risks to feed into the conditionalUpdateRisks framework. This should help reduce confusion some users experience when the see Upgradeable=False and think it applies to all updates (when in reality it applies to major and minor bumps, but does not apply to patch updates, especially since 6f8f984, OTA-1860: Stop blocking patch updates when cluster version overrides are set, 2026-02-10, #1314). It will also allow structured access to individual risks that are reported via this pipeline, while Upgradeable had an aggregated message string only consumable by humans. I'm also removing internal.UpgradeableAdminAckRequired and similar from ClusterVersion status.conditions. While these were visible by end users, they were mostly a mechanism for passing data from one part of the CVO to another. There are no known consumers outside of the CVO. With the refactor providing clearer abstraction layers, I could drop the entire upgradeableCheckIntervals structure. Major touches in the history of that structure were bd05174 (pkg/cvo/upgradeable.go: Sync Upgradeable status of the CVO more often, 2022-08-02, #808) and cc94c95 (pkg/cvo/upgradeable: refactor throttling, 2023-01-11, #882). The new approach is event-driven via the informer handlers for almost all of the new risk.Source implementations. The one source that still relies on polling is the deletion source, where it's watching a CVO-side global variable, and not a Kube-side informer. It will still sync whenever ClusterVersion status is updated though, which should be often enough for this usually-transient situation. If folks want lower latency, follow up work could add informer-style handler callbacks to the deletion global. The addition of '.' and '-' to validName in pkg/risk allows for names like ClusterOperatorUpgradeable-test-operator in the Upgradeable test, composed with the ClusterOperator name. That requires us to extend the regular expression to allow the characters supported by ClusterOperator names [2]. We could alternatively build a system that CamelCased ClusterOperator names, but that seemed difficult, and not worth the effort. The changeCallback in unit tests is unbuffered, because I want to know exactly how often that callback is being called. If a logic change in the packages causes additional callback calls, the unbuffered channel will block them from sending the message, deadlock the informer for that handler [1], and break later test-cases. [1]: https://github.com/kubernetes/client-go/blob/v0.35.3/tools/cache/shared_informer.go#L1052-L1086 [2]: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-subdomain-names
1 parent de31a3f commit ddf0131

21 files changed

Lines changed: 2025 additions & 2003 deletions

pkg/cvo/cvo.go

Lines changed: 21 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,13 @@ import (
5757
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
5858
"github.com/openshift/cluster-version-operator/pkg/proposal"
5959
"github.com/openshift/cluster-version-operator/pkg/risk"
60+
"github.com/openshift/cluster-version-operator/pkg/risk/adminack"
61+
"github.com/openshift/cluster-version-operator/pkg/risk/aggregate"
6062
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
63+
deletionrisk "github.com/openshift/cluster-version-operator/pkg/risk/deletion"
64+
"github.com/openshift/cluster-version-operator/pkg/risk/overrides"
65+
updatingrisk "github.com/openshift/cluster-version-operator/pkg/risk/updating"
66+
upgradeablerisk "github.com/openshift/cluster-version-operator/pkg/risk/upgradeable"
6167
)
6268

6369
const (
@@ -133,21 +139,13 @@ type Operator struct {
133139
queue workqueue.TypedRateLimitingInterface[any]
134140
// availableUpdatesQueue tracks checking for updates from the update server.
135141
availableUpdatesQueue workqueue.TypedRateLimitingInterface[any]
136-
// upgradeableQueue tracks checking for upgradeable.
137-
upgradeableQueue workqueue.TypedRateLimitingInterface[any]
138142

139143
// statusLock guards access to modifying available updates
140144
statusLock sync.Mutex
141145
availableUpdates *availableUpdates
142146

143-
// upgradeableStatusLock guards access to modifying Upgradeable conditions
144-
upgradeableStatusLock sync.Mutex
145-
upgradeable *upgradeable
146-
upgradeableChecks []upgradeableCheck
147-
148-
// upgradeableCheckIntervals drives minimal intervals between Upgradeable status
149-
// synchronization
150-
upgradeableCheckIntervals upgradeableCheckIntervals
147+
// upgradeable tracks the risks that feed the ClusterVersion Upgradeable condition.
148+
upgradeable risk.Source
151149

152150
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
153151
conditionRegistry clusterconditions.ConditionRegistry
@@ -264,7 +262,6 @@ func New(
264262

265263
statusInterval: 15 * time.Second,
266264
minimumUpdateCheckInterval: minimumInterval,
267-
upgradeableCheckIntervals: defaultUpgradeableCheckIntervals(),
268265
payloadDir: overridePayloadDir,
269266
updateService: updateService,
270267

@@ -274,13 +271,11 @@ func New(
274271
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
275272
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
276273
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
277-
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),
278274

279275
hypershift: hypershift,
280276
exclude: exclude,
281277
clusterProfile: clusterProfile,
282278
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
283-
risks: alert.New("Alert", promqlTarget),
284279
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,
285280

286281
requiredFeatureSet: featureSet,
@@ -293,12 +288,6 @@ func New(
293288
if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
294289
return nil, err
295290
}
296-
if _, err := cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()); err != nil {
297-
return nil, err
298-
}
299-
if _, err := cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()); err != nil {
300-
return nil, err
301-
}
302291
if _, err := coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()); err != nil {
303292
return nil, err
304293
}
@@ -320,7 +309,19 @@ func New(
320309
optr.cacheSynced = append(optr.cacheSynced, featureGateInformer.Informer().HasSynced)
321310

322311
// make sure this is initialized after all the listers are initialized
323-
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
312+
riskSourceCallback := func() { optr.availableUpdatesQueue.Add(optr.queueKey()) }
313+
optr.upgradeable = aggregate.New(
314+
updatingrisk.New("ClusterVersionUpdating", optr.name, cvInformer, riskSourceCallback),
315+
overrides.New("ClusterVersionOverrides", optr.name, cvInformer, riskSourceCallback),
316+
deletionrisk.New("ResourceDeletionInProgress", optr.currentVersion),
317+
adminack.New("AdminAck", optr.currentVersion, cmConfigManagedInformer, cmConfigInformer, riskSourceCallback),
318+
upgradeablerisk.New("ClusterOperatorUpgradeable", optr.currentVersion, coInformer, riskSourceCallback),
319+
)
320+
321+
optr.risks = aggregate.New(
322+
alert.New("Alert", promqlTarget),
323+
optr.upgradeable,
324+
)
324325

325326
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
326327

@@ -483,7 +484,6 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
483484
func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context) error {
484485
defer optr.queue.ShutDown()
485486
defer optr.availableUpdatesQueue.ShutDown()
486-
defer optr.upgradeableQueue.ShutDown()
487487
defer optr.configuration.Queue().ShutDown()
488488
defer optr.proposalController.Queue().ShutDown()
489489
stopCh := runContext.Done()
@@ -554,25 +554,12 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
554554
klog.Infof("The proposal controller is disabled.")
555555
}
556556

557-
resultChannelCount++
558-
go func() {
559-
defer utilruntime.HandleCrash()
560-
wait.UntilWithContext(runContext, func(runContext context.Context) {
561-
optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSyncFunc(false))
562-
}, time.Second)
563-
resultChannel <- asyncResult{name: "upgradeable"}
564-
}()
565-
566557
resultChannelCount++
567558
go func() {
568559
defer utilruntime.HandleCrash()
569560
wait.UntilWithContext(runContext, func(runContext context.Context) {
570561
// run the worker, then when the queue is closed sync one final time to flush any pending status
571562
optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
572-
// This is to ensure upgradeableCondition to be synced and thus to avoid the race caused by the throttle
573-
if err := optr.upgradeableSyncFunc(true)(shutdownContext, optr.queueKey()); err != nil {
574-
utilruntime.HandleError(fmt.Errorf("unable to perform final upgradeable sync: %v", err))
575-
}
576563
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
577564
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
578565
}
@@ -622,7 +609,6 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
622609
shutdown = true
623610
optr.queue.ShutDown()
624611
optr.availableUpdatesQueue.ShutDown()
625-
optr.upgradeableQueue.ShutDown()
626612
optr.configuration.Queue().ShutDown()
627613
optr.proposalController.Queue().ShutDown()
628614
}
@@ -643,12 +629,10 @@ func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
643629
AddFunc: func(_ interface{}) {
644630
optr.queue.Add(workQueueKey)
645631
optr.availableUpdatesQueue.Add(workQueueKey)
646-
optr.upgradeableQueue.Add(workQueueKey)
647632
},
648633
UpdateFunc: func(_, _ interface{}) {
649634
optr.queue.Add(workQueueKey)
650635
optr.availableUpdatesQueue.Add(workQueueKey)
651-
optr.upgradeableQueue.Add(workQueueKey)
652636
},
653637
DeleteFunc: func(_ interface{}) {
654638
optr.queue.Add(workQueueKey)
@@ -859,31 +843,6 @@ func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) erro
859843
return optr.syncAvailableUpdates(ctx, config)
860844
}
861845

862-
// upgradeableSyncFunc returns a function that is triggered on cluster version change (and periodic requeues) to
863-
// sync upgradeableCondition. It only modifies cluster version.
864-
func (optr *Operator) upgradeableSyncFunc(ignoreThrottlePeriod bool) func(_ context.Context, key string) error {
865-
return func(_ context.Context, key string) error {
866-
startTime := time.Now()
867-
klog.V(2).Infof("Started syncing upgradeable %q", key)
868-
defer func() {
869-
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
870-
}()
871-
872-
config, err := optr.cvLister.Get(optr.name)
873-
if apierrors.IsNotFound(err) {
874-
return nil
875-
}
876-
if err != nil {
877-
return err
878-
}
879-
if errs := validation.ValidateClusterVersion(config, optr.shouldReconcileAcceptRisks()); len(errs) > 0 {
880-
return nil
881-
}
882-
883-
return optr.syncUpgradeable(config, ignoreThrottlePeriod)
884-
}
885-
}
886-
887846
// isOlderThanLastUpdate returns true if the cluster version is older than
888847
// the last update we saw.
889848
func (optr *Operator) isOlderThanLastUpdate(config *configv1.ClusterVersion) bool {

pkg/cvo/cvo_scenarios_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/openshift/client-go/config/clientset/versioned/fake"
3030
"github.com/openshift/library-go/pkg/manifest"
3131

32+
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
33+
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
3234
"github.com/openshift/cluster-version-operator/pkg/featuregates"
3335
"github.com/openshift/cluster-version-operator/pkg/internal"
3436
"github.com/openshift/cluster-version-operator/pkg/payload"
@@ -114,13 +116,16 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]apiruntime.Object, *
114116
fmt.Printf("Cannot create cluster version object, err: %#v\n", err)
115117
}
116118

119+
registry := clusterconditions.NewConditionRegistry()
120+
registry.Register("Always", &always.Always{})
117121
o := &Operator{
118122
namespace: "test",
119123
name: "version",
120124
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "cvo-loop-test"}),
121125
client: client,
122126
enabledCVOFeatureGates: featuregates.DefaultCvoGates("version"),
123127
cvLister: &clientCVLister{client: client},
128+
conditionRegistry: registry,
124129
exclude: "exclude-test",
125130
eventRecorder: record.NewFakeRecorder(100),
126131
clusterProfile: payload.DefaultClusterProfile,

0 commit comments

Comments
 (0)