Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 77 additions & 23 deletions internal/controller/numaresourcesoperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -28,9 +29,11 @@ import (
apiextensionv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -139,6 +142,14 @@ func (r *NUMAResourcesOperatorReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{}, err
}

initialStatus := *instance.Status.DeepCopy()
if len(initialStatus.Conditions) == 0 {
instance.Status.Conditions = status.NewNUMAResourcesOperatorConditions()
} else {
// on upgrade, backfill conditions added in newer versions
instance.Status.Conditions = status.EnsureNUMAResourcesOperatorConditions(instance.Status.Conditions)
}

if req.Name != objectnames.DefaultNUMAResourcesOperatorCrName {
err := fmt.Errorf("incorrect NUMAResourcesOperator resource name: %s", instance.Name)
return r.degradeStatus(ctx, instance, status.ConditionTypeIncorrectNUMAResourcesOperatorResourceName, err)
Expand Down Expand Up @@ -202,6 +213,11 @@ func updateStatusConditionsIfNeeded(instance *nropv1.NUMAResourcesOperator, cond
ObservedGeneration: instance.Generation,
}, time.Now())
if ok {
for _, c := range instance.Status.Conditions {
if status.FindCondition(conditions, c.Type) == nil {
conditions = append(conditions, c)
}
}
instance.Status.Conditions = conditions
}
}
Expand Down Expand Up @@ -247,7 +263,7 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceAPI(ctx context.Conte
func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) intreconcile.Step {
// we need to sync machine configs first and wait for the MachineConfigPool updates
// before checking additional components for updates
mcpUpdatedFunc, err := r.syncMachineConfigs(ctx, instance, existing, trees)
waitByPool, pausedMCPNames, err := r.syncMachineConfigs(ctx, instance, existing, trees)
if err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "FailedMCSync", "Failed to set up machine configuration for worker nodes: %v", err)
err = fmt.Errorf("failed to sync machine configs: %w", err)
Expand All @@ -257,8 +273,9 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx con

// MCO needs to update the SELinux context removal and other stuff, and need to trigger a reboot.
// It can take a while.
mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, trees, r.ForwardMCPConds, mcpUpdatedFunc)
mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, trees, r.ForwardMCPConds, waitByPool)
instance.Status.MachineConfigPools = mcpStatuses
instance.Status.Conditions = updateMachineConfigPoolPausedCondition(instance.Status.Conditions, instance.Generation, pausedMCPNames)

if mcpNamePending != "" {
// the Machine Config Pool still did not apply the machine config, wait for one minute
Expand Down Expand Up @@ -305,7 +322,9 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResource(ctx context.Context,
existing := rtestate.FromClient(ctx, r.Client, r.Platform, r.RTEManifests, instance, trees, r.Namespace)

if r.Platform == platform.OpenShift {
if step := r.reconcileResourceMachineConfig(ctx, instance, existing, trees); step.EarlyStop() {
var step intreconcile.Step
step = r.reconcileResourceMachineConfig(ctx, instance, existing, trees)
if step.EarlyStop() {
updateStatusConditionsIfNeeded(instance, step.ConditionInfo)
return step
}
Expand Down Expand Up @@ -408,7 +427,7 @@ func (r *NUMAResourcesOperatorReconciler) syncNodeResourceTopologyAPI(ctx contex
return (updatedCount == len(objStates)), err
}

func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) (rtestate.MCPWaitForUpdatedFunc, error) {
func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) (map[string]rtestate.MCPWaitForUpdatedFunc, sets.Set[string], error) {
klog.V(4).InfoS("Machine Config Sync start", "trees", len(trees))
defer klog.V(4).Info("Machine Config Sync stop")

Expand All @@ -418,8 +437,11 @@ func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context
// In case of operator upgrade from 4.1X → 4.18, it's necessary to remove the old MachineConfig,
// unless an emergency annotation is provided which forces the operator to use custom policy

objStates, waitFunc := existing.MachineConfigsState(r.RTEManifests)
for _, objState := range objStates {
mcObjStates, pausedMCPNames := existing.MachineConfigsState(r.RTEManifests)
waitByPool := make(map[string]rtestate.MCPWaitForUpdatedFunc, len(mcObjStates))
for _, mcObjState := range mcObjStates {
waitByPool[mcObjState.PoolName] = mcObjState.WaitForUpdated
objState := mcObjState.ObjectState
klog.InfoS("objState", "desired", objState.Desired, "existing", objState.Existing, "createOrUpdate", objState.IsCreateOrUpdate())
if objState.IsCreateOrUpdate() {
if err2 := controllerutil.SetControllerReference(instance, objState.Desired, r.Scheme); err2 != nil {
Expand All @@ -438,19 +460,24 @@ func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context
break
}
}
return waitFunc, err
return waitByPool, pausedMCPNames, err
}

func syncMachineConfigPoolsStatuses(instanceName string, trees []nodegroupv1.Tree, forwardMCPConds bool, updatedFunc rtestate.MCPWaitForUpdatedFunc) ([]nropv1.MachineConfigPool, string) {
klog.V(4).InfoS("Machine Config Status Sync start", "trees", len(trees))
defer klog.V(4).Info("Machine Config Status Sync stop")
func syncMachineConfigPoolsStatuses(instanceName string, trees []nodegroupv1.Tree, forwardMCPConds bool, waitByPool map[string]rtestate.MCPWaitForUpdatedFunc) ([]nropv1.MachineConfigPool, string) {
klog.V(4).InfoS("Machine Config Pools Status Sync start", "trees", len(trees))
defer klog.V(4).Info("Machine Config Pools Status Sync stop")

mcpStatuses := []nropv1.MachineConfigPool{}
for _, tree := range trees {
for _, mcp := range tree.MachineConfigPools {
mcpStatuses = append(mcpStatuses, extractMCPStatus(mcp, forwardMCPConds))

isUpdated := updatedFunc(instanceName, mcp)
waitFunc, ok := waitByPool[mcp.Name]
if !ok {
continue
}

isUpdated := waitFunc(instanceName, mcp)
klog.V(5).InfoS("Machine Config Pool state", "name", mcp.Name, "instance", instanceName, "updated", isUpdated)

if !isUpdated {
Expand Down Expand Up @@ -615,7 +642,8 @@ func (r *NUMAResourcesOperatorReconciler) SetupWithManager(mgr ctrl.Manager) err
return !reflect.DeepEqual(mcpOld.Status.Conditions, mcpNew.Status.Conditions) ||
!apiequality.Semantic.DeepEqual(mcpOld.Labels, mcpNew.Labels) ||
!apiequality.Semantic.DeepEqual(mcpOld.Spec.MachineConfigSelector, mcpNew.Spec.MachineConfigSelector) ||
!apiequality.Semantic.DeepEqual(mcpOld.Spec.NodeSelector, mcpNew.Spec.NodeSelector)
!apiequality.Semantic.DeepEqual(mcpOld.Spec.NodeSelector, mcpNew.Spec.NodeSelector) ||
mcpOld.Spec.Paused != mcpNew.Spec.Paused
},
}

Expand Down Expand Up @@ -671,29 +699,35 @@ func (r *NUMAResourcesOperatorReconciler) mcpToNUMAResourceOperator(ctx context.
nro := &nros.Items[i]
mcpLabels := labels.Set(mcp.Labels)
for _, nodeGroup := range nro.Spec.NodeGroups {
if nodeGroup.MachineConfigPoolSelector == nil {
continue
}

nodeGroupSelector, err := metav1.LabelSelectorAsSelector(nodeGroup.MachineConfigPoolSelector)
if err != nil {
klog.Errorf("failed to parse the selector %v", mcp.Spec.NodeSelector)
return nil
}

if nodeGroupSelector.Matches(mcpLabels) {
if nodeGroupMatchesMCP(nodeGroup, mcp.Name, mcpLabels) {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Name: nro.Name,
},
})
break
}
}
}

return requests
}

func nodeGroupMatchesMCP(nodeGroup nropv1.NodeGroup, mcpName string, mcpLabels labels.Set) bool {
if nodeGroup.PoolName != nil {
return mcpName == *nodeGroup.PoolName
}
if nodeGroup.MachineConfigPoolSelector == nil {
return false
}
selector, err := metav1.LabelSelectorAsSelector(nodeGroup.MachineConfigPoolSelector)
if err != nil {
klog.Errorf("failed to parse the selector %v", nodeGroup.MachineConfigPoolSelector)
return false
}
return selector.Matches(mcpLabels)
}

func (r *NUMAResourcesOperatorReconciler) configMapToNUMAResourceOperator(ctx context.Context, cmObj client.Object) []reconcile.Request {
cm := &corev1.ConfigMap{}

Expand Down Expand Up @@ -806,6 +840,26 @@ func isDaemonSetReady(ds *appsv1.DaemonSet) bool {
return ds.Status.DesiredNumberScheduled > 0 && ds.Status.DesiredNumberScheduled == ds.Status.NumberReady
}

func updateMachineConfigPoolPausedCondition(conditions []metav1.Condition, generation int64, pausedMCPNames sets.Set[string]) []metav1.Condition {
pausedStatus := metav1.ConditionFalse
message := ""
if pausedMCPNames.Len() > 0 {
klog.InfoS("detected paused MCPs", "pausedMCPs", pausedMCPNames)
pausedStatus = metav1.ConditionTrue
message = "detected paused MCPs: " + strings.Join(pausedMCPNames.UnsortedList(), ", ")
}
condition := metav1.Condition{
Type: status.ConditionMachineConfigPoolPaused,
Status: pausedStatus,
Reason: status.ConditionMachineConfigPoolPaused,
Message: message,
ObservedGeneration: generation,
LastTransitionTime: metav1.Now(),
}
apimeta.SetStatusCondition(&conditions, condition)
return conditions
}

func getTreesByNodeGroup(ctx context.Context, cli client.Client, nodeGroups []nropv1.NodeGroup, platf platform.Platform) ([]nodegroupv1.Tree, error) {
switch platf {
case platform.OpenShift:
Expand Down
128 changes: 123 additions & 5 deletions internal/controller/numaresourcesoperator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2125,12 +2125,12 @@ var _ = Describe("Test NUMAResourcesOperator Reconcile", func() {
reconciler = checkSELinuxPolicyProcessing(ctx, nro, mcp1, mcp2)

By("upgrading from 4.1X to 4.18")
Expect(reconciler.Client.Get(context.TODO(), client.ObjectKeyFromObject(nro), nro)).To(Succeed())
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(nro), nro)).To(Succeed())
clearSELinuxPolicyCustomAnnotations(nro)
Expect(reconciler.Client.Update(context.TODO(), nro)).To(Succeed())
})

It("should delete existing mc", func() {
It("should delete existing mc", func(ctx context.Context) {
key := client.ObjectKeyFromObject(nro)
// removing the annotation will trigger reboot which requires resync after 1 min
Expect(reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})).To(CauseRequeue())
Expand All @@ -2139,15 +2139,121 @@ var _ = Describe("Test NUMAResourcesOperator Reconcile", func() {
Name: objectnames.GetMachineConfigName(nro.Name, mcp1.Name),
}
mc := &machineconfigv1.MachineConfig{}
err := reconciler.Client.Get(context.TODO(), mc1Key, mc)
err := reconciler.Client.Get(ctx, mc1Key, mc)
Expect(apierrors.IsNotFound(err)).To(BeTrue(), "MachineConfig %s expected to be deleted; err=%v", mc1Key.Name, err)

mc2Key := client.ObjectKey{
Name: objectnames.GetMachineConfigName(nro.Name, mcp2.Name),
}
err = reconciler.Client.Get(context.TODO(), mc2Key, mc)
err = reconciler.Client.Get(ctx, mc2Key, mc)
Expect(apierrors.IsNotFound(err)).To(BeTrue(), "MachineConfig %s expected to be deleted; err=%v", mc2Key.Name, err)
})

It("should converge with mixed policy: one pool custom, one pool default", func(ctx context.Context) {
nro.Spec.NodeGroups[0].Annotations[annotations.SELinuxPolicyConfigAnnotation] = annotations.SELinuxPolicyCustom
// ng2 has no custom annotation -> default policy (MC deletion)

var err error
reconciler, err = NewFakeNUMAResourcesOperatorReconciler(platform.OpenShift, defaultOCPVersion, nro, mcp1, mcp2)
Expect(err).ToNot(HaveOccurred())

key := client.ObjectKeyFromObject(nro)

By("First reconcile: MC created for pool1, MC deleted for pool2, waiting for MCO")
Expect(reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})).To(CauseRequeue())

By("Make pool1 ready with MC present (custom policy)")
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp1), mcp1)).To(Succeed())
ensureMCPIsReady(mcp1, nro.Name)
Expect(reconciler.Client.Update(ctx, mcp1)).To(Succeed())

By("Make pool2 ready after MC deletion (default policy)")
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp2), mcp2)).To(Succeed())
ensureMCPIsReadyAfterMCDeletion(mcp2)
Expect(reconciler.Client.Update(ctx, mcp2)).To(Succeed())

By("Second reconcile: should converge")
Expect(reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})).ToNot(CauseRequeue())

By("Verify DaemonSets for both pools exist")
ds := &appsv1.DaemonSet{}
mcp1DSKey := client.ObjectKey{
Name: objectnames.GetComponentName(nro.Name, mcp1.Name),
Namespace: testNamespace,
}
Expect(reconciler.Client.Get(ctx, mcp1DSKey, ds)).To(Succeed())

mcp2DSKey := client.ObjectKey{
Name: objectnames.GetComponentName(nro.Name, mcp2.Name),
Namespace: testNamespace,
}
Expect(reconciler.Client.Get(ctx, mcp2DSKey, ds)).To(Succeed())

By("Verify NRO status is Available with no paused condition")
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(nro), nro)).To(Succeed())
Expect(nro).To(BeInCondition(status.ConditionAvailable))

pausedCond := getConditionByType(nro.Status.Conditions, status.ConditionMachineConfigPoolPaused)
Expect(pausedCond).ToNot(BeNil())
Expect(pausedCond.Status).To(Equal(metav1.ConditionFalse))
})
It("should converge when one pool is paused", func(ctx context.Context) {
// neither pool has custom annotation -> both in MC deletion path
mcp2.Spec.Paused = true

var err error
reconciler, err = NewFakeNUMAResourcesOperatorReconciler(platform.OpenShift, defaultOCPVersion, nro, mcp1, mcp2)
Expect(err).ToNot(HaveOccurred())

key := client.ObjectKeyFromObject(nro)

By("Reconcile: pool1 converges (default policy, no MC to wait for), pool2 skipped (paused)")
Expect(reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})).ToNot(CauseRequeue())

By("Verify DaemonSets for both pools exist")
ds := &appsv1.DaemonSet{}
mcp1DSKey := client.ObjectKey{
Name: objectnames.GetComponentName(nro.Name, mcp1.Name),
Namespace: testNamespace,
}
Expect(reconciler.Client.Get(ctx, mcp1DSKey, ds)).To(Succeed())

mcp2DSKey := client.ObjectKey{
Name: objectnames.GetComponentName(nro.Name, mcp2.Name),
Namespace: testNamespace,
}
Expect(reconciler.Client.Get(ctx, mcp2DSKey, ds)).To(Succeed())

By("Verify NRO status is Available with paused condition")
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(nro), nro)).To(Succeed())
Expect(nro).To(BeInCondition(status.ConditionAvailable))

pausedCond := getConditionByType(nro.Status.Conditions, status.ConditionMachineConfigPoolPaused)
Expect(pausedCond).ToNot(BeNil())
Expect(pausedCond.Status).To(Equal(metav1.ConditionTrue))
})

It("should report paused condition while Progressing", func(ctx context.Context) {
// pool1 has custom policy -> MC created, needs MCO rollout
// pool2 is paused -> skipped
nro.Spec.NodeGroups[0].Annotations[annotations.SELinuxPolicyConfigAnnotation] = annotations.SELinuxPolicyCustom
mcp2.Spec.Paused = true

var err error
reconciler, err = NewFakeNUMAResourcesOperatorReconciler(platform.OpenShift, defaultOCPVersion, nro, mcp1, mcp2)
Expect(err).ToNot(HaveOccurred())

key := client.ObjectKeyFromObject(nro)

By("First reconcile: MC created for pool1, pool2 skipped (paused), waiting for MCO on pool1")
Expect(reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})).To(CauseRequeue())

By("Verify paused condition is set even while Progressing")
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(nro), nro)).To(Succeed())
pausedCond := getConditionByType(nro.Status.Conditions, status.ConditionMachineConfigPoolPaused)
Expect(pausedCond).ToNot(BeNil(), "paused condition should be set during Progressing phase")
Expect(pausedCond.Status).To(Equal(metav1.ConditionTrue))
})
})
})

Expand Down Expand Up @@ -2280,8 +2386,10 @@ func checkSELinuxPolicyProcessing(ctx context.Context, nro *nropv1.NUMAResources
ensureMCPIsReady(mcp1, nro.Name)
Expect(reconciler.Client.Update(ctx, mcp1)).To(Succeed())

// Node group 2 has no custom SELinux policy: wait logic expects our MachineConfig to be *absent* from
// the pool, so do not put it in Configuration.Source (unlike the pool that has custom policy).
Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp2), mcp2)).To(Succeed())
ensureMCPIsReady(mcp2, nro.Name)
ensureMCPIsReadyAfterMCDeletion(mcp2)
Expect(reconciler.Client.Update(ctx, mcp2)).To(Succeed())

// triggering a second reconcile will create the RTEs and fully update the statuses making the operator in Available condition -> no more reconciliation needed thus the result is clean
Expand Down Expand Up @@ -2396,6 +2504,16 @@ func ensureMCPIsReady(mcp *machineconfigv1.MachineConfigPool, nroName string) {
}
}

func ensureMCPIsReadyAfterMCDeletion(mcp *machineconfigv1.MachineConfigPool) {
mcp.Status.Configuration.Source = nil
mcp.Status.Conditions = []machineconfigv1.MachineConfigPoolCondition{
{
Type: machineconfigv1.MachineConfigPoolUpdated,
Status: corev1.ConditionTrue,
},
}
}

func BeStringPercentageAtLeast(amount int) gomegatypes.GomegaMatcher {
return gcustom.MakeMatcher(func(val string) (bool, error) {
if !strings.HasSuffix(val, "%") {
Expand Down
Loading