Skip to content

Commit 86929de

Browse files
committed
OTA-1966: Init the Proposal Lifecycle Controller
1 parent 361d681 commit 86929de

6 files changed

Lines changed: 229 additions & 9 deletions

File tree

pkg/cvo/availableupdates.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
182182

183183
// queue optr.sync() to update ClusterVersion status
184184
optr.queue.Add(queueKey)
185+
// queue optr.proposalController.Sync() to manage proposals
186+
optr.proposalController.Queue().Add(optr.proposalController.QueueKey())
185187
return nil
186188
}
187189

pkg/cvo/cvo.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"sync"
99
"time"
1010

11+
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
12+
1113
corev1 "k8s.io/api/core/v1"
1214
apierrors "k8s.io/apimachinery/pkg/api/errors"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -53,6 +55,7 @@ import (
5355
"github.com/openshift/cluster-version-operator/pkg/payload"
5456
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
5557
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
58+
"github.com/openshift/cluster-version-operator/pkg/proposal"
5659
"github.com/openshift/cluster-version-operator/pkg/risk"
5760
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
5861
)
@@ -213,6 +216,9 @@ type Operator struct {
213216
// risks holds update-risk source (in-cluster alerts, etc.)
214217
// that will be aggregated into conditional update risks.
215218
risks risk.Source
219+
220+
// proposalController, if enabled, watch available updates and manage proposals
221+
proposalController *proposal.Controller
216222
}
217223

218224
// New returns a new cluster version operator.
@@ -242,6 +248,7 @@ func New(
242248
featureSet configv1.FeatureSet,
243249
cvoGates featuregates.CvoGateChecker,
244250
startingEnabledManifestFeatureGates sets.Set[string],
251+
rtClient runtimeclient.Client,
245252
) (*Operator, error) {
246253
eventBroadcaster := record.NewBroadcaster()
247254
eventBroadcaster.StartLogging(klog.Infof)
@@ -317,6 +324,14 @@ func New(
317324

318325
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
319326

327+
optr.proposalController = proposal.NewController(func() ([]configv1.Release, []configv1.ConditionalUpdate, error) {
328+
availableUpdates := optr.getAvailableUpdates()
329+
if availableUpdates == nil {
330+
return nil, nil, nil
331+
}
332+
return availableUpdates.Updates, availableUpdates.ConditionalUpdates, nil
333+
}, rtClient)
334+
320335
return optr, nil
321336
}
322337

@@ -470,6 +485,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
470485
defer optr.availableUpdatesQueue.ShutDown()
471486
defer optr.upgradeableQueue.ShutDown()
472487
defer optr.configuration.Queue().ShutDown()
488+
defer optr.proposalController.Queue().ShutDown()
473489
stopCh := runContext.Done()
474490

475491
klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
@@ -525,6 +541,19 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
525541
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
526542
}
527543

544+
if optr.shouldEnableProposalController() {
545+
resultChannelCount++
546+
go func() {
547+
defer utilruntime.HandleCrash()
548+
wait.UntilWithContext(runContext, func(runContext context.Context) {
549+
optr.worker(runContext, optr.proposalController.Queue(), optr.proposalController.Sync)
550+
}, time.Second)
551+
resultChannel <- asyncResult{name: "proposal controller"}
552+
}()
553+
} else {
554+
klog.Infof("The proposal controller is disabled.")
555+
}
556+
528557
resultChannelCount++
529558
go func() {
530559
defer utilruntime.HandleCrash()
@@ -595,6 +624,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
595624
optr.availableUpdatesQueue.ShutDown()
596625
optr.upgradeableQueue.ShutDown()
597626
optr.configuration.Queue().ShutDown()
627+
optr.proposalController.Queue().ShutDown()
598628
}
599629
}
600630

@@ -1191,3 +1221,8 @@ func (optr *Operator) shouldReconcileAcceptRisks() bool {
11911221
// HyperShift will be supported later if needed
11921222
return optr.enabledCVOFeatureGates.AcceptRisks() && !optr.hypershift
11931223
}
1224+
1225+
// shouldEnableProposalController returns whether the CVO should enable the proposal controller
1226+
func (optr *Operator) shouldEnableProposalController() bool {
1227+
return optr.enabledCVOFeatureGates.Proposal()
1228+
}

pkg/cvo/status_test.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func TestOperator_syncFailingStatus(t *testing.T) {
108108
Desired: configv1.Release{
109109
Version: "4.0.1",
110110
Image: "image/image:v4.0.1",
111-
URL: configv1.URL("https://example.com/v4.0.1"),
111+
URL: "https://example.com/v4.0.1",
112112
},
113113
VersionHash: "",
114114
Conditions: []configv1.ClusterOperatorStatusCondition{
@@ -149,7 +149,7 @@ func TestOperator_syncFailingStatus(t *testing.T) {
149149
Desired: configv1.Release{
150150
Version: "4.0.1",
151151
Image: "image/image:v4.0.1",
152-
URL: configv1.URL("https://example.com/v4.0.1"),
152+
URL: "https://example.com/v4.0.1",
153153
},
154154
VersionHash: "",
155155
Conditions: []configv1.ClusterOperatorStatusCondition{
@@ -206,6 +206,7 @@ type fakeRiFlags struct {
206206
statusReleaseArchitecture bool
207207
cvoConfiguration bool
208208
acceptRisks bool
209+
proposal bool
209210
}
210211

211212
func (f fakeRiFlags) DesiredVersion() string {
@@ -228,6 +229,10 @@ func (f fakeRiFlags) AcceptRisks() bool {
228229
return f.acceptRisks
229230
}
230231

232+
func (f fakeRiFlags) Proposal() bool {
233+
return f.proposal
234+
}
235+
231236
func TestUpdateClusterVersionStatus_FilteringMultipleErrorsForFailingCondition(t *testing.T) {
232237
ignoreLastTransitionTime := cmpopts.IgnoreFields(configv1.ClusterOperatorStatusCondition{}, "LastTransitionTime")
233238
type args struct {
@@ -998,12 +1003,12 @@ func Test_conditionalUpdateWithRiskNamesAndRiskConditions(t *testing.T) {
9981003
Risks: []configv1.ConditionalUpdateRisk{
9991004
{
10001005
Name: "Risk1", Conditions: []metav1.Condition{{
1001-
Type: "Applies",
1002-
Status: metav1.ConditionUnknown,
1003-
Reason: "InternalErrorFoundNoRiskCondition",
1004-
Message: "failed to find risk condition for risk Risk1",
1005-
},
1006-
}},
1006+
Type: "Applies",
1007+
Status: metav1.ConditionUnknown,
1008+
Reason: "InternalErrorFoundNoRiskCondition",
1009+
Message: "failed to find risk condition for risk Risk1",
1010+
},
1011+
}},
10071012
},
10081013
}},
10091014
},
@@ -1084,7 +1089,7 @@ func Test_conditionalUpdateWithRiskNamesAndRiskConditions(t *testing.T) {
10841089
desiredImage: "not-important",
10851090
availableUpdates: &availableUpdates{
10861091
RiskConditions: map[string][]metav1.Condition{
1087-
"TestAlert": []metav1.Condition{{
1092+
"TestAlert": {{
10881093
Type: "Applies",
10891094
Status: "True",
10901095
Reason: "Alert:firing",

pkg/featuregates/featuregates.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type CvoGateChecker interface {
4545

4646
// AcceptRisks controls whether the CVO reconciles spec.desiredUpdate.acceptRisks.
4747
AcceptRisks() bool
48+
49+
// Proposal controls whether the CVO makes proposals for updates.
50+
Proposal() bool
4851
}
4952

5053
// CvoGates contains flags that control CVO functionality gated by product feature gates. The
@@ -60,6 +63,7 @@ type CvoGates struct {
6063
statusReleaseArchitecture bool
6164
cvoConfiguration bool
6265
acceptRisks bool
66+
proposal bool
6367
}
6468

6569
func (c CvoGates) DesiredVersion() string {
@@ -82,6 +86,10 @@ func (c CvoGates) AcceptRisks() bool {
8286
return c.acceptRisks
8387
}
8488

89+
func (c CvoGates) Proposal() bool {
90+
return c.proposal
91+
}
92+
8593
// DefaultCvoGates apply when actual features for given version are unknown
8694
func DefaultCvoGates(version string) CvoGates {
8795
return CvoGates{
@@ -90,6 +98,7 @@ func DefaultCvoGates(version string) CvoGates {
9098
statusReleaseArchitecture: false,
9199
cvoConfiguration: false,
92100
acceptRisks: false,
101+
proposal: false,
93102
}
94103
}
95104

@@ -98,6 +107,9 @@ func DefaultCvoGates(version string) CvoGates {
98107
func CvoGatesFromFeatureGate(gate *configv1.FeatureGate, version string) CvoGates {
99108
enabledGates := DefaultCvoGates(version)
100109

110+
// We do not have a specific gate for the Proposal feature and use the TechPreviewNoUpgrade instead
111+
enabledGates.proposal = gate.Spec.FeatureSet == configv1.TechPreviewNoUpgrade
112+
101113
for _, g := range gate.Status.FeatureGates {
102114

103115
if g.Version != version {

pkg/proposal/controller.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package proposal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
9+
crcontrollerutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
10+
11+
corev1 "k8s.io/api/core/v1"
12+
kapierrors "k8s.io/apimachinery/pkg/api/errors"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
15+
"k8s.io/client-go/util/workqueue"
16+
"k8s.io/klog/v2"
17+
18+
configv1 "github.com/openshift/api/config/v1"
19+
20+
i "github.com/openshift/cluster-version-operator/pkg/internal"
21+
proposalv1alpha1 "github.com/openshift/cluster-version-operator/pkg/proposal/api/v1alpha1"
22+
)
23+
24+
type Controller struct {
25+
queueKey string
26+
queue workqueue.TypedRateLimitingInterface[any]
27+
updatesGetterFunc UpdatesGetterFunc
28+
client ctrlruntimeclient.Client
29+
}
30+
31+
const controllerName = "proposal-lifecycle-controller"
32+
33+
type UpdatesGetterFunc func() ([]configv1.Release, []configv1.ConditionalUpdate, error)
34+
35+
// NewController returns Controller to manage Proposals.
36+
func NewController(updatesGetterFunc UpdatesGetterFunc, client ctrlruntimeclient.Client) *Controller {
37+
return &Controller{
38+
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", controllerName),
39+
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
40+
workqueue.DefaultTypedControllerRateLimiter[any](),
41+
workqueue.TypedRateLimitingQueueConfig[any]{Name: controllerName}),
42+
updatesGetterFunc: updatesGetterFunc,
43+
client: client,
44+
}
45+
}
46+
47+
func (c *Controller) Queue() workqueue.TypedRateLimitingInterface[any] {
48+
return c.queue
49+
}
50+
51+
func (c *Controller) QueueKey() string {
52+
return c.queueKey
53+
}
54+
55+
// Sync ensures:
56+
// TODO:
57+
func (c *Controller) Sync(ctx context.Context, key string) error {
58+
startTime := time.Now()
59+
klog.V(i.Normal).Infof("Started syncing CVO configuration %q", key)
60+
defer func() {
61+
klog.V(i.Normal).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
62+
}()
63+
64+
updates, conditionalUpdates, err := c.updatesGetterFunc()
65+
if err != nil {
66+
klog.Errorf("Error getting available updates: %v", err)
67+
return err
68+
}
69+
klog.V(i.Debug).Infof("Got available updates: %#v", updates)
70+
klog.V(i.Debug).Infof("Got conditional updates: %#v", conditionalUpdates)
71+
72+
proposals := getProposals(updates, conditionalUpdates)
73+
74+
var errs []error
75+
for _, proposal := range proposals {
76+
err := ensureProposal(ctx, proposal, c.client)
77+
if err != nil {
78+
errs = append(errs, err)
79+
}
80+
}
81+
82+
return kutilerrors.NewAggregate(errs)
83+
}
84+
85+
// TODO: make it real
86+
func getProposals(_ []configv1.Release, _ []configv1.ConditionalUpdate) []*proposalv1alpha1.Proposal {
87+
return []*proposalv1alpha1.Proposal{
88+
{
89+
ObjectMeta: metav1.ObjectMeta{
90+
Name: "test-proposal",
91+
Namespace: i.DefaultCVONamespace,
92+
},
93+
// Feed required fields only to pass API server validation
94+
// The workflow does not exist but that should cause no trouble because no controller watches it before lightspeed-operator is installed on the cluster
95+
Spec: proposalv1alpha1.ProposalSpec{
96+
Request: "some-request",
97+
WorkflowRef: corev1.LocalObjectReference{
98+
Name: "ota-advisory",
99+
},
100+
},
101+
},
102+
}
103+
}
104+
105+
func ensureProposal(ctx context.Context, proposal *proposalv1alpha1.Proposal, client ctrlruntimeclient.Client) error {
106+
p, mutateFn := cvoProposal(proposal)
107+
return upsertObject(ctx, client, p, mutateFn)
108+
}
109+
110+
func cvoProposal(proposal *proposalv1alpha1.Proposal) (*proposalv1alpha1.Proposal, crcontrollerutil.MutateFn) {
111+
p := &proposalv1alpha1.Proposal{
112+
ObjectMeta: metav1.ObjectMeta{
113+
Namespace: proposal.Namespace,
114+
Name: proposal.Name,
115+
},
116+
}
117+
return p, func() error {
118+
if len(proposal.Annotations) > 0 {
119+
p.Annotations = make(map[string]string, len(proposal.Annotations))
120+
for k, v := range proposal.Annotations {
121+
p.Annotations[k] = v
122+
}
123+
}
124+
if len(proposal.Labels) > 0 {
125+
p.Labels = make(map[string]string, len(proposal.Labels))
126+
for k, v := range proposal.Labels {
127+
p.Labels[k] = v
128+
}
129+
}
130+
p.Spec = *proposal.Spec.DeepCopy()
131+
return nil
132+
}
133+
}
134+
135+
func upsertObject(ctx context.Context, c ctrlruntimeclient.Client, obj ctrlruntimeclient.Object, mutateFn crcontrollerutil.MutateFn) error {
136+
result, err := crcontrollerutil.CreateOrUpdate(ctx, c, obj, mutateFn)
137+
if err != nil && !kapierrors.IsConflict(err) {
138+
klog.V(i.Normal).Infof("Upsert object (%s/%s) of type %s failed with result %s: %v", obj.GetNamespace(), obj.GetName(), fmt.Sprintf("%T", obj), result, err)
139+
} else if result != crcontrollerutil.OperationResultNone {
140+
klog.V(i.Normal).Infof("Upsert object (%s/%s) of type %s succeeded with result %s", obj.GetNamespace(), obj.GetName(), fmt.Sprintf("%T", obj), result)
141+
}
142+
return err
143+
}

0 commit comments

Comments
 (0)