From 15207665c515d281fbf08a9dfe5b08fdebbb0467 Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Thu, 22 Jan 2026 17:03:35 +0200 Subject: [PATCH 1/5] ctrl: kubeletconfig: introduce reconcileErrorHandler This is a preliminary step to expand the scenarios where the reconciliation can return more possible errors where some are tolerable. Signed-off-by: Shereen Haj --- .../controller/kubeletconfig_controller.go | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/internal/controller/kubeletconfig_controller.go b/internal/controller/kubeletconfig_controller.go index d97c2e8860..72307344c1 100644 --- a/internal/controller/kubeletconfig_controller.go +++ b/internal/controller/kubeletconfig_controller.go @@ -87,6 +87,11 @@ type kubeletConfigHandler struct { setCtrlRef func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error } +type reconcileErrorHandler struct { + err error + tolerateError bool +} + // Namespace Scoped // Cluster Scoped @@ -116,18 +121,22 @@ func (r *KubeletConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques // KubeletConfig changes are expected to be sporadic, yet are important enough // to be made visible at kubernetes level. So we generate events to handle them - cm, err := r.reconcileConfigMap(ctx, instance, req.NamespacedName) - if err != nil { + cm, errHandler := r.reconcileConfigMap(ctx, instance, req.NamespacedName) + if errHandler.err != nil { var klErr *InvalidKubeletConfig - if errors.As(err, &klErr) { + if errors.As(errHandler.err, &klErr) { r.Recorder.Event(instance, "Normal", "ProcessSkip", "ignored kubelet config "+klErr.ObjectName) return ctrl.Result{}, nil } + if errHandler.tolerateError { + r.Recorder.Event(instance, "Normal", "ProcessSkip", errHandler.err.Error()) + return ctrl.Result{}, nil + } - klog.ErrorS(err, "failed to reconcile configmap", "controller", "kubeletconfig") + klog.ErrorS(errHandler.err, "failed to reconcile configmap", "controller", "kubeletconfig") r.Recorder.Event(instance, "Warning", "ProcessFailed", "Failed to update RTE config from kubelet config "+req.NamespacedName.String()) - return ctrl.Result{}, err + return ctrl.Result{}, errHandler.err } r.Recorder.Event(instance, "Normal", "ProcessOK", fmt.Sprintf("Updated RTE config %s/%s from kubelet config %s", cm.Namespace, cm.Name, req.NamespacedName.String())) @@ -197,25 +206,26 @@ func (e *InvalidKubeletConfig) Unwrap() error { return e.Err } -func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, error) { +func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, reconcileErrorHandler) { // first check if the ConfigMap should be deleted // to save all the additional work related for create/update cm, deleted, err := r.deleteConfigMap(ctx, instance, kcKey) if deleted { - return cm, err + return cm, reconcileErrorHandler{err: err} } kcHandler, err := r.makeKCHandlerForPlatform(ctx, instance, kcKey) if err != nil { - return nil, err + return nil, reconcileErrorHandler{err: err} } kubeletConfig, err := kubeletconfig.MCOKubeletConfToKubeletConf(kcHandler.mcoKc) if err != nil { klog.ErrorS(err, "cannot extract KubeletConfiguration from MCO KubeletConfig", "name", kcKey.Name) - return nil, err + return nil, reconcileErrorHandler{err: err} } - return r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler) + cm, err = r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler) + return cm, reconcileErrorHandler{err: err} } func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration, instance *nropv1.NUMAResourcesOperator, kcHandler *kubeletConfigHandler) (*corev1.ConfigMap, error) { From 72a8cac9adc8c4be053c4316892ca131a2cfe890 Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Fri, 23 Jan 2026 10:22:09 +0200 Subject: [PATCH 2/5] ctrl: kubeletConfig: handle paused MCPs Having a paused MCP should prevent updating the corresponding config map for the specified node group. So far, the code wasn't considering the case of paused MCPs, which lead to creating/updating the config map to the newest kubeletconfig CR updates,a thing that caused a mismatch between the configuration in the config map vs the one reflected on the NRTs. In this commit, we modify the kubeletconfig controller to handle paused MCPs such that it skips updating existing RTE config maps; and for new node groups whose MCP is paused, the controller will fetch the old machineConfig (before the pause) and creates RTE config map based on the decoded kubeletconfig data from it. Signed-off-by: Shereen Haj --- .../controller/kubeletconfig_controller.go | 116 +++++++++--- .../kubeletconfig_controller_test.go | 167 ++++++++++++++++++ internal/kubeletconfig/kubeletconfig.go | 89 ++++++++++ internal/kubeletconfig/kubeletconfig_test.go | 87 +++++++++ 4 files changed, 437 insertions(+), 22 deletions(-) create mode 100644 internal/kubeletconfig/kubeletconfig.go create mode 100644 internal/kubeletconfig/kubeletconfig_test.go diff --git a/internal/controller/kubeletconfig_controller.go b/internal/controller/kubeletconfig_controller.go index 72307344c1..03735a35f9 100644 --- a/internal/controller/kubeletconfig_controller.go +++ b/internal/controller/kubeletconfig_controller.go @@ -48,6 +48,7 @@ import ( "github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform" nropv1 "github.com/openshift-kni/numaresources-operator/api/v1" + intkubeletconfig "github.com/openshift-kni/numaresources-operator/internal/kubeletconfig" "github.com/openshift-kni/numaresources-operator/internal/machineconfigpools" "github.com/openshift-kni/numaresources-operator/pkg/apply" "github.com/openshift-kni/numaresources-operator/pkg/kubeletconfig" @@ -57,7 +58,8 @@ import ( ) const ( - kubeletConfigRetryPeriod = 30 * time.Second + kubeletConfigRetryPeriod = 30 * time.Second + MachineConfigPoolPausedRetryPeriod = 2 * time.Minute ) const ( @@ -90,6 +92,7 @@ type kubeletConfigHandler struct { type reconcileErrorHandler struct { err error tolerateError bool + result ctrl.Result } // Namespace Scoped @@ -126,21 +129,22 @@ func (r *KubeletConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques var klErr *InvalidKubeletConfig if errors.As(errHandler.err, &klErr) { r.Recorder.Event(instance, "Normal", "ProcessSkip", "ignored kubelet config "+klErr.ObjectName) - return ctrl.Result{}, nil + return errHandler.result, nil } if errHandler.tolerateError { r.Recorder.Event(instance, "Normal", "ProcessSkip", errHandler.err.Error()) - return ctrl.Result{}, nil + return errHandler.result, nil } klog.ErrorS(errHandler.err, "failed to reconcile configmap", "controller", "kubeletconfig") r.Recorder.Event(instance, "Warning", "ProcessFailed", "Failed to update RTE config from kubelet config "+req.NamespacedName.String()) - return ctrl.Result{}, errHandler.err + return errHandler.result, errHandler.err } r.Recorder.Event(instance, "Normal", "ProcessOK", fmt.Sprintf("Updated RTE config %s/%s from kubelet config %s", cm.Namespace, cm.Name, req.NamespacedName.String())) - return ctrl.Result{}, nil + //return ctrl.Result{}, nil + return errHandler.result, nil } func (r *KubeletConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -214,9 +218,9 @@ func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instan return cm, reconcileErrorHandler{err: err} } - kcHandler, err := r.makeKCHandlerForPlatform(ctx, instance, kcKey) - if err != nil { - return nil, reconcileErrorHandler{err: err} + kcHandler, errHandler := r.makeKCHandlerForPlatform(ctx, instance, kcKey) + if errHandler.err != nil { + return nil, errHandler } kubeletConfig, err := kubeletconfig.MCOKubeletConfToKubeletConf(kcHandler.mcoKc) if err != nil { @@ -225,7 +229,10 @@ func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instan } cm, err = r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler) - return cm, reconcileErrorHandler{err: err} + if err != nil { + return nil, reconcileErrorHandler{err: err} + } + return cm, errHandler // FIXME use predicate } func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration, instance *nropv1.NUMAResourcesOperator, kcHandler *kubeletConfigHandler) (*corev1.ConfigMap, error) { @@ -254,17 +261,17 @@ func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConf return rendered, nil } -func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*kubeletConfigHandler, error) { +func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*kubeletConfigHandler, reconcileErrorHandler) { switch r.Platform { case platform.OpenShift: mcoKc := &mcov1.KubeletConfig{} if err := r.Client.Get(ctx, kcKey, mcoKc); err != nil { - return nil, err + return nil, reconcileErrorHandler{err: err} } mcps, err := machineconfigpools.GetListByNodeGroupsV1(ctx, r.Client, instance.Spec.NodeGroups) if err != nil { - return nil, err + return nil, reconcileErrorHandler{err: err} } mcp, err := machineconfigpools.FindBySelector(mcps, mcoKc.Spec.MachineConfigPoolSelector) @@ -272,12 +279,12 @@ func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, klog.ErrorS(err, "cannot find a matching mcp for MCO KubeletConfig", "name", kcKey.Name) var notFound *machineconfigpools.NotFound if errors.As(err, ¬Found) { - return nil, &InvalidKubeletConfig{ + return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{ ObjectName: kcKey.Name, Err: notFound, - } + }} } - return nil, err + return nil, reconcileErrorHandler{err: err} } klog.V(3).InfoS("matched MCP to MCO KubeletConfig", "kubeletconfig name", kcKey.Name, "MCP name", mcp.Name) @@ -285,32 +292,97 @@ func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, // nothing we care about, and we can't do much anyway if mcoKc.Spec.KubeletConfig == nil { klog.InfoS("detected KubeletConfig with empty payload, ignoring", "name", kcKey.Name) - return nil, &InvalidKubeletConfig{ObjectName: kcKey.Name} + return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{ObjectName: kcKey.Name}} } + + if mcp.Spec.Paused { + klog.InfoS("detected paused MCP", "name", mcp.Name) + //if the CM exists -> just skip; + //if the CM does not exist -> create it based on the current active machineConfig + + expectedCMName := objectnames.GetComponentName(instance.Name, mcp.Name) + existingCM := &corev1.ConfigMap{} + if err := r.Client.Get(ctx, client.ObjectKey{Namespace: r.Namespace, Name: expectedCMName}, existingCM); err != nil { + if !apierrors.IsNotFound(err) { + return nil, reconcileErrorHandler{ + err: err, + tolerateError: true, + } + } + + currentConfigName := mcp.Status.Configuration.Name + currentConfigObj := &mcov1.MachineConfig{} + if err := r.Client.Get(ctx, client.ObjectKey{Name: currentConfigName}, currentConfigObj); err != nil { + klog.ErrorS(err, "cannot find the machineConfig", "name", currentConfigName) + return nil, reconcileErrorHandler{ + err: fmt.Errorf("failed to find the current machineConfig %s: %v", currentConfigName, err), + tolerateError: true, + result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}, + } + } + + // use local version of github.com/openshift/machine-config-operator/pkg/controller/common.ParseAndConvertConfig + _, dataInBytes, err := intkubeletconfig.ParseKubeletConfigRawData(currentConfigObj.Spec.Config.Raw) + if err != nil { + klog.ErrorS(err, "cannot parse the machineConfig", "name", currentConfigName) + return nil, reconcileErrorHandler{ + err: fmt.Errorf("failed to parse the machineConfig %s: %v", currentConfigName, err), + tolerateError: true, + result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}, + } + } + + decodeKc, err := intkubeletconfig.DecodeKubeletConfigurationFromData(dataInBytes) + if err != nil { + klog.ErrorS(err, "cannot decode the KubeletConfig data from MachineConfig", "name", currentConfigName) + return nil, reconcileErrorHandler{ + err: fmt.Errorf("failed to decode the KubeletConfig data from MachineConfig %s: %v", currentConfigName, err), + tolerateError: true, + result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}, + } + } + + return &kubeletConfigHandler{ + ownerObject: decodeKc, + mcoKc: decodeKc, + poolName: mcp.Name, + setCtrlRef: controllerutil.SetControllerReference, + }, reconcileErrorHandler{result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}} + } + + klog.InfoS("MachineConfigPool of KubeletConfig %s is paused and configMap %s exists", kcKey.Name, existingCM.Name) + return nil, reconcileErrorHandler{ + // the KubeletConfig has been already handled and we can skip the rest of reconciliation logic due to paused MCP + err: fmt.Errorf("MachineConfigPool of KubeletConfig %s is paused and configMap %s already exists", kcKey.Name, existingCM.Name), + tolerateError: true, + result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}, + } + } + return &kubeletConfigHandler{ ownerObject: mcoKc, mcoKc: mcoKc, poolName: mcp.Name, setCtrlRef: controllerutil.SetControllerReference, - }, nil + }, reconcileErrorHandler{} case platform.HyperShift: cmKc := &corev1.ConfigMap{} if err := r.Client.Get(ctx, kcKey, cmKc); err != nil { - return nil, err + return nil, reconcileErrorHandler{err: err} } nodePoolName := cmKc.Labels[HyperShiftNodePoolLabel] kcData := cmKc.Data[HyperShiftConfigMapConfigKey] mcoKc, err := kubeletconfig.DecodeFromData([]byte(kcData), r.Scheme) if err != nil { - return nil, err + return nil, reconcileErrorHandler{err: err} } // nothing we care about, and we can't do much anyway if mcoKc.Spec.KubeletConfig == nil { klog.InfoS("detected KubeletConfig with empty payload, ignoring", "name", kcKey.Name) - return nil, &InvalidKubeletConfig{ObjectName: kcKey.Name} + return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{ObjectName: kcKey.Name}} } return &kubeletConfigHandler{ ownerObject: cmKc, @@ -322,9 +394,9 @@ func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, setCtrlRef: func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error { return nil }, - }, nil + }, reconcileErrorHandler{} } - return nil, fmt.Errorf("unsupported platform: %s", r.Platform) + return nil, reconcileErrorHandler{err: fmt.Errorf("unsupported platform: %s", r.Platform)} } func (r *KubeletConfigReconciler) deleteConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, bool, error) { diff --git a/internal/controller/kubeletconfig_controller_test.go b/internal/controller/kubeletconfig_controller_test.go index fae8a52f72..ee842c7c3d 100644 --- a/internal/controller/kubeletconfig_controller_test.go +++ b/internal/controller/kubeletconfig_controller_test.go @@ -272,4 +272,171 @@ var _ = Describe("Test KubeletConfig Reconcile", func() { Entry("OpenShift Platform", NewFakeKubeletConfigReconciler, platform.OpenShift), Entry("HyperShift Platform", NewFakeKubeletConfigReconcilerForHyperShift, platform.HyperShift), ) + + Context("kubeletconfig updates with paused MCPs", func() { + var nro *nropv1.NUMAResourcesOperator + var mcp1, mcpPaused *machineconfigv1.MachineConfigPool + var mcoKC1, mcoKCPaused *machineconfigv1.KubeletConfig + var label1 map[string]string + var kc1Key, kc2Key client.ObjectKey + var currentMachineConfig *machineconfigv1.MachineConfig + var poolName1, poolName2 string + var reconciler *KubeletConfigReconciler + var err error + + BeforeEach(func() { + label1 = map[string]string{ + "test1": "test1", + } + mcp1 = testobjs.NewMachineConfigPool("test1", label1, &metav1.LabelSelector{MatchLabels: label1}, &metav1.LabelSelector{MatchLabels: label1}) + poolName1 = mcp1.Name + kubeletConfig := &kubeletconfigv1beta1.KubeletConfiguration{} + mcoKC1 = testobjs.NewKubeletConfig(poolName1, label1, mcp1.Spec.MachineConfigSelector, kubeletConfig) + kc1Key = client.ObjectKeyFromObject(mcoKC1) + + label2 := map[string]string{ + "test2": "test2", + } + mcpPaused = testobjs.NewMachineConfigPool("test2", label2, &metav1.LabelSelector{MatchLabels: label2}, &metav1.LabelSelector{MatchLabels: label2}) + mcpPaused.Spec.Paused = true + poolName2 = mcpPaused.Name + kubeletConfigPaused := &kubeletconfigv1beta1.KubeletConfiguration{ + TopologyManagerPolicy: "restricted", + TopologyManagerScope: "container", + } + mcoKCPaused = testobjs.NewKubeletConfig(poolName2, label2, mcpPaused.Spec.MachineConfigSelector, kubeletConfigPaused) + kc2Key = client.ObjectKeyFromObject(mcoKCPaused) + currentMachineConfig = &machineconfigv1.MachineConfig{ + // this is a fake machineconfig that will be used to as the old machineconfig for the paused MCP + // it currently uses single-numa-node policy and pod scope + ObjectMeta: metav1.ObjectMeta{ + Name: "test2-mc", + }, + Spec: machineconfigv1.MachineConfigSpec{ + Config: runtime.RawExtension{ + Raw: []byte(`{ + "ignition": { + "version": "3.2.0" + }, + "storage": { + "files": [ + { + "contents": { + "source": "data:text/plain;charset=utf-8;base64,a2luZDogS3ViZWxldENvbmZpZ3VyYXRpb24KdG9wb2xvZ3lNYW5hZ2VyUG9saWN5OiBzaW5nbGUtbnVtYS1ub2RlCnRvcG9sb2d5TWFuYWdlclNjb3BlOiBwb2QK" + }, + "mode": 420, + "overwrite": true, + "path": "/etc/kubernetes/kubelet.conf" + } + ] + } + }`), + }, + }, + } + // we want a state where the active MachineConfig of a paused MCP is old + // intentionally because the MCP would be paused, so we can test that the old + // data is reflected in the generatedConfigMap + mcpPaused.Status.Configuration.Name = currentMachineConfig.Name + + ng1 := nropv1.NodeGroup{ + PoolName: &poolName1, + } + ng2 := nropv1.NodeGroup{ + PoolName: &poolName2, + } + nro = testobjs.NewNUMAResourcesOperator(objectnames.DefaultNUMAResourcesOperatorCrName, ng1, ng2) + + reconciler, err = NewFakeKubeletConfigReconciler(nro, mcp1, mcoKC1, mcpPaused, mcoKCPaused) + Expect(err).ToNot(HaveOccurred()) + }) + It("should create configmap for active MCP", func() { + result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc1Key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + cm := &corev1.ConfigMap{} + key := client.ObjectKey{ + Namespace: testNamespace, + Name: objectnames.GetComponentName(nro.Name, poolName1), + } + Expect(reconciler.Client.Get(context.TODO(), key, cm)).To(Succeed()) + }) + + It("should not create configmap for paused MCP if the current machineconfig is not found", func() { + result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod})) + + fakeRecorder, ok := reconciler.Recorder.(*record.FakeRecorder) + Expect(ok).To(BeTrue()) + event := <-fakeRecorder.Events + Expect(event).To(ContainSubstring("ProcessSkip")) + Expect(event).To(ContainSubstring("failed to find the current machineConfig")) + + cm := &corev1.ConfigMap{} + key := client.ObjectKey{ + Namespace: testNamespace, + Name: objectnames.GetComponentName(nro.Name, poolName2), + } + Expect(reconciler.Client.Get(context.TODO(), key, cm)).To(HaveOccurred()) + }) + It("should create configmap for paused MCP when machineconfig exists, with requeue", func() { + Expect(reconciler.Client.Create(context.TODO(), currentMachineConfig)).To(Succeed()) + result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod})) + + fakeRecorder, ok := reconciler.Recorder.(*record.FakeRecorder) + Expect(ok).To(BeTrue()) + event := <-fakeRecorder.Events + Expect(event).To(ContainSubstring("ProcessOK")) + Expect(event).To(ContainSubstring("Updated RTE config")) + + cm := &corev1.ConfigMap{} + key := client.ObjectKey{ + Namespace: testNamespace, + Name: objectnames.GetComponentName(nro.Name, poolName2), + } + Expect(reconciler.Client.Get(context.TODO(), key, cm)).To(Succeed()) + + cmData, ok := cm.Data["config.yaml"] + Expect(ok).To(BeTrue()) + Expect(cmData).To(ContainSubstring("single-numa-node")) + Expect(cmData).To(ContainSubstring("pod")) + }) + + It("should update configmap for paused MCP when MCP is unpaused", func() { + Expect(reconciler.Client.Create(context.TODO(), currentMachineConfig)).To(Succeed()) + result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod})) + + clonedMCP := mcpPaused.DeepCopy() + clonedMCP.Spec.Paused = false + Expect(reconciler.Client.Update(context.TODO(), clonedMCP)).To(Succeed()) + + result, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + + cm := &corev1.ConfigMap{} + key := client.ObjectKey{ + Namespace: testNamespace, + Name: objectnames.GetComponentName(nro.Name, poolName2), + } + Expect(reconciler.Client.Get(context.TODO(), key, cm)).To(Succeed()) + + fakeRecorder, ok := reconciler.Recorder.(*record.FakeRecorder) + Expect(ok).To(BeTrue()) + event := <-fakeRecorder.Events + Expect(event).To(ContainSubstring("ProcessOK")) + Expect(event).To(ContainSubstring(mcoKCPaused.Name)) + + cmData, ok := cm.Data["config.yaml"] + Expect(ok).To(BeTrue()) + Expect(cmData).To(ContainSubstring("restricted")) + Expect(cmData).To(ContainSubstring("container")) + }) + }) + }) diff --git a/internal/kubeletconfig/kubeletconfig.go b/internal/kubeletconfig/kubeletconfig.go new file mode 100644 index 0000000000..4540931375 --- /dev/null +++ b/internal/kubeletconfig/kubeletconfig.go @@ -0,0 +1,89 @@ +/* + * Copyright 2025 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubeletconfig + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/runtime" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" + + "sigs.k8s.io/yaml" + + mcov1 "github.com/openshift/api/machineconfiguration/v1" +) + +func ParseKubeletConfigRawData(b []byte) (string, []byte, error) { + // we are only interested in storage.files so get that part only and implement a local version of + // github.com/openshift/machine-config-operator/pkg/controller/common/helpers.go ParseAndConvertConfig() function + + type fileContent struct { + Source string `json:"source"` + } + type storageFile struct { + Path string `json:"path"` + Contents fileContent `json:"contents"` + } + + type storage struct { + Files []storageFile `json:"files"` + } + type config struct { + Storage storage `json:"storage"` + } + + cfg := &config{} + err := json.Unmarshal(b, cfg) + if err != nil { + return "", nil, err + } + + for _, file := range cfg.Storage.Files { + if file.Path == "/etc/kubernetes/kubelet.conf" { + if strings.HasPrefix(file.Contents.Source, "data:text/plain;charset=utf-8;base64,") { + base64Data := strings.TrimPrefix(file.Contents.Source, "data:text/plain;charset=utf-8;base64,") + decoded, err := base64.StdEncoding.DecodeString(base64Data) + if err != nil { + return "", nil, err + } + return string(decoded), decoded, nil + } + } + } + return "", nil, fmt.Errorf("kubelet config not found in MachineConfig data") +} + +func DecodeKubeletConfigurationFromData(data []byte) (*mcov1.KubeletConfig, error) { + kc := &kubeletconfigv1beta1.KubeletConfiguration{} + if err := yaml.Unmarshal(data, kc); err != nil { + return nil, err + } + + rawKc, err := json.Marshal(kc) + if err != nil { + return nil, err + } + + mcoKc := &mcov1.KubeletConfig{} + mcoKc.Spec.KubeletConfig = &runtime.RawExtension{ + Raw: rawKc, + } + return mcoKc, nil +} diff --git a/internal/kubeletconfig/kubeletconfig_test.go b/internal/kubeletconfig/kubeletconfig_test.go new file mode 100644 index 0000000000..7415c2afcf --- /dev/null +++ b/internal/kubeletconfig/kubeletconfig_test.go @@ -0,0 +1,87 @@ +/* + * Copyright 2025 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kubeletconfig + +import ( + "strings" + "testing" +) + +func TestParseKubeletConfigRawData(t *testing.T) { + type testCase struct { + name string + rawData string + expectedSetOfMatches []string + } + + testCases := []testCase{ + { + name: "Ignition v3 format (OCP 4.20)", + rawData: `{ + "ignition": { + "version": "3.2.0" + }, + "storage": { + "files": [ + { + "contents": { + "source": "data:text/plain;charset=utf-8;base64,a2luZDogS3ViZWxldENvbmZpZ3VyYXRpb24KdG9wb2xvZ3lNYW5hZ2VyUG9saWN5OiBzaW5nbGUtbnVtYS1ub2RlCnRvcG9sb2d5TWFuYWdlclNjb3BlOiBwb2QK" + }, + "mode": 420, + "overwrite": true, + "path": "/etc/kubernetes/kubelet.conf" + } + ] + } + }`, + expectedSetOfMatches: []string{ + "topologyManagerPolicy: single-numa-node", + "topologyManagerScope: pod", + }, + }, + { + name: "No kubelet config", + rawData: `{ + "ignition": { + "version": "3.2.0" + } + }`, + }, + { + name: "Empty raw data", + rawData: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kubeletConfigData, _, err := ParseKubeletConfigRawData([]byte(tc.rawData)) + if err != nil && len(tc.expectedSetOfMatches) > 0 { + t.Errorf("unexpected error: %v", err) + } + + if err != nil && len(tc.expectedSetOfMatches) == 0 { + return + } + + for _, match := range tc.expectedSetOfMatches { + if !strings.Contains(kubeletConfigData, match) { + t.Errorf(`expected kubeletConfigData to contain "%s", but got: %v`, match, kubeletConfigData) + } + } + }) + } +} From 38d6a911577ee2e0b57d4bf60c6edb102b0211bd Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Fri, 23 Jan 2026 13:24:59 +0200 Subject: [PATCH 3/5] ctrl:kubeletconfig: use MCP predicate instead of requeue We want to know when a paused MCP goes unpaused and do the needed updates to the config map when that happens. In previous commit, when a paused MCP was detected, the reconcile would requeue after some time; that was intended to catch the unpause operation eventually. In this commit, we don't requeue on detected paused MCP but rather watch for pause field updates on targeted MCPs. Signed-off-by: Shereen Haj --- .../controller/kubeletconfig_controller.go | 77 +++++++++++++++++-- .../kubeletconfig_controller_test.go | 5 +- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/internal/controller/kubeletconfig_controller.go b/internal/controller/kubeletconfig_controller.go index 03735a35f9..6c49cbb55b 100644 --- a/internal/controller/kubeletconfig_controller.go +++ b/internal/controller/kubeletconfig_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" @@ -189,11 +190,22 @@ func (r *KubeletConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { return equality.Semantic.DeepEqual(oldNodeGroups, newNodeGroups) }, } + + machineConfigPoolPredicate := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + mcpOld := e.ObjectOld.(*mcov1.MachineConfigPool) + mcpNew := e.ObjectNew.(*mcov1.MachineConfigPool) + return mcpOld.Spec.Paused && !mcpNew.Spec.Paused + }, + } + return ctrl.NewControllerManagedBy(mgr). For(o, builder.WithPredicates(p)). Owns(&corev1.ConfigMap{}). Watches(&nropv1.NUMAResourcesOperator{}, handler.EnqueueRequestsFromMapFunc(r.numaResourcesOperatorToKubeletConfig), builder.WithPredicates(numaResourcesOperatorPredicate)). + Watches(&mcov1.MachineConfigPool{}, handler.EnqueueRequestsFromMapFunc(r.nodeGroupToMachineConfigPool), + builder.WithPredicates(machineConfigPoolPredicate)). Complete(r) } @@ -229,10 +241,7 @@ func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instan } cm, err = r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler) - if err != nil { - return nil, reconcileErrorHandler{err: err} - } - return cm, errHandler // FIXME use predicate + return cm, reconcileErrorHandler{err: err} } func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration, instance *nropv1.NUMAResourcesOperator, kcHandler *kubeletConfigHandler) (*corev1.ConfigMap, error) { @@ -347,15 +356,15 @@ func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, mcoKc: decodeKc, poolName: mcp.Name, setCtrlRef: controllerutil.SetControllerReference, - }, reconcileErrorHandler{result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}} + }, reconcileErrorHandler{} } - klog.InfoS("MachineConfigPool of KubeletConfig %s is paused and configMap %s exists", kcKey.Name, existingCM.Name) + klog.InfoS("MachineConfigPool is paused and configMap exists", "KubeletConfig", kcKey.Name, "ConfigMap", existingCM.Name) return nil, reconcileErrorHandler{ // the KubeletConfig has been already handled and we can skip the rest of reconciliation logic due to paused MCP err: fmt.Errorf("MachineConfigPool of KubeletConfig %s is paused and configMap %s already exists", kcKey.Name, existingCM.Name), tolerateError: true, - result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod}, + result: ctrl.Result{}, } } @@ -455,6 +464,60 @@ func (r *KubeletConfigReconciler) numaResourcesOperatorToKubeletConfig(ctx conte return requests } +func (r *KubeletConfigReconciler) nodeGroupToMachineConfigPool(ctx context.Context, object client.Object) []reconcile.Request { + var requests []reconcile.Request + if r.Platform != platform.OpenShift { + return requests + } + + nro := &nropv1.NUMAResourcesOperator{} + if err := r.Client.Get(ctx, client.ObjectKey{Name: objectnames.DefaultNUMAResourcesOperatorCrName}, nro); err != nil { + klog.ErrorS(err, "failed to get NUMAResourcesOperator %v") + return requests + } + mcpList := &mcov1.MachineConfigPoolList{} + if err := r.Client.List(ctx, mcpList); err != nil { + klog.ErrorS(err, "failed to list MachineConfigPools %v") + } + mcpMap := make(map[string]mcov1.MachineConfigPool) + for _, mcp := range mcpList.Items { + mcpMap[mcp.Name] = mcp + } + + ngstatus := nro.Status.NodeGroups + targetMCPs := sets.New[string]() + for _, ngstatus := range ngstatus { + if mcp, ok := mcpMap[ngstatus.PoolName]; ok { + targetMCPs.Insert(mcp.Name) + } + } + + kcList := &mcov1.KubeletConfigList{} + if err := r.Client.List(ctx, kcList); err != nil { + klog.ErrorS(err, "failed to list KubeletConfigs %v") + } + //map mcp to kubeletconfig requests + for _, mcpName := range targetMCPs.UnsortedList() { + mcpLabels := labels.Set(mcpMap[mcpName].Labels) + for _, kc := range kcList.Items { + if kc.Spec.MachineConfigPoolSelector == nil { + continue + } + selector, err := metav1.LabelSelectorAsSelector(kc.Spec.MachineConfigPoolSelector) + if err != nil { + klog.ErrorS(err, "failed to parse MachineConfigPoolSelector", "kubeletconfig", kc.Name) + continue + } + if selector.Matches(mcpLabels) { + requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKey{ + Name: kc.Name, + }}) + } + } + } + return requests +} + func getDeletedOwner(kcKey client.ObjectKey, ownerConfigMaps []*corev1.ConfigMap) *corev1.ConfigMap { for i := range ownerConfigMaps { cm := ownerConfigMaps[i] diff --git a/internal/controller/kubeletconfig_controller_test.go b/internal/controller/kubeletconfig_controller_test.go index ee842c7c3d..a24af44786 100644 --- a/internal/controller/kubeletconfig_controller_test.go +++ b/internal/controller/kubeletconfig_controller_test.go @@ -384,7 +384,7 @@ var _ = Describe("Test KubeletConfig Reconcile", func() { Expect(reconciler.Client.Create(context.TODO(), currentMachineConfig)).To(Succeed()) result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) Expect(err).ToNot(HaveOccurred()) - Expect(result).To(Equal(reconcile.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod})) + Expect(result).To(Equal(reconcile.Result{})) fakeRecorder, ok := reconciler.Recorder.(*record.FakeRecorder) Expect(ok).To(BeTrue()) @@ -409,12 +409,13 @@ var _ = Describe("Test KubeletConfig Reconcile", func() { Expect(reconciler.Client.Create(context.TODO(), currentMachineConfig)).To(Succeed()) result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) Expect(err).ToNot(HaveOccurred()) - Expect(result).To(Equal(reconcile.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod})) + Expect(result).To(Equal(reconcile.Result{})) clonedMCP := mcpPaused.DeepCopy() clonedMCP.Spec.Paused = false Expect(reconciler.Client.Update(context.TODO(), clonedMCP)).To(Succeed()) + // would betriggered by MCP predicate result, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: kc2Key}) Expect(err).ToNot(HaveOccurred()) Expect(result).To(Equal(reconcile.Result{})) From b08d423b5b1fd7abd8c0a69f9f7927b8efe1d604 Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Wed, 5 Nov 2025 11:52:40 +0200 Subject: [PATCH 4/5] ctrl: nro: handle paused MCPs The situtaion is mainly mitigated in the kubeletconfig controller, however we have a bug in the NRO controller such that it puts the NRO CR in progressing state because paused MCP is not in updated condition. This commit ignores checking whether a paused MCP is up-to-date, and introduces a new status condition to report paused MCPs if exist. Signed-off-by: Shereen Haj --- .../numaresourcesoperator_controller.go | 46 +++++++-- .../numaresourcesoperator_controller_test.go | 99 +++++++++++++++++++ .../numaresourcesscheduler_controller.go | 2 +- pkg/objectstate/rte/machineconfigpool.go | 21 +++- pkg/status/status.go | 23 ++++- pkg/status/status_test.go | 4 +- 6 files changed, 178 insertions(+), 17 deletions(-) diff --git a/internal/controller/numaresourcesoperator_controller.go b/internal/controller/numaresourcesoperator_controller.go index df5c169611..017ed7ca4b 100644 --- a/internal/controller/numaresourcesoperator_controller.go +++ b/internal/controller/numaresourcesoperator_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "strings" "time" appsv1 "k8s.io/api/apps/v1" @@ -31,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -141,7 +143,7 @@ func (r *NUMAResourcesOperatorReconciler) Reconcile(ctx context.Context, req ctr initialStatus := *instance.Status.DeepCopy() if len(initialStatus.Conditions) == 0 { - instance.Status.Conditions = status.DefaultBaseConditions(time.Now()) + instance.Status.Conditions = status.NewNUMAResourcesOperatorConditions() } if req.Name != objectnames.DefaultNUMAResourcesOperatorCrName { @@ -236,7 +238,7 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceAPI(ctx context.Conte func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) intreconcile.Step { // we need to sync machine configs first and wait for the MachineConfigPool updates // before checking additional components for updates - mcpUpdatedFunc, err := r.syncMachineConfigs(ctx, instance, existing, trees) + mcpUpdatedFunc, pausedMCPs, err := r.syncMachineConfigs(ctx, instance, existing, trees) if err != nil { r.Recorder.Eventf(instance, corev1.EventTypeWarning, "FailedMCSync", "Failed to set up machine configuration for worker nodes: %v", err) err = fmt.Errorf("failed to sync machine configs: %w", err) @@ -246,7 +248,7 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx con // MCO needs to update the SELinux context removal and other stuff, and need to trigger a reboot. // It can take a while. - mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, trees, r.ForwardMCPConds, mcpUpdatedFunc) + mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, trees, r.ForwardMCPConds, mcpUpdatedFunc, pausedMCPs) instance.Status.MachineConfigPools = mcpStatuses if mcpNamePending != "" { @@ -255,6 +257,8 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx con } instance.Status.MachineConfigPools = syncMachineConfigPoolNodeGroupConfigStatuses(instance.Status.MachineConfigPools, trees) + instance.Status.Conditions = updateMachineConfigPoolPausedCondition(instance.Status.Conditions, instance.Generation, pausedMCPs) + return intreconcile.StepSuccess() } @@ -390,7 +394,7 @@ func (r *NUMAResourcesOperatorReconciler) syncNodeResourceTopologyAPI(ctx contex return (updatedCount == len(objStates)), err } -func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) (rtestate.MCPWaitForUpdatedFunc, error) { +func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) (rtestate.MCPWaitForUpdatedFunc, sets.Set[string], error) { klog.V(4).InfoS("Machine Config Sync start", "trees", len(trees)) defer klog.V(4).Info("Machine Config Sync stop") @@ -400,7 +404,7 @@ func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context // In case of operator upgrade from 4.1X → 4.18, it's necessary to remove the old MachineConfig, // unless an emergency annotation is provided which forces the operator to use custom policy - objStates, waitFunc := existing.MachineConfigsState(r.RTEManifests) + objStates, waitFunc, pausedMCPs := existing.MachineConfigsState(r.RTEManifests) for _, objState := range objStates { klog.InfoS("objState", "desired", objState.Desired, "existing", objState.Existing, "createOrUpdate", objState.IsCreateOrUpdate()) if objState.IsCreateOrUpdate() { @@ -420,10 +424,10 @@ func (r *NUMAResourcesOperatorReconciler) syncMachineConfigs(ctx context.Context break } } - return waitFunc, err + return waitFunc, pausedMCPs, err } -func syncMachineConfigPoolsStatuses(instanceName string, trees []nodegroupv1.Tree, forwardMCPConds bool, updatedFunc rtestate.MCPWaitForUpdatedFunc) ([]nropv1.MachineConfigPool, string) { +func syncMachineConfigPoolsStatuses(instanceName string, trees []nodegroupv1.Tree, forwardMCPConds bool, updatedFunc rtestate.MCPWaitForUpdatedFunc, pausedMCPs sets.Set[string]) ([]nropv1.MachineConfigPool, string) { klog.V(4).InfoS("Machine Config Status Sync start", "trees", len(trees)) defer klog.V(4).Info("Machine Config Status Sync stop") @@ -432,6 +436,11 @@ func syncMachineConfigPoolsStatuses(instanceName string, trees []nodegroupv1.Tre for _, mcp := range tree.MachineConfigPools { mcpStatuses = append(mcpStatuses, extractMCPStatus(mcp, forwardMCPConds)) + if pausedMCPs.Has(mcp.Name) { + klog.V(5).InfoS("Paused MachineConfigPool detected", "name", mcp.Name) + continue + } + isUpdated := updatedFunc(instanceName, mcp) klog.V(5).InfoS("Machine Config Pool state", "name", mcp.Name, "instance", instanceName, "updated", isUpdated) @@ -597,7 +606,8 @@ func (r *NUMAResourcesOperatorReconciler) SetupWithManager(mgr ctrl.Manager) err return !reflect.DeepEqual(mcpOld.Status.Conditions, mcpNew.Status.Conditions) || !apiequality.Semantic.DeepEqual(mcpOld.Labels, mcpNew.Labels) || !apiequality.Semantic.DeepEqual(mcpOld.Spec.MachineConfigSelector, mcpNew.Spec.MachineConfigSelector) || - !apiequality.Semantic.DeepEqual(mcpOld.Spec.NodeSelector, mcpNew.Spec.NodeSelector) + !apiequality.Semantic.DeepEqual(mcpOld.Spec.NodeSelector, mcpNew.Spec.NodeSelector) || + !reflect.DeepEqual(mcpOld.Spec.Paused, mcpNew.Spec.Paused) }, } @@ -802,3 +812,23 @@ func getTreesByNodeGroup(ctx context.Context, cli client.Client, nodeGroups []nr return nil, fmt.Errorf("unsupported platform") } } + +func updateMachineConfigPoolPausedCondition(conditions []metav1.Condition, generation int64, pausedMCPs sets.Set[string]) []metav1.Condition { + pausedStatus := metav1.ConditionFalse + message := "" + if pausedMCPs.Len() > 0 { + klog.InfoS("detected paused MCPs", "pausedMCPs", pausedMCPs.UnsortedList()) + pausedStatus = metav1.ConditionTrue + message = "detected paused MCPs: " + strings.Join(pausedMCPs.UnsortedList(), ", ") + } + condition := metav1.Condition{ + Type: status.ConditionMachineConfigPoolPaused, + Status: pausedStatus, + Reason: status.ConditionMachineConfigPoolPaused, + Message: message, + ObservedGeneration: generation, + LastTransitionTime: metav1.Now(), + } + conditions, _ = status.ComputeConditions(conditions, condition, time.Now()) + return conditions +} diff --git a/internal/controller/numaresourcesoperator_controller_test.go b/internal/controller/numaresourcesoperator_controller_test.go index 5f1429764d..e845c1388a 100644 --- a/internal/controller/numaresourcesoperator_controller_test.go +++ b/internal/controller/numaresourcesoperator_controller_test.go @@ -2149,6 +2149,105 @@ var _ = Describe("Test NUMAResourcesOperator Reconcile", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue(), "MachineConfig %s expected to be deleted; err=%v", mc2Key.Name, err) }) }) + + Context("with status condition updates", func() { + var nro *nropv1.NUMAResourcesOperator + var mcp *machineconfigv1.MachineConfigPool + var reconciler *NUMAResourcesOperatorReconciler + var label map[string]string + var key client.ObjectKey + var err error + + ctx := context.TODO() + + BeforeEach(func() { + label = map[string]string{ + "test1": "test1", + } + + ng := nropv1.NodeGroup{ + MachineConfigPoolSelector: &metav1.LabelSelector{ + MatchLabels: label, + }, + } + nro = testobjs.NewNUMAResourcesOperator(objectnames.DefaultNUMAResourcesOperatorCrName, ng) + key = client.ObjectKeyFromObject(nro) + + mcp = testobjs.NewMachineConfigPool("test1", label, &metav1.LabelSelector{MatchLabels: label}, &metav1.LabelSelector{MatchLabels: label}) + + reconciler, err = NewFakeNUMAResourcesOperatorReconciler(platform.OpenShift, defaultOCPVersion, nro, mcp) + Expect(err).ToNot(HaveOccurred()) + + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + }) + It("should report progressing condition for un-paused MCPs but in updating state", func() { + Expect(reconciler.Client.Get(ctx, key, nro)).To(Succeed()) + Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp), mcp)).To(Succeed()) + mcp.Status.Conditions = []machineconfigv1.MachineConfigPoolCondition{ + { + Type: machineconfigv1.MachineConfigPoolUpdated, + Status: corev1.ConditionFalse, + }, + } + + Expect(reconciler.Client.Update(ctx, mcp)).To(Succeed()) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{RequeueAfter: time.Minute})) + + Expect(reconciler.Client.Get(ctx, key, nro)).To(Succeed()) + + progressingCondition := getConditionByType(nro.Status.Conditions, status.ConditionProgressing) + Expect(progressingCondition.Status).To(Equal(metav1.ConditionTrue)) + Expect(progressingCondition.Reason).To(Equal("MachineConfigPoolIsUpdating")) + Expect(progressingCondition.Message).To(ContainSubstring("test1 is updating")) + }) + It("should not report progressing condition for paused MCPs", func() { + Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp), mcp)).To(Succeed()) + mcp.Status.Conditions = []machineconfigv1.MachineConfigPoolCondition{ + { + Type: machineconfigv1.MachineConfigPoolUpdated, + Status: corev1.ConditionFalse, + }, + } + mcp.Spec.Paused = true + + Expect(reconciler.Client.Update(ctx, mcp)).To(Succeed()) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + + Expect(reconciler.Client.Get(ctx, key, nro)).To(Succeed()) + + progressingCondition := getConditionByType(nro.Status.Conditions, status.ConditionProgressing) + Expect(progressingCondition.Status).To(Equal(metav1.ConditionFalse), "Progressing condition should be false because the MCP is paused, got conditions: %v", nro.Status.Conditions) + }) + + It("should report MachineConfigPoolPaused condition for paused MCPs", func() { + Expect(reconciler.Client.Get(ctx, client.ObjectKeyFromObject(mcp), mcp)).To(Succeed()) + mcp.Status.Conditions = []machineconfigv1.MachineConfigPoolCondition{ + { + Type: machineconfigv1.MachineConfigPoolUpdated, + Status: corev1.ConditionFalse, + }, + } + mcp.Spec.Paused = true + + Expect(reconciler.Client.Update(ctx, mcp)).To(Succeed()) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + + Expect(reconciler.Client.Get(ctx, key, nro)).To(Succeed()) + + pausedCondition := getConditionByType(nro.Status.Conditions, status.ConditionMachineConfigPoolPaused) + Expect(pausedCondition.Status).To(Equal(metav1.ConditionTrue), "Paused condition should be true because the MCP is paused, got conditions: %v", nro.Status.Conditions) + Expect(pausedCondition.Reason).To(Equal(status.ConditionMachineConfigPoolPaused)) + Expect(pausedCondition.Message).To(ContainSubstring("detected paused MCPs: test1")) + }) + }) }) Describe("platform agnostic", func() { diff --git a/internal/controller/numaresourcesscheduler_controller.go b/internal/controller/numaresourcesscheduler_controller.go index 99896df2b3..3dc29e4ff9 100644 --- a/internal/controller/numaresourcesscheduler_controller.go +++ b/internal/controller/numaresourcesscheduler_controller.go @@ -117,7 +117,7 @@ func (r *NUMAResourcesSchedulerReconciler) Reconcile(ctx context.Context, req ct initialStatus := *instance.Status.DeepCopy() if len(initialStatus.Conditions) == 0 { - instance.Status.Conditions = status.NewNUMAResourcesSchedulerBaseConditions() + instance.Status.Conditions = status.NewNUMAResourcesSchedulerConditions() } if req.Name != objectnames.DefaultNUMAResourcesSchedulerCrName { diff --git a/pkg/objectstate/rte/machineconfigpool.go b/pkg/objectstate/rte/machineconfigpool.go index 11aa1afd78..daf53fa0a6 100644 --- a/pkg/objectstate/rte/machineconfigpool.go +++ b/pkg/objectstate/rte/machineconfigpool.go @@ -21,6 +21,7 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -129,14 +130,20 @@ func MatchMachineConfigPoolCondition(conditions []machineconfigv1.MachineConfigP type MCPWaitForUpdatedFunc func(string, *machineconfigv1.MachineConfigPool) bool -func (em *ExistingManifests) MachineConfigsState(mf Manifests) ([]objectstate.ObjectState, MCPWaitForUpdatedFunc) { +func (em *ExistingManifests) MachineConfigsState(mf Manifests) ([]objectstate.ObjectState, MCPWaitForUpdatedFunc, sets.Set[string]) { + pausedMCPs := sets.New[string]() var ret []objectstate.ObjectState if mf.Core.MachineConfig == nil { - return ret, nullMachineConfigPoolUpdated + return ret, nullMachineConfigPoolUpdated, pausedMCPs } enabledMCCount := 0 for _, tree := range em.trees { for _, mcp := range tree.MachineConfigPools { + if mcp.Spec.Paused { + pausedMCPs.Insert(mcp.Name) + continue + } + mcName := objectnames.GetMachineConfigName(em.instance.Name, mcp.Name) if mcp.Spec.MachineConfigSelector == nil { klog.Warningf("the machine config pool %q does not have machine config selector", mcp.Name) @@ -183,10 +190,16 @@ func (em *ExistingManifests) MachineConfigsState(mf Manifests) ([]objectstate.Ob } klog.V(4).InfoS("machineConfigsState", "enabledMachineConfigs", enabledMCCount) + // TODO: the API design allows to configure custom annotation per nodegroup, + // meaning that the function that checks if an MCP is updated or not is different + // in each case. This should be refactored to adapt the flexible configuration or + // get consensus on the API design to correct this. The problem with the current + // approach is that nodegroups with default selinux will need to have the custom + // machine config, which is never met. if enabledMCCount > 0 { - return ret, IsMachineConfigPoolUpdated + return ret, IsMachineConfigPoolUpdated, pausedMCPs } - return ret, IsMachineConfigPoolUpdatedAfterDeletion + return ret, IsMachineConfigPoolUpdatedAfterDeletion, pausedMCPs } func nullMachineConfigPoolUpdated(instanceName string, mcp *machineconfigv1.MachineConfigPool) bool { diff --git a/pkg/status/status.go b/pkg/status/status.go index 8a7a37f746..0ec0e93c5c 100644 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -37,9 +37,15 @@ const ( ) const ( + // scheduler conditions ConditionDedicatedInformerActive = "DedicatedInformerActive" ) +const ( + // operator conditions + ConditionMachineConfigPoolPaused = "MachineConfigPoolPaused" +) + // TODO: are we duping these? const ( ReasonAsExpected = "AsExpected" @@ -183,9 +189,9 @@ func DefaultBaseConditions(timestamp time.Time) []metav1.Condition { } } -// NewNUMAResourcesSchedulerBaseConditions creates specific conditions on +// NewNUMAResourcesSchedulerConditions creates specific scheduler conditions on // top of NewBaseConditions. -func NewNUMAResourcesSchedulerBaseConditions() []metav1.Condition { +func NewNUMAResourcesSchedulerConditions() []metav1.Condition { now := time.Now() conds := append(DefaultBaseConditions(now), metav1.Condition{ Type: ConditionDedicatedInformerActive, @@ -196,6 +202,19 @@ func NewNUMAResourcesSchedulerBaseConditions() []metav1.Condition { return conds } +// NewNUMAResourcesOperatorConditions creates specific operator conditions on +// top of NewBaseConditions. +func NewNUMAResourcesOperatorConditions() []metav1.Condition { + now := time.Now() + conds := append(DefaultBaseConditions(now), metav1.Condition{ + Type: ConditionMachineConfigPoolPaused, + Status: metav1.ConditionUnknown, + LastTransitionTime: metav1.Time{Time: now}, + Reason: ConditionMachineConfigPoolPaused, + }) + return conds +} + // CloneConditions creates a deep copy of the given `conditions`. func CloneConditions(conditions []metav1.Condition) []metav1.Condition { var c = make([]metav1.Condition, len(conditions)) diff --git a/pkg/status/status_test.go b/pkg/status/status_test.go index 6264aee11c..140bb3045a 100644 --- a/pkg/status/status_test.go +++ b/pkg/status/status_test.go @@ -292,7 +292,7 @@ func TestComputeConditions(t *testing.T) { }{ { name: "first reconcile iteration - with operator condition", - conditions: NewNUMAResourcesSchedulerBaseConditions(), + conditions: NewNUMAResourcesSchedulerConditions(), condition: metav1.Condition{ Type: ConditionAvailable, Status: metav1.ConditionTrue, @@ -334,7 +334,7 @@ func TestComputeConditions(t *testing.T) { }, { name: "first reconcile iteration - with informer condition", - conditions: NewNUMAResourcesSchedulerBaseConditions(), + conditions: NewNUMAResourcesSchedulerConditions(), condition: metav1.Condition{ Type: ConditionDedicatedInformerActive, Status: metav1.ConditionTrue, From 22533cbf10dd7df592c8d32999a65eef9cb04a7a Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Fri, 23 Jan 2026 15:23:48 +0200 Subject: [PATCH 5/5] ctrl: nro: reconcile on node group with poolName Node group can be set with either MCPselector or a poolName. Validation of the nodegroup configuraion is checked in the reconcile logic, and because poolName is set and MCPSelector is not a valid nodeGroup we need to consider this and reconcile. This was missed when nodeGroup.PoolName was first introduced and didn't cause any observed reconcile failures because: 1. any change on the nodeGroups would anyway trigger reconciliation 2. so far the MCP related updates were caused by Kubeletconfig updates which wasn't blocked until the paused-MCP case got handled (which is why this appeared now). Signed-off-by: Shereen Haj --- .../controller/numaresourcesoperator_controller.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/controller/numaresourcesoperator_controller.go b/internal/controller/numaresourcesoperator_controller.go index 017ed7ca4b..43920c7c29 100644 --- a/internal/controller/numaresourcesoperator_controller.go +++ b/internal/controller/numaresourcesoperator_controller.go @@ -663,6 +663,18 @@ func (r *NUMAResourcesOperatorReconciler) mcpToNUMAResourceOperator(ctx context. nro := &nros.Items[i] mcpLabels := labels.Set(mcp.Labels) for _, nodeGroup := range nro.Spec.NodeGroups { + if nodeGroup.PoolName != nil { + if mcp.Name == *nodeGroup.PoolName { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Name: nro.Name, + }, + }) + break + } + continue + } + if nodeGroup.MachineConfigPoolSelector == nil { continue } @@ -679,10 +691,10 @@ func (r *NUMAResourcesOperatorReconciler) mcpToNUMAResourceOperator(ctx context. Name: nro.Name, }, }) + break } } } - return requests }