Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 173 additions & 28 deletions internal/controller/kubeletconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
Expand All @@ -48,6 +49,7 @@ import (
"github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform"

nropv1 "github.com/openshift-kni/numaresources-operator/api/v1"
intkubeletconfig "github.com/openshift-kni/numaresources-operator/internal/kubeletconfig"
"github.com/openshift-kni/numaresources-operator/internal/machineconfigpools"
"github.com/openshift-kni/numaresources-operator/pkg/apply"
"github.com/openshift-kni/numaresources-operator/pkg/kubeletconfig"
Expand All @@ -57,7 +59,8 @@ import (
)

const (
kubeletConfigRetryPeriod = 30 * time.Second
kubeletConfigRetryPeriod = 30 * time.Second
MachineConfigPoolPausedRetryPeriod = 2 * time.Minute
)

const (
Expand Down Expand Up @@ -87,6 +90,12 @@ type kubeletConfigHandler struct {
setCtrlRef func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error
}

type reconcileErrorHandler struct {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a handler. a Handler manages something. This seems more a reconcileResult

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we reuse a reconcile.Step, extend it, or even create a variant of?

err error
tolerateError bool
result ctrl.Result
}

// Namespace Scoped

// Cluster Scoped
Expand Down Expand Up @@ -116,22 +125,27 @@ func (r *KubeletConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// KubeletConfig changes are expected to be sporadic, yet are important enough
// to be made visible at kubernetes level. So we generate events to handle them
cm, err := r.reconcileConfigMap(ctx, instance, req.NamespacedName)
if err != nil {
cm, errHandler := r.reconcileConfigMap(ctx, instance, req.NamespacedName)
if errHandler.err != nil {
var klErr *InvalidKubeletConfig
if errors.As(err, &klErr) {
if errors.As(errHandler.err, &klErr) {
r.Recorder.Event(instance, "Normal", "ProcessSkip", "ignored kubelet config "+klErr.ObjectName)
return ctrl.Result{}, nil
return errHandler.result, nil
}
if errHandler.tolerateError {
r.Recorder.Event(instance, "Normal", "ProcessSkip", errHandler.err.Error())
return errHandler.result, nil
}

klog.ErrorS(err, "failed to reconcile configmap", "controller", "kubeletconfig")
klog.ErrorS(errHandler.err, "failed to reconcile configmap", "controller", "kubeletconfig")

r.Recorder.Event(instance, "Warning", "ProcessFailed", "Failed to update RTE config from kubelet config "+req.NamespacedName.String())
return ctrl.Result{}, err
return errHandler.result, errHandler.err
}

r.Recorder.Event(instance, "Normal", "ProcessOK", fmt.Sprintf("Updated RTE config %s/%s from kubelet config %s", cm.Namespace, cm.Name, req.NamespacedName.String()))
return ctrl.Result{}, nil
//return ctrl.Result{}, nil
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftovers I presume

return errHandler.result, nil
}

func (r *KubeletConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -176,11 +190,22 @@ func (r *KubeletConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
return equality.Semantic.DeepEqual(oldNodeGroups, newNodeGroups)
},
}

machineConfigPoolPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
mcpOld := e.ObjectOld.(*mcov1.MachineConfigPool)
mcpNew := e.ObjectNew.(*mcov1.MachineConfigPool)
return mcpOld.Spec.Paused && !mcpNew.Spec.Paused
},
}

return ctrl.NewControllerManagedBy(mgr).
For(o, builder.WithPredicates(p)).
Owns(&corev1.ConfigMap{}).
Watches(&nropv1.NUMAResourcesOperator{}, handler.EnqueueRequestsFromMapFunc(r.numaResourcesOperatorToKubeletConfig),
builder.WithPredicates(numaResourcesOperatorPredicate)).
Watches(&mcov1.MachineConfigPool{}, handler.EnqueueRequestsFromMapFunc(r.nodeGroupToMachineConfigPool),
builder.WithPredicates(machineConfigPoolPredicate)).
Complete(r)
}

Expand All @@ -197,25 +222,26 @@ func (e *InvalidKubeletConfig) Unwrap() error {
return e.Err
}

func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, error) {
func (r *KubeletConfigReconciler) reconcileConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, reconcileErrorHandler) {
// first check if the ConfigMap should be deleted
// to save all the additional work related for create/update
cm, deleted, err := r.deleteConfigMap(ctx, instance, kcKey)
if deleted {
return cm, err
return cm, reconcileErrorHandler{err: err}
}

kcHandler, err := r.makeKCHandlerForPlatform(ctx, instance, kcKey)
if err != nil {
return nil, err
kcHandler, errHandler := r.makeKCHandlerForPlatform(ctx, instance, kcKey)
if errHandler.err != nil {
return nil, errHandler
}
kubeletConfig, err := kubeletconfig.MCOKubeletConfToKubeletConf(kcHandler.mcoKc)
if err != nil {
klog.ErrorS(err, "cannot extract KubeletConfiguration from MCO KubeletConfig", "name", kcKey.Name)
return nil, err
return nil, reconcileErrorHandler{err: err}
}

return r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler)
cm, err = r.syncConfigMap(ctx, kubeletConfig, instance, kcHandler)
return cm, reconcileErrorHandler{err: err}
}

func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration, instance *nropv1.NUMAResourcesOperator, kcHandler *kubeletConfigHandler) (*corev1.ConfigMap, error) {
Expand Down Expand Up @@ -244,63 +270,128 @@ func (r *KubeletConfigReconciler) syncConfigMap(ctx context.Context, kubeletConf
return rendered, nil
}

func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*kubeletConfigHandler, error) {
func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*kubeletConfigHandler, reconcileErrorHandler) {
switch r.Platform {
case platform.OpenShift:
mcoKc := &mcov1.KubeletConfig{}
if err := r.Client.Get(ctx, kcKey, mcoKc); err != nil {
return nil, err
return nil, reconcileErrorHandler{err: err}
}

mcps, err := machineconfigpools.GetListByNodeGroupsV1(ctx, r.Client, instance.Spec.NodeGroups)
if err != nil {
return nil, err
return nil, reconcileErrorHandler{err: err}
}

mcp, err := machineconfigpools.FindBySelector(mcps, mcoKc.Spec.MachineConfigPoolSelector)
if err != nil {
klog.ErrorS(err, "cannot find a matching mcp for MCO KubeletConfig", "name", kcKey.Name)
var notFound *machineconfigpools.NotFound
if errors.As(err, &notFound) {
return nil, &InvalidKubeletConfig{
return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{
ObjectName: kcKey.Name,
Err: notFound,
}
}}
}
return nil, err
return nil, reconcileErrorHandler{err: err}
}

klog.V(3).InfoS("matched MCP to MCO KubeletConfig", "kubeletconfig name", kcKey.Name, "MCP name", mcp.Name)

// nothing we care about, and we can't do much anyway
if mcoKc.Spec.KubeletConfig == nil {
klog.InfoS("detected KubeletConfig with empty payload, ignoring", "name", kcKey.Name)
return nil, &InvalidKubeletConfig{ObjectName: kcKey.Name}
return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{ObjectName: kcKey.Name}}
}

if mcp.Spec.Paused {
klog.InfoS("detected paused MCP", "name", mcp.Name)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be printed at every reconciliation step it seems. Let's make it V(4)

//if the CM exists -> just skip;
//if the CM does not exist -> create it based on the current active machineConfig
Comment thread
shajmakh marked this conversation as resolved.

expectedCMName := objectnames.GetComponentName(instance.Name, mcp.Name)
existingCM := &corev1.ConfigMap{}
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: r.Namespace, Name: expectedCMName}, existingCM); err != nil {
if !apierrors.IsNotFound(err) {
return nil, reconcileErrorHandler{
err: err,
tolerateError: true,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to support the operation even when MCP are paused, why these errors are tolerate?

}
}

currentConfigName := mcp.Status.Configuration.Name
currentConfigObj := &mcov1.MachineConfig{}
if err := r.Client.Get(ctx, client.ObjectKey{Name: currentConfigName}, currentConfigObj); err != nil {
klog.ErrorS(err, "cannot find the machineConfig", "name", currentConfigName)
return nil, reconcileErrorHandler{
err: fmt.Errorf("failed to find the current machineConfig %s: %v", currentConfigName, err),
tolerateError: true,
result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod},
}
}

// use local version of github.com/openshift/machine-config-operator/pkg/controller/common.ParseAndConvertConfig
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just narrating in english what the code does. Either give context about why or remove the comment

_, dataInBytes, err := intkubeletconfig.ParseKubeletConfigRawData(currentConfigObj.Spec.Config.Raw)
if err != nil {
klog.ErrorS(err, "cannot parse the machineConfig", "name", currentConfigName)
return nil, reconcileErrorHandler{
err: fmt.Errorf("failed to parse the machineConfig %s: %v", currentConfigName, err),
tolerateError: true,
result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod},
}
}

decodeKc, err := intkubeletconfig.DecodeKubeletConfigurationFromData(dataInBytes)
if err != nil {
klog.ErrorS(err, "cannot decode the KubeletConfig data from MachineConfig", "name", currentConfigName)
return nil, reconcileErrorHandler{
err: fmt.Errorf("failed to decode the KubeletConfig data from MachineConfig %s: %v", currentConfigName, err),
tolerateError: true,
result: ctrl.Result{Requeue: true, RequeueAfter: MachineConfigPoolPausedRetryPeriod},
}
}

return &kubeletConfigHandler{
ownerObject: decodeKc,
mcoKc: decodeKc,
poolName: mcp.Name,
setCtrlRef: controllerutil.SetControllerReference,
}, reconcileErrorHandler{}
}

klog.InfoS("MachineConfigPool is paused and configMap exists", "KubeletConfig", kcKey.Name, "ConfigMap", existingCM.Name)
return nil, reconcileErrorHandler{
// the KubeletConfig has been already handled and we can skip the rest of reconciliation logic due to paused MCP
err: fmt.Errorf("MachineConfigPool of KubeletConfig %s is paused and configMap %s already exists", kcKey.Name, existingCM.Name),
tolerateError: true,
result: ctrl.Result{},
}
}

return &kubeletConfigHandler{
ownerObject: mcoKc,
mcoKc: mcoKc,
poolName: mcp.Name,
setCtrlRef: controllerutil.SetControllerReference,
}, nil
}, reconcileErrorHandler{}

case platform.HyperShift:
cmKc := &corev1.ConfigMap{}
if err := r.Client.Get(ctx, kcKey, cmKc); err != nil {
return nil, err
return nil, reconcileErrorHandler{err: err}
}

nodePoolName := cmKc.Labels[HyperShiftNodePoolLabel]
kcData := cmKc.Data[HyperShiftConfigMapConfigKey]
mcoKc, err := kubeletconfig.DecodeFromData([]byte(kcData), r.Scheme)
if err != nil {
return nil, err
return nil, reconcileErrorHandler{err: err}
}

// nothing we care about, and we can't do much anyway
if mcoKc.Spec.KubeletConfig == nil {
klog.InfoS("detected KubeletConfig with empty payload, ignoring", "name", kcKey.Name)
return nil, &InvalidKubeletConfig{ObjectName: kcKey.Name}
return nil, reconcileErrorHandler{err: &InvalidKubeletConfig{ObjectName: kcKey.Name}}
}
return &kubeletConfigHandler{
ownerObject: cmKc,
Expand All @@ -312,9 +403,9 @@ func (r *KubeletConfigReconciler) makeKCHandlerForPlatform(ctx context.Context,
setCtrlRef: func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error {
return nil
},
}, nil
}, reconcileErrorHandler{}
}
return nil, fmt.Errorf("unsupported platform: %s", r.Platform)
return nil, reconcileErrorHandler{err: fmt.Errorf("unsupported platform: %s", r.Platform)}
}

func (r *KubeletConfigReconciler) deleteConfigMap(ctx context.Context, instance *nropv1.NUMAResourcesOperator, kcKey client.ObjectKey) (*corev1.ConfigMap, bool, error) {
Expand Down Expand Up @@ -373,6 +464,60 @@ func (r *KubeletConfigReconciler) numaResourcesOperatorToKubeletConfig(ctx conte
return requests
}

func (r *KubeletConfigReconciler) nodeGroupToMachineConfigPool(ctx context.Context, object client.Object) []reconcile.Request {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we use the object argument?

var requests []reconcile.Request
if r.Platform != platform.OpenShift {
return requests
}

nro := &nropv1.NUMAResourcesOperator{}
if err := r.Client.Get(ctx, client.ObjectKey{Name: objectnames.DefaultNUMAResourcesOperatorCrName}, nro); err != nil {
klog.ErrorS(err, "failed to get NUMAResourcesOperator %v")
return requests
}
mcpList := &mcov1.MachineConfigPoolList{}
if err := r.Client.List(ctx, mcpList); err != nil {
klog.ErrorS(err, "failed to list MachineConfigPools %v")
}
mcpMap := make(map[string]mcov1.MachineConfigPool)
for _, mcp := range mcpList.Items {
mcpMap[mcp.Name] = mcp
}

ngstatus := nro.Status.NodeGroups
targetMCPs := sets.New[string]()
for _, ngstatus := range ngstatus {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't use a loop variable named like the collection it iterates on

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that I missed. Thanks for catching.

if mcp, ok := mcpMap[ngstatus.PoolName]; ok {
targetMCPs.Insert(mcp.Name)
}
}

kcList := &mcov1.KubeletConfigList{}
if err := r.Client.List(ctx, kcList); err != nil {
klog.ErrorS(err, "failed to list KubeletConfigs %v")
}
//map mcp to kubeletconfig requests
for _, mcpName := range targetMCPs.UnsortedList() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we just do

for _, ngstatus := range nro.Status.NodeGroups {
    mcpObj, ok := mcpMap[ngstatus.PoolName]
    if !ok {
        continue
    }
    mcpLabels := labels.Set(mcpObj.Labels)
    ...

mcpLabels := labels.Set(mcpMap[mcpName].Labels)
for _, kc := range kcList.Items {
if kc.Spec.MachineConfigPoolSelector == nil {
continue
}
selector, err := metav1.LabelSelectorAsSelector(kc.Spec.MachineConfigPoolSelector)
if err != nil {
klog.ErrorS(err, "failed to parse MachineConfigPoolSelector", "kubeletconfig", kc.Name)
continue
}
if selector.Matches(mcpLabels) {
requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKey{
Name: kc.Name,
}})
}
}
}
return requests
}

func getDeletedOwner(kcKey client.ObjectKey, ownerConfigMaps []*corev1.ConfigMap) *corev1.ConfigMap {
for i := range ownerConfigMaps {
cm := ownerConfigMaps[i]
Expand Down
Loading
Loading