|
8 | 8 | "sync" |
9 | 9 | "time" |
10 | 10 |
|
| 11 | + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" |
| 12 | + |
11 | 13 | corev1 "k8s.io/api/core/v1" |
12 | 14 | apierrors "k8s.io/apimachinery/pkg/api/errors" |
13 | 15 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
@@ -53,6 +55,7 @@ import ( |
53 | 55 | "github.com/openshift/cluster-version-operator/pkg/payload" |
54 | 56 | "github.com/openshift/cluster-version-operator/pkg/payload/precondition" |
55 | 57 | preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion" |
| 58 | + "github.com/openshift/cluster-version-operator/pkg/proposal" |
56 | 59 | "github.com/openshift/cluster-version-operator/pkg/risk" |
57 | 60 | "github.com/openshift/cluster-version-operator/pkg/risk/alert" |
58 | 61 | ) |
@@ -213,6 +216,9 @@ type Operator struct { |
213 | 216 | // risks holds update-risk source (in-cluster alerts, etc.) |
214 | 217 | // that will be aggregated into conditional update risks. |
215 | 218 | risks risk.Source |
| 219 | + |
| 220 | + // proposalController, if enabled, watches available and conditionals updates and manage proposals for them |
| 221 | + proposalController *proposal.Controller |
216 | 222 | } |
217 | 223 |
|
218 | 224 | // New returns a new cluster version operator. |
@@ -242,6 +248,7 @@ func New( |
242 | 248 | featureSet configv1.FeatureSet, |
243 | 249 | cvoGates featuregates.CvoGateChecker, |
244 | 250 | startingEnabledManifestFeatureGates sets.Set[string], |
| 251 | + rtClient runtimeclient.Client, |
245 | 252 | ) (*Operator, error) { |
246 | 253 | eventBroadcaster := record.NewBroadcaster() |
247 | 254 | eventBroadcaster.StartLogging(klog.Infof) |
@@ -317,6 +324,14 @@ func New( |
317 | 324 |
|
318 | 325 | optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory) |
319 | 326 |
|
| 327 | + optr.proposalController = proposal.NewController(func() ([]configv1.Release, []configv1.ConditionalUpdate, error) { |
| 328 | + availableUpdates := optr.getAvailableUpdates() |
| 329 | + if availableUpdates == nil { |
| 330 | + return nil, nil, nil |
| 331 | + } |
| 332 | + return availableUpdates.Updates, availableUpdates.ConditionalUpdates, nil |
| 333 | + }, rtClient, cvInformer.Lister().Get) |
| 334 | + |
320 | 335 | return optr, nil |
321 | 336 | } |
322 | 337 |
|
@@ -470,6 +485,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co |
470 | 485 | defer optr.availableUpdatesQueue.ShutDown() |
471 | 486 | defer optr.upgradeableQueue.ShutDown() |
472 | 487 | defer optr.configuration.Queue().ShutDown() |
| 488 | + defer optr.proposalController.Queue().ShutDown() |
473 | 489 | stopCh := runContext.Done() |
474 | 490 |
|
475 | 491 | klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval) |
@@ -525,6 +541,19 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co |
525 | 541 | klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.") |
526 | 542 | } |
527 | 543 |
|
| 544 | + if optr.shouldEnableProposalController() { |
| 545 | + resultChannelCount++ |
| 546 | + go func() { |
| 547 | + defer utilruntime.HandleCrash() |
| 548 | + wait.UntilWithContext(runContext, func(runContext context.Context) { |
| 549 | + optr.worker(runContext, optr.proposalController.Queue(), optr.proposalController.Sync) |
| 550 | + }, time.Second) |
| 551 | + resultChannel <- asyncResult{name: "proposal controller"} |
| 552 | + }() |
| 553 | + } else { |
| 554 | + klog.Infof("The proposal controller is disabled.") |
| 555 | + } |
| 556 | + |
528 | 557 | resultChannelCount++ |
529 | 558 | go func() { |
530 | 559 | defer utilruntime.HandleCrash() |
@@ -595,6 +624,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co |
595 | 624 | optr.availableUpdatesQueue.ShutDown() |
596 | 625 | optr.upgradeableQueue.ShutDown() |
597 | 626 | optr.configuration.Queue().ShutDown() |
| 627 | + optr.proposalController.Queue().ShutDown() |
598 | 628 | } |
599 | 629 | } |
600 | 630 |
|
@@ -1191,3 +1221,10 @@ func (optr *Operator) shouldReconcileAcceptRisks() bool { |
1191 | 1221 | // HyperShift will be supported later if needed |
1192 | 1222 | return optr.enabledCVOFeatureGates.AcceptRisks() && !optr.hypershift |
1193 | 1223 | } |
| 1224 | + |
| 1225 | +// shouldEnableProposalController returns whether the CVO should enable the proposal controller |
| 1226 | +func (optr *Operator) shouldEnableProposalController() bool { |
| 1227 | + // We do not have a specific gate for the Proposal feature and use the TechPreviewNoUpgrade instead. |
| 1228 | + // It can ensure that featuregates.ChangeStopper restarts CVO when the returns of this function flips. |
| 1229 | + return optr.requiredFeatureSet == configv1.TechPreviewNoUpgrade |
| 1230 | +} |
0 commit comments