Skip to content

Commit b9f2280

Browse files
committed
WIP: controller: nrop: per-tree processing and progress
WIP: implement "vertical split" vs current "horizontal split"

Signed-off-by: Francesco Romani <fromani@redhat.com>
1 parent cf7b39d commit b9f2280

3 files changed

Lines changed: 259 additions & 109 deletions

File tree

internal/controller/numaresourcesoperator_controller.go

Lines changed: 163 additions & 85 deletions
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@ import (
6464
"github.com/openshift-kni/numaresources-operator/pkg/images"
6565
"github.com/openshift-kni/numaresources-operator/pkg/loglevel"
6666
"github.com/openshift-kni/numaresources-operator/pkg/objectnames"
67+
"github.com/openshift-kni/numaresources-operator/pkg/objectstate"
6768
apistate "github.com/openshift-kni/numaresources-operator/pkg/objectstate/api"
6869
rtestate "github.com/openshift-kni/numaresources-operator/pkg/objectstate/rte"
6970
rteupdate "github.com/openshift-kni/numaresources-operator/pkg/objectupdate/rte"
@@ -243,82 +244,172 @@ func (r *NUMAResourcesOperatorReconciler) reconcileResourceAPI(ctx context.Conte
243244
return intreconcile.StepSuccess()
244245
}
245246

246-
func (r *NUMAResourcesOperatorReconciler) reconcileResourceMachineConfig(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) intreconcile.Step {
247-
// we need to sync machine configs first and wait for the MachineConfigPool updates
248-
// before checking additional components for updates
249-
waitByPool, pausedMCPNames, err := r.syncMachineConfigs(ctx, instance, existing, trees)
247+
// treeReconcileResult collects what reconciling a single node group tree
// produced, so that the per-tree outcomes can be merged afterwards by
// aggregateTreeResults.
type treeReconcileResult struct {
248+
tree nodegroupv1.Tree // the tree this result was computed for
249+
mcpStatuses []nropv1.MachineConfigPool // MachineConfigPool statuses gathered for this tree
250+
dsInfo []poolDaemonSet // pool/DaemonSet pairs synced for this tree
251+
pausedMCPNames []string // paused MCP names as returned by syncMachineConfigs
252+
step intreconcile.Step // outcome of the reconcile pass for this tree (success, ongoing or failed)
253+
}
254+
255+
func (r *NUMAResourcesOperatorReconciler) reconcileTreeMachineConfig(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, tree nodegroupv1.Tree) treeReconcileResult {
256+
result := treeReconcileResult{tree: tree}
257+
singleTreeSlice := []nodegroupv1.Tree{tree}
258+
259+
waitByPool, pausedMCPNames, err := r.syncMachineConfigs(ctx, instance, existing, singleTreeSlice)
260+
result.pausedMCPNames = pausedMCPNames
250261
if err != nil {
251-
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "FailedMCSync", "Failed to set up machine configuration for worker nodes: %v", err)
252-
err = fmt.Errorf("failed to sync machine configs: %w", err)
253-
return intreconcile.StepFailed(err)
262+
result.step = intreconcile.StepFailed(fmt.Errorf("failed to sync machine configs: %w", err))
263+
return result
254264
}
255-
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "SuccessfulMCSync", "Enabled machine configuration for worker nodes")
256265

257-
// MCO needs to update the SELinux context removal and other stuff, and need to trigger a reboot.
258-
// It can take a while.
259-
mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, trees, r.ForwardMCPConds, waitByPool)
260-
instance.Status.MachineConfigPools = mcpStatuses
261-
instance.Status.Conditions = updateMachineConfigPoolPausedCondition(instance.Status.Conditions, instance.Generation, pausedMCPNames)
266+
mcpStatuses, mcpNamePending := syncMachineConfigPoolsStatuses(instance.Name, singleTreeSlice, r.ForwardMCPConds, waitByPool)
267+
result.mcpStatuses = mcpStatuses
262268

263269
if mcpNamePending != "" {
264-
// the Machine Config Pool still did not apply the machine config, wait for one minute
265-
return intreconcile.StepOngoing(numaResourcesRetryPeriod).WithReason("MachineConfigPoolIsUpdating").WithMessage(mcpNamePending + " is updating")
270+
result.step = intreconcile.StepOngoing(numaResourcesRetryPeriod).WithReason("MachineConfigPoolIsUpdating").WithMessage(mcpNamePending + " is updating")
271+
return result
266272
}
267-
instance.Status.MachineConfigPools = syncMachineConfigPoolNodeGroupConfigStatuses(instance.Status.MachineConfigPools, trees)
273+
result.mcpStatuses = syncMachineConfigPoolNodeGroupConfigStatuses(result.mcpStatuses, singleTreeSlice)
268274

269-
return intreconcile.StepSuccess()
275+
result.step = intreconcile.StepSuccess()
276+
return result
270277
}
271278

272-
func (r *NUMAResourcesOperatorReconciler) reconcileResourceDaemonSet(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) ([]poolDaemonSet, intreconcile.Step) {
273-
daemonSetsInfoPerPool, err := r.syncNUMAResourcesOperatorResources(ctx, instance, existing, trees)
279+
func (r *NUMAResourcesOperatorReconciler) reconcileTreeDaemonSet(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, tree nodegroupv1.Tree) treeReconcileResult {
280+
result := treeReconcileResult{tree: tree}
281+
282+
dsPoolPairs, err := r.syncTreeDaemonSet(ctx, instance, existing, r.RTEManifests, tree)
274283
if err != nil {
275-
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "FailedRTECreate", "Failed to create Resource-Topology-Exporter DaemonSets: %v", err)
276-
err = fmt.Errorf("FailedRTESync: %w", err)
277-
return nil, intreconcile.StepFailed(err)
284+
result.step = intreconcile.StepFailed(fmt.Errorf("FailedRTESync: %w", err))
285+
return result
278286
}
279287

280-
if len(daemonSetsInfoPerPool) == 0 {
281-
return nil, intreconcile.StepSuccess()
288+
dssWithReadyStatus, dsNamePending, err := r.syncDaemonSetsStatuses(ctx, r.Client, dsPoolPairs)
289+
result.dsInfo = dsPoolPairs
290+
if err != nil {
291+
result.step = intreconcile.StepFailed(err)
292+
return result
293+
}
294+
if dsNamePending != "" {
295+
result.step = intreconcile.StepOngoing(5 * time.Second).WithReason("DaemonSetIsUpdating").WithMessage(dsNamePending + " is updating")
296+
return result
282297
}
283298

284-
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "SuccessfulRTECreate", "Created Resource-Topology-Exporter DaemonSets")
299+
result.dsInfo = dsPoolPairs
300+
_ = dssWithReadyStatus // consumed via dsInfo for status aggregation
301+
result.step = intreconcile.StepSuccess()
302+
return result
303+
}
285304

286-
dssWithReadyStatus, dsNamePending, err := r.syncDaemonSetsStatuses(ctx, r.Client, daemonSetsInfoPerPool)
287-
instance.Status.DaemonSets = dssWithReadyStatus
288-
instance.Status.RelatedObjects = relatedobjects.ResourceTopologyExporter(r.Namespace, dssWithReadyStatus)
289-
if err != nil {
290-
return nil, intreconcile.StepFailed(err)
305+
func aggregateTreeResults(results []treeReconcileResult) (intreconcile.Step, []nropv1.MachineConfigPool, []poolDaemonSet, []string) {
306+
var allMCPStatuses []nropv1.MachineConfigPool
307+
var allDSInfo []poolDaemonSet
308+
var allPausedMCPNames []string
309+
310+
worstStep := intreconcile.StepSuccess()
311+
var messages []string
312+
313+
for _, result := range results {
314+
allMCPStatuses = append(allMCPStatuses, result.mcpStatuses...)
315+
allPausedMCPNames = append(allPausedMCPNames, result.pausedMCPNames...)
316+
317+
if result.step.Done() {
318+
allDSInfo = append(allDSInfo, result.dsInfo...)
319+
continue
320+
}
321+
322+
if result.step.Error != nil {
323+
messages = append(messages, result.step.Error.Error())
324+
} else if result.step.Result.RequeueAfter > 0 {
325+
messages = append(messages, result.step.ConditionInfo.Message)
326+
}
327+
328+
if shouldReplaceStep(worstStep, result.step) {
329+
worstStep = result.step
330+
}
291331
}
292-
if dsNamePending != "" {
293-
return nil, intreconcile.StepOngoing(5 * time.Second).WithReason("DaemonSetIsUpdating").WithMessage(dsNamePending + " is updating")
332+
333+
if !worstStep.Done() && len(messages) > 0 {
334+
worstStep = worstStep.WithMessage(strings.Join(messages, "; "))
294335
}
295336

296-
return daemonSetsInfoPerPool, intreconcile.StepSuccess()
337+
return worstStep, allMCPStatuses, allDSInfo, allPausedMCPNames
338+
}
339+
340+
// shouldReplaceStep reports whether candidate should take the place of
// current as the step tracked by aggregateTreeResults while it walks the
// per-tree results.
//
// The checks are evaluated in order:
//   - a current step that is Done is always replaced;
//   - a candidate carrying an error replaces a current step without one;
//   - when both steps ask for a requeue, the candidate wins only if its
//     RequeueAfter is strictly shorter.
//
// In every other case current is kept and false is returned.
func shouldReplaceStep(current, candidate intreconcile.Step) bool {
341+
if current.Done() { // nothing worth keeping yet: any candidate takes over
342+
return true
343+
}
344+
if candidate.Error != nil && current.Error == nil { // an error outranks a step without error
345+
return true
346+
}
347+
if candidate.Result.RequeueAfter > 0 && current.Result.RequeueAfter > 0 {
348+
return candidate.Result.RequeueAfter < current.Result.RequeueAfter // prefer the sooner requeue
349+
}
350+
return false
297351
}
298352

299353
func (r *NUMAResourcesOperatorReconciler) reconcileResource(ctx context.Context, instance *nropv1.NUMAResourcesOperator, trees []nodegroupv1.Tree) intreconcile.Step {
300354
if step := r.reconcileResourceAPI(ctx, instance, trees); step.EarlyStop() {
301355
return step
302356
}
303357

304-
existing := rtestate.FromClient(ctx, r.Client, r.Platform, r.RTEManifests, instance, trees, r.Namespace)
358+
existing := rtestate.FromClientShared(ctx, r.Client, r.Platform, r.RTEManifests, instance, r.Namespace)
305359

360+
if err := r.prepareAndApplySharedResources(ctx, instance, existing); err != nil {
361+
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "FailedRTECreate", "Failed to create Resource-Topology-Exporter shared resources: %v", err)
362+
return intreconcile.StepFailed(fmt.Errorf("FailedSharedResourceSync: %w", err))
363+
}
364+
365+
err := dangling.DeleteUnusedDaemonSets(r.Client, ctx, instance, trees)
366+
if err != nil {
367+
klog.ErrorS(err, "failed to deleted unused daemonsets")
368+
}
306369
if r.Platform == platform.OpenShift {
307-
if step := r.reconcileResourceMachineConfig(ctx, instance, existing, trees); step.EarlyStop() {
308-
return step
370+
err = dangling.DeleteUnusedMachineConfigs(r.Client, ctx, instance, trees)
371+
if err != nil {
372+
klog.ErrorS(err, "failed to deleted unused machineconfigs")
309373
}
310374
}
311375

312-
dsPerPool, step := r.reconcileResourceDaemonSet(ctx, instance, existing, trees)
313-
if step.EarlyStop() {
314-
return step
376+
var results []treeReconcileResult
377+
for _, tree := range trees {
378+
treeExisting := existing.ForTree(ctx, r.Client, tree)
379+
380+
if r.Platform == platform.OpenShift {
381+
mcResult := r.reconcileTreeMachineConfig(ctx, instance, treeExisting, tree)
382+
if mcResult.step.EarlyStop() {
383+
results = append(results, mcResult)
384+
continue
385+
}
386+
dsResult := r.reconcileTreeDaemonSet(ctx, instance, treeExisting, tree)
387+
dsResult.mcpStatuses = mcResult.mcpStatuses
388+
dsResult.pausedMCPNames = mcResult.pausedMCPNames
389+
results = append(results, dsResult)
390+
} else {
391+
dsResult := r.reconcileTreeDaemonSet(ctx, instance, treeExisting, tree)
392+
results = append(results, dsResult)
393+
}
315394
}
316395

317-
// all fields of NodeGroupStatus are required so publish the status only when all daemonset and MCPs are updated which
318-
// is a certain thing if we got to this point otherwise the function would have returned already
319-
instance.Status.NodeGroups = syncNodeGroupsStatus(instance, dsPerPool)
396+
step, allMCPStatuses, allDSInfo, allPausedMCPNames := aggregateTreeResults(results)
320397

321-
return intreconcile.StepSuccess()
398+
instance.Status.MachineConfigPools = allMCPStatuses
399+
instance.Status.Conditions = updateMachineConfigPoolPausedCondition(instance.Status.Conditions, instance.Generation, allPausedMCPNames)
400+
401+
dssReady := []nropv1.NamespacedName{}
402+
for _, dsInfo := range allDSInfo {
403+
dssReady = append(dssReady, dsInfo.DaemonSet)
404+
}
405+
instance.Status.DaemonSets = dssReady
406+
instance.Status.RelatedObjects = relatedobjects.ResourceTopologyExporter(r.Namespace, dssReady)
407+
408+
if step.Done() {
409+
instance.Status.NodeGroups = syncNodeGroupsStatus(instance, allDSInfo)
410+
}
411+
412+
return step
322413
}
323414

324415
func (r *NUMAResourcesOperatorReconciler) syncDaemonSetsStatuses(ctx context.Context, rd client.Reader, daemonSetsInfo []poolDaemonSet) ([]nropv1.NamespacedName, string, error) {
@@ -511,49 +602,55 @@ func getMachineConfigPoolStatusByName(mcpStatuses []nropv1.MachineConfigPool, na
511602
return nropv1.MachineConfigPool{Name: name}
512603
}
513604

514-
func (r *NUMAResourcesOperatorReconciler) syncNUMAResourcesOperatorResources(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, trees []nodegroupv1.Tree) ([]poolDaemonSet, error) {
515-
klog.V(4).InfoS("RTESync start", "trees", len(trees))
516-
defer klog.V(4).Info("RTESync stop")
517-
518-
err := dangling.DeleteUnusedDaemonSets(r.Client, ctx, instance, trees)
519-
if err != nil {
520-
klog.ErrorS(err, "failed to deleted unused daemonsets")
521-
}
522-
523-
if r.Platform == platform.OpenShift {
524-
err = dangling.DeleteUnusedMachineConfigs(r.Client, ctx, instance, trees)
605+
func (r *NUMAResourcesOperatorReconciler) applyObjectStates(ctx context.Context, instance *nropv1.NUMAResourcesOperator, objStates []objectstate.ObjectState) error {
606+
for _, objState := range objStates {
607+
if objState.Error != nil {
608+
klog.Warningf("error loading object: %v", objState.Error)
609+
}
610+
if objState.UpdateError != nil {
611+
return fmt.Errorf("failed to update (%s) %s/%s: %w", objState.Desired.GetObjectKind().GroupVersionKind(), objState.Desired.GetNamespace(), objState.Desired.GetName(), objState.UpdateError)
612+
}
613+
err := controllerutil.SetControllerReference(instance, objState.Desired, r.Scheme)
525614
if err != nil {
526-
klog.ErrorS(err, "failed to deleted unused machineconfigs")
615+
return fmt.Errorf("failed to set controller reference to %s %s: %w", objState.Desired.GetNamespace(), objState.Desired.GetName(), err)
616+
}
617+
_, _, err = apply.ApplyObject(ctx, r.Client, objState)
618+
if err != nil {
619+
return fmt.Errorf("failed to apply (%s) %s/%s: %w", objState.Desired.GetObjectKind().GroupVersionKind(), objState.Desired.GetNamespace(), objState.Desired.GetName(), err)
527620
}
528621
}
622+
return nil
623+
}
529624

625+
func (r *NUMAResourcesOperatorReconciler) prepareAndApplySharedResources(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests) error {
530626
rteupdate.DaemonSetRolloutSettings(r.RTEManifests.Core.DaemonSet)
531-
err = rteupdate.DaemonSetAffinitySettings(r.RTEManifests.Core.DaemonSet, r.RTEManifests.Core.DaemonSet.Spec.Template.Labels)
627+
err := rteupdate.DaemonSetAffinitySettings(r.RTEManifests.Core.DaemonSet, r.RTEManifests.Core.DaemonSet.Spec.Template.Labels)
532628
if err != nil {
533629
klog.ErrorS(err, "failed to update RTE affinity settings")
534630
}
535631

536-
dsPoolPairs := []poolDaemonSet{}
537-
538-
// using a slice of poolDaemonSet instead of a map because Go maps assignment order is not consistent and non-deterministic
539632
err = rteupdate.DaemonSetUserImageSettings(r.RTEManifests.Core.DaemonSet, instance.Spec.ExporterImage, r.Images.Preferred(), r.ImagePullPolicy)
540633
if err != nil {
541-
return dsPoolPairs, err
634+
return err
542635
}
543636

544637
err = rteupdate.DaemonSetPauseContainerSettings(r.RTEManifests.Core.DaemonSet)
545638
if err != nil {
546-
return dsPoolPairs, err
639+
return err
547640
}
548641

549642
err = loglevel.UpdatePodSpec(&r.RTEManifests.Core.DaemonSet.Spec.Template.Spec, manifests.ContainerNameRTE, instance.Spec.LogLevel)
550643
if err != nil {
551-
return dsPoolPairs, err
644+
return err
552645
}
553646

554647
rteupdate.SecurityContextConstraint(r.RTEManifests.Core.SecurityContextConstraint, true) // force to legacy context
555-
// SCC v2 needs no updates
556648

649+
return r.applyObjectStates(ctx, instance, existing.SharedState(r.RTEManifests))
650+
}
651+
652+
func (r *NUMAResourcesOperatorReconciler) syncTreeDaemonSet(ctx context.Context, instance *nropv1.NUMAResourcesOperator, existing *rtestate.ExistingManifests, mf rtestate.Manifests, tree nodegroupv1.Tree) ([]poolDaemonSet, error) {
653+
var dsPoolPairs []poolDaemonSet
557654
existing = existing.WithManifestsUpdater(func(poolName string, gdm *rtestate.GeneratedDesiredManifest) error {
558655
err := daemonsetUpdater(poolName, gdm, r.RTEMetricsTLS)
559656
if err != nil {
@@ -563,32 +660,13 @@ func (r *NUMAResourcesOperatorReconciler) syncNUMAResourcesOperatorResources(ctx
563660
return nil
564661
})
565662

566-
for _, objState := range existing.State(r.RTEManifests) {
567-
if objState.Error != nil {
568-
// We are likely in the bootstrap scenario. In this case, which is expected once, everything is fine.
569-
// If it happens past bootstrap, still carry on. We know what to do, and we do want to enforce the desired state.
570-
klog.Warningf("error loading object: %v", objState.Error)
571-
}
572-
if objState.UpdateError != nil {
573-
// this is an internal error. Should not happen. But if it happen, we don't want to send garbage to the cluster, so we abort
574-
return nil, fmt.Errorf("failed to update (%s) %s/%s: %w", objState.Desired.GetObjectKind().GroupVersionKind(), objState.Desired.GetNamespace(), objState.Desired.GetName(), err)
575-
}
576-
err := controllerutil.SetControllerReference(instance, objState.Desired, r.Scheme)
577-
if err != nil {
578-
return nil, fmt.Errorf("failed to set controller reference to %s %s: %w", objState.Desired.GetNamespace(), objState.Desired.GetName(), err)
579-
}
580-
_, _, err = apply.ApplyObject(ctx, r.Client, objState)
581-
if err != nil {
582-
return nil, fmt.Errorf("failed to apply (%s) %s/%s: %w", objState.Desired.GetObjectKind().GroupVersionKind(), objState.Desired.GetNamespace(), objState.Desired.GetName(), err)
583-
}
584-
}
585-
586-
if len(dsPoolPairs) < len(trees) {
587-
klog.Warningf("daemonset and tree size mismatch: expected %d got in daemonsets %d", len(trees), len(dsPoolPairs))
663+
if err := r.applyObjectStates(ctx, instance, existing.TreeState(mf, tree)); err != nil {
664+
return nil, err
588665
}
589666
return dsPoolPairs, nil
590667
}
591668

669+
592670
// SetupWithManager sets up the controller with the Manager.
593671
func (r *NUMAResourcesOperatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
594672
// we want to initiate reconcile loop only on change under labels or spec of the object

0 commit comments

Comments
 (0)