Skip to content

Commit bb338d4

Browse files
Merge pull request #1368 from wking/upgradeable-as-update-risk
OTA-1935: pkg: Use risk.Source framework to feed Upgradeable
2 parents e75b70a + 7764ea6 commit bb338d4

21 files changed

Lines changed: 2184 additions & 2004 deletions

pkg/cvo/cvo.go

Lines changed: 41 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,13 @@ import (
5757
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
5858
"github.com/openshift/cluster-version-operator/pkg/proposal"
5959
"github.com/openshift/cluster-version-operator/pkg/risk"
60+
"github.com/openshift/cluster-version-operator/pkg/risk/adminack"
61+
"github.com/openshift/cluster-version-operator/pkg/risk/aggregate"
6062
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
63+
deletionrisk "github.com/openshift/cluster-version-operator/pkg/risk/deletion"
64+
"github.com/openshift/cluster-version-operator/pkg/risk/overrides"
65+
updatingrisk "github.com/openshift/cluster-version-operator/pkg/risk/updating"
66+
upgradeablerisk "github.com/openshift/cluster-version-operator/pkg/risk/upgradeable"
6167
)
6268

6369
const (
@@ -133,21 +139,13 @@ type Operator struct {
133139
queue workqueue.TypedRateLimitingInterface[any]
134140
// availableUpdatesQueue tracks checking for updates from the update server.
135141
availableUpdatesQueue workqueue.TypedRateLimitingInterface[any]
136-
// upgradeableQueue tracks checking for upgradeable.
137-
upgradeableQueue workqueue.TypedRateLimitingInterface[any]
138142

139143
// statusLock guards access to modifying available updates
140144
statusLock sync.Mutex
141145
availableUpdates *availableUpdates
142146

143-
// upgradeableStatusLock guards access to modifying Upgradeable conditions
144-
upgradeableStatusLock sync.Mutex
145-
upgradeable *upgradeable
146-
upgradeableChecks []upgradeableCheck
147-
148-
// upgradeableCheckIntervals drives minimal intervals between Upgradeable status
149-
// synchronization
150-
upgradeableCheckIntervals upgradeableCheckIntervals
147+
// upgradeable tracks the risks that feed the ClusterVersion Upgradeable condition.
148+
upgradeable risk.Source
151149

152150
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
153151
conditionRegistry clusterconditions.ConditionRegistry
@@ -264,7 +262,6 @@ func New(
264262

265263
statusInterval: 15 * time.Second,
266264
minimumUpdateCheckInterval: minimumInterval,
267-
upgradeableCheckIntervals: defaultUpgradeableCheckIntervals(),
268265
payloadDir: overridePayloadDir,
269266
updateService: updateService,
270267

@@ -274,13 +271,11 @@ func New(
274271
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
275272
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
276273
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
277-
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),
278274

279275
hypershift: hypershift,
280276
exclude: exclude,
281277
clusterProfile: clusterProfile,
282278
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
283-
risks: alert.New("Alert", promqlTarget),
284279
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,
285280

286281
requiredFeatureSet: featureSet,
@@ -293,12 +288,6 @@ func New(
293288
if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
294289
return nil, err
295290
}
296-
if _, err := cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()); err != nil {
297-
return nil, err
298-
}
299-
if _, err := cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()); err != nil {
300-
return nil, err
301-
}
302291
if _, err := coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()); err != nil {
303292
return nil, err
304293
}
@@ -314,13 +303,45 @@ func New(
314303

315304
optr.proxyLister = proxyInformer.Lister()
316305
optr.cmConfigLister = cmConfigInformer.Lister().ConfigMaps(internal.ConfigNamespace)
306+
optr.cacheSynced = append(optr.cacheSynced, cmConfigInformer.Informer().HasSynced)
317307
optr.cmConfigManagedLister = cmConfigManagedInformer.Lister().ConfigMaps(internal.ConfigManagedNamespace)
308+
optr.cacheSynced = append(optr.cacheSynced, cmConfigManagedInformer.Informer().HasSynced)
318309

319310
optr.featureGateLister = featureGateInformer.Lister()
320311
optr.cacheSynced = append(optr.cacheSynced, featureGateInformer.Informer().HasSynced)
321312

322313
// make sure this is initialized after all the listers are initialized
323-
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
314+
riskSourceCallback := func() { optr.availableUpdatesQueue.Add(optr.queueKey()) }
315+
316+
risks := []risk.Source{}
317+
if source, err := updatingrisk.New("ClusterVersionUpdating", optr.name, cvInformer, riskSourceCallback); err != nil {
318+
return optr, err
319+
} else {
320+
risks = append(risks, source)
321+
}
322+
if source, err := overrides.New("ClusterVersionOverrides", optr.name, cvInformer, riskSourceCallback); err != nil {
323+
return optr, err
324+
} else {
325+
risks = append(risks, source)
326+
}
327+
risks = append(risks, deletionrisk.New("ResourceDeletionInProgress", optr.currentVersion))
328+
if source, err := adminack.New("AdminAck", func() string { return optr.currentVersion().Version }, cmConfigManagedInformer, cmConfigInformer, riskSourceCallback); err != nil {
329+
return optr, err
330+
} else {
331+
risks = append(risks, source)
332+
}
333+
if source, err := upgradeablerisk.New("ClusterOperatorUpgradeable", optr.currentVersion, coInformer, riskSourceCallback); err != nil {
334+
return optr, err
335+
} else {
336+
risks = append(risks, source)
337+
}
338+
339+
optr.upgradeable = aggregate.New(risks...)
340+
341+
optr.risks = aggregate.New(
342+
alert.New("Alert", promqlTarget),
343+
optr.upgradeable,
344+
)
324345

325346
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
326347

@@ -483,7 +504,6 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
483504
func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context) error {
484505
defer optr.queue.ShutDown()
485506
defer optr.availableUpdatesQueue.ShutDown()
486-
defer optr.upgradeableQueue.ShutDown()
487507
defer optr.configuration.Queue().ShutDown()
488508
defer optr.proposalController.Queue().ShutDown()
489509
stopCh := runContext.Done()
@@ -554,25 +574,12 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
554574
klog.Infof("The proposal controller is disabled.")
555575
}
556576

557-
resultChannelCount++
558-
go func() {
559-
defer utilruntime.HandleCrash()
560-
wait.UntilWithContext(runContext, func(runContext context.Context) {
561-
optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSyncFunc(false))
562-
}, time.Second)
563-
resultChannel <- asyncResult{name: "upgradeable"}
564-
}()
565-
566577
resultChannelCount++
567578
go func() {
568579
defer utilruntime.HandleCrash()
569580
wait.UntilWithContext(runContext, func(runContext context.Context) {
570581
// run the worker, then when the queue is closed sync one final time to flush any pending status
571582
optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
572-
// This is to ensure upgradeableCondition to be synced and thus to avoid the race caused by the throttle
573-
if err := optr.upgradeableSyncFunc(true)(shutdownContext, optr.queueKey()); err != nil {
574-
utilruntime.HandleError(fmt.Errorf("unable to perform final upgradeable sync: %v", err))
575-
}
576583
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
577584
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
578585
}
@@ -622,7 +629,6 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
622629
shutdown = true
623630
optr.queue.ShutDown()
624631
optr.availableUpdatesQueue.ShutDown()
625-
optr.upgradeableQueue.ShutDown()
626632
optr.configuration.Queue().ShutDown()
627633
optr.proposalController.Queue().ShutDown()
628634
}
@@ -643,12 +649,10 @@ func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
643649
AddFunc: func(_ interface{}) {
644650
optr.queue.Add(workQueueKey)
645651
optr.availableUpdatesQueue.Add(workQueueKey)
646-
optr.upgradeableQueue.Add(workQueueKey)
647652
},
648653
UpdateFunc: func(_, _ interface{}) {
649654
optr.queue.Add(workQueueKey)
650655
optr.availableUpdatesQueue.Add(workQueueKey)
651-
optr.upgradeableQueue.Add(workQueueKey)
652656
},
653657
DeleteFunc: func(_ interface{}) {
654658
optr.queue.Add(workQueueKey)
@@ -859,31 +863,6 @@ func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) erro
859863
return optr.syncAvailableUpdates(ctx, config)
860864
}
861865

862-
// upgradeableSyncFunc returns a function that is triggered on cluster version change (and periodic requeues) to
863-
// sync upgradeableCondition. It only modifies cluster version.
864-
func (optr *Operator) upgradeableSyncFunc(ignoreThrottlePeriod bool) func(_ context.Context, key string) error {
865-
return func(_ context.Context, key string) error {
866-
startTime := time.Now()
867-
klog.V(2).Infof("Started syncing upgradeable %q", key)
868-
defer func() {
869-
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
870-
}()
871-
872-
config, err := optr.cvLister.Get(optr.name)
873-
if apierrors.IsNotFound(err) {
874-
return nil
875-
}
876-
if err != nil {
877-
return err
878-
}
879-
if errs := validation.ValidateClusterVersion(config, optr.shouldReconcileAcceptRisks()); len(errs) > 0 {
880-
return nil
881-
}
882-
883-
return optr.syncUpgradeable(config, ignoreThrottlePeriod)
884-
}
885-
}
886-
887866
// isOlderThanLastUpdate returns true if the cluster version is older than
888867
// the last update we saw.
889868
func (optr *Operator) isOlderThanLastUpdate(config *configv1.ClusterVersion) bool {

pkg/cvo/cvo_scenarios_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/openshift/client-go/config/clientset/versioned/fake"
3030
"github.com/openshift/library-go/pkg/manifest"
3131

32+
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
33+
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
3234
"github.com/openshift/cluster-version-operator/pkg/featuregates"
3335
"github.com/openshift/cluster-version-operator/pkg/internal"
3436
"github.com/openshift/cluster-version-operator/pkg/payload"
@@ -114,13 +116,16 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]apiruntime.Object, *
114116
fmt.Printf("Cannot create cluster version object, err: %#v\n", err)
115117
}
116118

119+
registry := clusterconditions.NewConditionRegistry()
120+
registry.Register("Always", &always.Always{})
117121
o := &Operator{
118122
namespace: "test",
119123
name: "version",
120124
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "cvo-loop-test"}),
121125
client: client,
122126
enabledCVOFeatureGates: featuregates.DefaultCvoGates("version"),
123127
cvLister: &clientCVLister{client: client},
128+
conditionRegistry: registry,
124129
exclude: "exclude-test",
125130
eventRecorder: record.NewFakeRecorder(100),
126131
clusterProfile: payload.DefaultClusterProfile,

0 commit comments

Comments
 (0)