Skip to content

Commit d33bcbb

Browse files
committed
OTA-1966: Init the Proposal Lifecycle Controller
1 parent e9c1c39 commit d33bcbb

3 files changed

Lines changed: 148 additions & 0 deletions

File tree

pkg/cvo/availableupdates.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
182182

183183
// queue optr.sync() to update ClusterVersion status
184184
optr.queue.Add(queueKey)
185+
// queue optr.proposalController.Sync() to manage proposals
186+
optr.proposalController.Queue().Add(optr.proposalController.QueueKey())
185187
return nil
186188
}
187189

pkg/cvo/cvo.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"sync"
99
"time"
1010

11+
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
12+
"sigs.k8s.io/controller-runtime/pkg/client/config"
13+
1114
corev1 "k8s.io/api/core/v1"
1215
apierrors "k8s.io/apimachinery/pkg/api/errors"
1316
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -53,6 +56,7 @@ import (
5356
"github.com/openshift/cluster-version-operator/pkg/payload"
5457
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
5558
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
59+
"github.com/openshift/cluster-version-operator/pkg/proposal"
5660
"github.com/openshift/cluster-version-operator/pkg/risk"
5761
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
5862
)
@@ -213,6 +217,9 @@ type Operator struct {
213217
// risks holds update-risk source (in-cluster alerts, etc.)
214218
// that will be aggregated into conditional update risks.
215219
risks risk.Source
220+
221+
// proposalController, if enabled, watch available updates and manage proposals
222+
proposalController *proposal.Controller
216223
}
217224

218225
// New returns a new cluster version operator.
@@ -317,6 +324,17 @@ func New(
317324

318325
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
319326

327+
c, err := runtimeclient.New(config.GetConfigOrDie(), runtimeclient.Options{})
328+
if err != nil {
329+
return nil, err
330+
}
331+
optr.proposalController = proposal.NewController(func() ([]configv1.Release, []configv1.ConditionalUpdate, error) {
332+
if optr.availableUpdates == nil {
333+
return nil, nil, nil
334+
}
335+
return optr.availableUpdates.Updates, optr.availableUpdates.ConditionalUpdates, nil
336+
}, c)
337+
320338
return optr, nil
321339
}
322340

@@ -470,6 +488,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
470488
defer optr.availableUpdatesQueue.ShutDown()
471489
defer optr.upgradeableQueue.ShutDown()
472490
defer optr.configuration.Queue().ShutDown()
491+
defer optr.proposalController.Queue().ShutDown()
473492
stopCh := runContext.Done()
474493

475494
klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
@@ -525,6 +544,19 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
525544
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
526545
}
527546

547+
if optr.shouldEnableProposalController() {
548+
resultChannelCount++
549+
go func() {
550+
defer utilruntime.HandleCrash()
551+
wait.UntilWithContext(runContext, func(runContext context.Context) {
552+
optr.worker(runContext, optr.proposalController.Queue(), optr.proposalController.Sync)
553+
}, time.Second)
554+
resultChannel <- asyncResult{name: "proposal controller"}
555+
}()
556+
} else {
557+
klog.Infof("The proposal controller is disabled.")
558+
}
559+
528560
resultChannelCount++
529561
go func() {
530562
defer utilruntime.HandleCrash()
@@ -595,6 +627,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
595627
optr.availableUpdatesQueue.ShutDown()
596628
optr.upgradeableQueue.ShutDown()
597629
optr.configuration.Queue().ShutDown()
630+
optr.proposalController.Queue().ShutDown()
598631
}
599632
}
600633

@@ -1191,3 +1224,9 @@ func (optr *Operator) shouldReconcileAcceptRisks() bool {
11911224
// HyperShift will be supported later if needed
11921225
return optr.enabledCVOFeatureGates.AcceptRisks() && !optr.hypershift
11931226
}
1227+
1228+
// shouldEnableProposalController returns whether the CVO should enable the proposal controller
1229+
func (optr *Operator) shouldEnableProposalController() bool {
1230+
// TODO: guarded by some feature gate?
1231+
return true
1232+
}

pkg/proposal/controller.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package proposal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
10+
corev1 "k8s.io/api/core/v1"
11+
kapierrors "k8s.io/apimachinery/pkg/api/errors"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
14+
"k8s.io/client-go/util/workqueue"
15+
"k8s.io/klog/v2"
16+
17+
configv1 "github.com/openshift/api/config/v1"
18+
19+
i "github.com/openshift/cluster-version-operator/pkg/internal"
20+
proposalv1alpha1 "github.com/openshift/cluster-version-operator/pkg/proposal/api/v1alpha1"
21+
)
22+
23+
type Controller struct {
24+
queueKey string
25+
// queue tracks checking for the CVO configuration.
26+
//
27+
// The type any is used to comply with the worker method of the cvo.Operator struct.
28+
queue workqueue.TypedRateLimitingInterface[any]
29+
30+
updatesGetterFunc UpdatesGetterFunc
31+
32+
client client.Client
33+
}
34+
35+
const controllerName = "proposal-lifecycle-controller"
36+
37+
type UpdatesGetterFunc func() ([]configv1.Release, []configv1.ConditionalUpdate, error)
38+
39+
// NewController returns Controller to manage Proposals.
40+
func NewController(updatesGetterFunc UpdatesGetterFunc, client client.Client) *Controller {
41+
return &Controller{
42+
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", controllerName),
43+
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
44+
workqueue.DefaultTypedControllerRateLimiter[any](),
45+
workqueue.TypedRateLimitingQueueConfig[any]{Name: controllerName}),
46+
updatesGetterFunc: updatesGetterFunc,
47+
client: client,
48+
}
49+
}
50+
51+
func (c *Controller) Queue() workqueue.TypedRateLimitingInterface[any] {
52+
return c.queue
53+
}
54+
55+
func (c *Controller) QueueKey() string {
56+
return c.queueKey
57+
}
58+
59+
// Sync ensures:
60+
// TODO:
61+
func (c *Controller) Sync(ctx context.Context, key string) error {
62+
startTime := time.Now()
63+
klog.V(i.Normal).Infof("Started syncing CVO configuration %q", key)
64+
defer func() {
65+
klog.V(i.Normal).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
66+
}()
67+
68+
updates, conditionalUpdates, err := c.updatesGetterFunc()
69+
if err != nil {
70+
klog.Errorf("Error getting available updates: %v", err)
71+
return err
72+
}
73+
klog.Infof("Got available updates: %#v", updates)
74+
klog.Infof("Got available updates: %#v", conditionalUpdates)
75+
76+
proposals := getProposals(updates, conditionalUpdates)
77+
78+
var errs []error
79+
for _, proposal := range proposals {
80+
err := c.client.Create(ctx, proposal)
81+
if err != nil {
82+
if !kapierrors.IsAlreadyExists(err) {
83+
errs = append(errs, err)
84+
}
85+
}
86+
}
87+
88+
return kutilerrors.NewAggregate(errs)
89+
}
90+
91+
// TODO: make it real
92+
func getProposals(updates []configv1.Release, conditionalUpdates []configv1.ConditionalUpdate) []*proposalv1alpha1.Proposal {
93+
return []*proposalv1alpha1.Proposal{
94+
{
95+
ObjectMeta: metav1.ObjectMeta{
96+
Name: controllerName,
97+
Namespace: metav1.NamespaceAll,
98+
},
99+
Spec: proposalv1alpha1.ProposalSpec{
100+
Request: "some-request",
101+
WorkflowRef: corev1.LocalObjectReference{
102+
Name: "ota-advisory",
103+
},
104+
},
105+
},
106+
}
107+
}

0 commit comments

Comments
 (0)