Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ verify-%:
make $*
./hack/verify-diff.sh

verify: fmt lint verify-ocp-manifests ## Run formatting and linting checks
verify: generate fmt lint verify-ocp-manifests ## Run formatting and linting checks

test: verify unit ## Run verification and unit tests

build: bin/capi-operator bin/capi-installer bin/capi-controllers bin/machine-api-migration bin/crd-compatibility-checker bin/cluster-capi-operator-tests-ext manifests-gen ## Build all binaries
.PHONY: generate
generate:
go generate ./...

build: generate bin/capi-operator bin/capi-installer bin/capi-controllers bin/machine-api-migration bin/crd-compatibility-checker bin/cluster-capi-operator-tests-ext manifests-gen ## Build all binaries

clean:
rm -rf bin/*
Expand Down
20 changes: 10 additions & 10 deletions cmd/capi-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ import (
)

var (
errPodIdentityNotSet = errors.New("POD_NAME and POD_NAMESPACE must be set")
errContainerNotInPod = errors.New("container not found in pod spec")
errPodIdentityNotSet = errors.New("POD_NAME and POD_NAMESPACE must be set")
errContainerNotInPod = errors.New("container not found in pod spec")
errInfrastructurePlatformStatusNotSet = errors.New("infrastructure platform status is not set")
)

const (
Expand Down Expand Up @@ -106,22 +107,21 @@ func setupControllers(ctx context.Context, log logr.Logger, mgr ctrl.Manager, op
return fmt.Errorf("unable to get infrastructure: %w", err)
}

platform, err := util.GetPlatformFromInfra(infra)
if err != nil {
return fmt.Errorf("unable to get platform: %w", err)
}

featureGates, err := util.GetFeatureGates(ctx, log, managerName, mgr.GetConfig(), cancel)
if err != nil {
return fmt.Errorf("unable to get feature gates: %w", err)
}

if infra.Status.PlatformStatus == nil {
return errInfrastructurePlatformStatusNotSet
}

supportedPlatform := util.IsCAPIEnabledForPlatform(featureGates, infra.Status.PlatformStatus.Type)

if err := (&clusteroperator.ClusterOperatorController{
ClusterOperatorStatusClient: operatorConfig.GetClusterOperatorStatusClient(mgr, platform, "clusteroperator"),
Scheme: mgr.GetScheme(),
IsUnsupportedPlatform: !supportedPlatform,
Client: mgr.GetClient(),
ReleaseVersion: util.GetReleaseVersion(),
IsUnsupportedPlatform: !supportedPlatform,
}).SetupWithManager(mgr); err != nil {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return fmt.Errorf("unable to create clusteroperator controller: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tool (
github.com/openshift/api/machine/v1beta1/zz_generated.crd-manifests
github.com/openshift/api/operator/v1/zz_generated.crd-manifests
github.com/openshift/api/operator/v1alpha1/zz_generated.crd-manifests
golang.org/x/tools/cmd/stringer
sigs.k8s.io/controller-runtime/tools/setup-envtest
)

Expand Down
232 changes: 212 additions & 20 deletions pkg/controllers/clusteroperator/clusteroperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,250 @@ package clusteroperator
import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"slices"
"strings"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"

configv1 "github.com/openshift/api/config/v1"
configv1apply "github.com/openshift/client-go/config/applyconfigurations/config/v1"
"github.com/openshift/cluster-capi-operator/pkg/controllers"
"github.com/openshift/cluster-capi-operator/pkg/controllers/installer"
"github.com/openshift/cluster-capi-operator/pkg/controllers/revision"
"github.com/openshift/cluster-capi-operator/pkg/operatorstatus"
"github.com/openshift/cluster-capi-operator/pkg/util"
)

const (
capiUnsupportedPlatformMsg = "Cluster API is not yet implemented on this platform"
controllerName = "ClusterOperatorController"
)

// ClusterOperatorController watches and keeps the cluster-api ClusterObject up to date.
// ClusterOperatorController watches the cluster-api ClusterOperator and
// aggregates per-controller sub-conditions into top-level conditions.
type ClusterOperatorController struct {
operatorstatus.ClusterOperatorStatusClient
Scheme *runtime.Scheme
client.Client
ReleaseVersion string
IsUnsupportedPlatform bool
}

// Reconcile reconciles the cluster-api ClusterOperator object.
func (r *ClusterOperatorController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ClusterOperatorController) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx).WithName(controllerName)
log.Info(fmt.Sprintf("Reconciling %q ClusterObject", controllers.ClusterOperatorName))

message := func() string {
if r.IsUnsupportedPlatform {
return capiUnsupportedPlatformMsg
}
co := &configv1.ClusterOperator{}
if err := r.Get(ctx, client.ObjectKey{Name: controllers.ClusterOperatorName}, co); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get ClusterOperator: %w", err)
}

log.Info("Reconciling ClusterOperator aggregation")

return ""
}()
var conditions []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration

// TODO: wrap this into status aggregation logic to get these conditions to
// represent the meaningful aggregation of all other controller statuses.
// We should also only update the version after the aggregator confirms
// all controllers have succeeded in the new version.
if err := r.SetStatusAvailable(ctx, message, operatorstatus.WithVersions(r.OperandVersions())); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to set conditions for %q ClusterObject: %w", controllers.ClusterOperatorName, err)
if r.IsUnsupportedPlatform {
conditions = r.unsupportedPlatformStatus()
} else {
conditions = r.aggregatedStatus(co.Status.Conditions)
}

// Merge new conditions with existing conditions and patch if changes are required.
conditionsChanged := operatorstatus.MergeConditions(conditions, co.Status.Conditions)
versionChanged := r.IsUnsupportedPlatform &&
currentOperatorVersion(co.Status.Versions, operatorstatus.OperatorVersionKey) != r.ReleaseVersion

if conditionsChanged || versionChanged {
if err := r.writeStatus(ctx, co, conditions); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

// currentOperatorVersion returns the version string for the given key in the
// ClusterOperator's status versions list, or an empty string if not found.
func currentOperatorVersion(versions []configv1.OperandVersion, name string) string {
for i := range versions {
if versions[i].Name == name {
return versions[i].Version
}
}

return ""
}

func (r *ClusterOperatorController) writeStatus(ctx context.Context, co *configv1.ClusterOperator, conditions []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration) error {
applyConfig := configv1apply.ClusterOperator(controllers.ClusterOperatorName).
WithUID(co.UID).
WithStatus(configv1apply.ClusterOperatorStatus().
WithConditions(conditions...),
)

// We don't run the revision controller on unsupported platforms, so we must
// write the release version here.
if r.IsUnsupportedPlatform {
applyConfig.Status = applyConfig.Status.WithVersions(
configv1apply.OperandVersion().
WithName(operatorstatus.OperatorVersionKey).
WithVersion(r.ReleaseVersion))
}

if err := r.Status().Patch(ctx, co, util.ApplyConfigPatch(applyConfig),
operatorstatus.CAPIFieldOwner(controllerName), client.ForceOwnership); err != nil {
return fmt.Errorf("failed to write ClusterOperator status: %w", err)
}

return nil
}

// unsupportedPlatformStatus sets a fixed status with Available=true,
// Progressing=false, Degraded=false, Upgradeable=true when running on an
// unsupported platform.
func (r *ClusterOperatorController) unsupportedPlatformStatus() []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration {
return []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration{
condition(configv1.OperatorAvailable, configv1.ConditionTrue, operatorstatus.ReasonAsExpected, capiUnsupportedPlatformMsg),
condition(configv1.OperatorProgressing, configv1.ConditionFalse, operatorstatus.ReasonAsExpected, ""),
condition(configv1.OperatorDegraded, configv1.ConditionFalse, operatorstatus.ReasonAsExpected, ""),
condition(configv1.OperatorUpgradeable, configv1.ConditionTrue, operatorstatus.ReasonAsExpected, ""),
}
}

type subcontrollerStatus struct {
controller operatorstatus.ControllerResultGenerator
available, progressing subcontrollerCondition
}

type subcontrollerCondition struct {
status configv1.ConditionStatus
reason operatorstatus.Reason
message string
}

func getSubcontrollerCondition(conditions []configv1.ClusterOperatorStatusCondition, condType configv1.ClusterStatusConditionType) subcontrollerCondition {
for i := range conditions {
if conditions[i].Type == condType {
return subcontrollerCondition{
status: conditions[i].Status,
reason: operatorstatus.ReasonFromString(conditions[i].Reason),
message: conditions[i].Message,
}
}
}

return subcontrollerCondition{
status: configv1.ConditionUnknown,
reason: operatorstatus.ReasonUninitialized,
message: "initializing",
}
}

func (r *ClusterOperatorController) aggregatedStatus(currentConditions []configv1.ClusterOperatorStatusCondition) []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration {
newConditions := []*configv1apply.ClusterOperatorStatusConditionApplyConfiguration{
// The capi operator does not yet set the degraded condition. This will
// be added by automatically flagging a Progressing condition which
// lasts longer than some duration.
condition(configv1.OperatorDegraded, configv1.ConditionFalse, operatorstatus.ReasonAsExpected, ""),

// Nothing the capi operator currently does prevents upgradeability.
// This will be added when CRD compatibility is integrated with the
// installer and revision controllers.
condition(configv1.OperatorUpgradeable, configv1.ConditionTrue, operatorstatus.ReasonAsExpected, ""),
}

// Sub-controllers whose Progressing and Degraded conditions are aggregated
subControllers := []operatorstatus.ControllerResultGenerator{
installer.ResultGenerator,
revision.ResultGenerator,
// TBD as they are migrated:
// - corecluster
// - infracluster
// - secretsync
// - kubeconfig
}

subcontrollerStatuses := util.SliceMap(subControllers, func(subController operatorstatus.ControllerResultGenerator) subcontrollerStatus {
availableType := subController.SubConditionType(operatorstatus.ConditionAvailableSuffix)
progressingType := subController.SubConditionType(operatorstatus.ConditionProgressingSuffix)

return subcontrollerStatus{
controller: subController,
available: getSubcontrollerCondition(currentConditions, availableType),
progressing: getSubcontrollerCondition(currentConditions, progressingType),
}
})

isProgressing := slices.IndexFunc(subcontrollerStatuses, func(status subcontrollerStatus) bool {
return status.progressing.status == configv1.ConditionTrue || status.progressing.status == configv1.ConditionUnknown
}) >= 0
progressingReason, progressingMessage := aggregateReasonAndMessage(subcontrollerStatuses, func(s subcontrollerStatus) subcontrollerCondition {
return s.progressing
})

switch {
case isProgressing:
newConditions = append(newConditions, condition(configv1.OperatorProgressing, configv1.ConditionTrue, progressingReason, progressingMessage))
case progressingReason > operatorstatus.ReasonAsExpected:
newConditions = append(newConditions, condition(configv1.OperatorProgressing, configv1.ConditionFalse, progressingReason, progressingMessage))
default:
newConditions = append(newConditions, condition(configv1.OperatorProgressing, configv1.ConditionFalse, operatorstatus.ReasonAsExpected, ""))
}

notAvailable := slices.IndexFunc(subcontrollerStatuses, func(status subcontrollerStatus) bool {
return status.available.status != configv1.ConditionTrue
}) >= 0
availableReason, availableMessage := aggregateReasonAndMessage(subcontrollerStatuses, func(s subcontrollerStatus) subcontrollerCondition {
return s.available
})

if notAvailable {
newConditions = append(newConditions, condition(configv1.OperatorAvailable, configv1.ConditionFalse, availableReason, availableMessage))
} else {
newConditions = append(newConditions, condition(configv1.OperatorAvailable, configv1.ConditionTrue, operatorstatus.ReasonAsExpected, "Cluster API Operator is available"))
}

return newConditions
}

func aggregateReasonAndMessage(statuses []subcontrollerStatus, extract func(subcontrollerStatus) subcontrollerCondition) (operatorstatus.Reason, string) {
var maxReason operatorstatus.Reason

var parts []string

for _, s := range statuses {
cond := extract(s)
if cond.reason <= operatorstatus.ReasonAsExpected {
continue
}

if cond.reason > maxReason {
maxReason = cond.reason
}

if cond.message != "" {
parts = append(parts, fmt.Sprintf("%s: %s", s.controller, cond.message))
} else {
parts = append(parts, string(s.controller))
}
}

return maxReason, strings.Join(parts, "; ")
}

func condition(condType configv1.ClusterStatusConditionType, status configv1.ConditionStatus, reason operatorstatus.Reason, message string) *configv1apply.ClusterOperatorStatusConditionApplyConfiguration {
return configv1apply.ClusterOperatorStatusCondition().
WithType(condType).
WithStatus(status).
WithReason(reason.String()).
WithMessage(message)
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterOperatorController) SetupWithManager(mgr ctrl.Manager) error {
if err := ctrl.NewControllerManagedBy(mgr).
Named(controllerName).
For(&configv1.ClusterOperator{}, builder.WithPredicates(operatorstatus.ClusterOperatorOnceOnly())).
For(&configv1.ClusterOperator{}, builder.WithPredicates(operatorstatus.ClusterOperatorStatusChanged())).
Complete(r); err != nil {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return fmt.Errorf("failed to create controller: %w", err)
}
Expand Down
Loading