diff --git a/internal/controller/workloads/event.go b/internal/controller/workloads/event.go index f455dce29..4607fed1b 100644 --- a/internal/controller/workloads/event.go +++ b/internal/controller/workloads/event.go @@ -37,6 +37,14 @@ const ( // InvalidGangSchedulingAnnotations is emitted when group-gang-scheduling and // role-instance-gang-scheduling annotations are set simultaneously on the same RBG. InvalidGangSchedulingAnnotations = "InvalidGangSchedulingAnnotations" + // ReplicasOverriddenByAdapter is emitted when an external mutator (e.g. a + // `helm upgrade` whose chart no longer carries spec.roles[].replicas) sets + // spec.roles[i].replicas to a value that disagrees with a bound + // RoleBasedGroupScalingAdapter. The controller treats the adapter's value + // as authoritative and emits this event each reconcile that performs an + // override (so it fires once per drift episode, then stops as the adapter + // controller writes the value back). + ReplicasOverriddenByAdapter = "ReplicasOverriddenByAdapter" ) // rbg-scaling-adapter events diff --git a/internal/controller/workloads/rolebasedgroup_controller.go b/internal/controller/workloads/rolebasedgroup_controller.go index 8f8e75a57..575c1d165 100644 --- a/internal/controller/workloads/rolebasedgroup_controller.go +++ b/internal/controller/workloads/rolebasedgroup_controller.go @@ -158,6 +158,10 @@ func (r *RoleBasedGroupReconciler) Reconcile(ctx context.Context, req ctrl.Reque logger.Info("Finished reconciling", "duration", time.Since(start)) }() + if err := r.applyRBGSAReplicasOverride(ctx, rbg); err != nil { + return ctrl.Result{}, err + } + // Step 0: Pre-check validations if err := r.preCheck(ctx, rbg); err != nil { return ctrl.Result{}, err @@ -799,6 +803,68 @@ func buildScalingAdapterLabels(roleSpec *workloadsv1alpha2.RoleSpec, rbgName, ro return merged } +// applyRBGSAReplicasOverride mutates rbg in-memory so that, for each role +// with a bound RoleBasedGroupScalingAdapter, role.Replicas reflects the +// adapter's spec.replicas. Without this override a client-side merge from +// helm (or any other external mutator) that drops spec.roles[i].replicas +// would let the apiserver fill the field with the CRD default of 1, the +// rest of Reconcile would scale the underlying workload to 1, and pods +// would churn until the adapter controller wrote the correct value back. +// +// Callers that intentionally manage replicas without an autoscaler are +// unaffected: when scalingAdapter.enable is false, when the adapter is not +// yet bound, or when adapter.spec.replicas is nil, the role's own value is +// kept. +func (r *RoleBasedGroupReconciler) applyRBGSAReplicasOverride( + ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, +) error { + logger := log.FromContext(ctx) + for i := range rbg.Spec.Roles { + role := &rbg.Spec.Roles[i] + if !scale.IsScalingAdapterEnable(role) { + continue + } + adapter := &workloadsv1alpha2.RoleBasedGroupScalingAdapter{} + adapterName := scale.GenerateScalingAdapterName(rbg.Name, role.Name) + if err := r.client.Get(ctx, types.NamespacedName{ + Namespace: rbg.Namespace, Name: adapterName, + }, adapter); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return fmt.Errorf("get scaling adapter %s/%s: %w", rbg.Namespace, adapterName, err) + } + if adapter.Status.Phase != constants.AdapterPhaseBound { + continue + } + if adapter.Spec.Replicas == nil { + continue + } + if role.Replicas != nil && *role.Replicas == *adapter.Spec.Replicas { + continue + } + var observed any = "" + if role.Replicas != nil { + observed = *role.Replicas + } + logger.Info( + "Overriding role.Replicas from bound RBGSA", + "role", role.Name, + "observed", observed, + "desired", *adapter.Spec.Replicas, + "adapter", adapterName, + ) + r.recorder.Eventf( + rbg, corev1.EventTypeWarning, ReplicasOverriddenByAdapter, + "Role %q replicas reset to RBGSA %q (desired=%d, observed=%v); "+ + "external mutator (e.g. helm upgrade) likely dropped spec.roles[].replicas", + role.Name, adapterName, *adapter.Spec.Replicas, observed, + ) + role.Replicas = ptr.To(*adapter.Spec.Replicas) + } + return nil +} + func (r *RoleBasedGroupReconciler) ReconcileScalingAdapter( ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, roleSpec *workloadsv1alpha2.RoleSpec, ) error { diff --git a/internal/controller/workloads/rolebasedgroup_controller_test.go b/internal/controller/workloads/rolebasedgroup_controller_test.go index a402b1382..8725dc1fc 100644 --- a/internal/controller/workloads/rolebasedgroup_controller_test.go +++ b/internal/controller/workloads/rolebasedgroup_controller_test.go @@ -39,6 +39,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/rbgs/pkg/reconciler" @@ -1018,6 +1019,200 @@ func TestRoleBasedGroupReconciler_ReconcileScalingAdapter_LabelUpdate(t *testing } } +func TestRoleBasedGroupReconciler_applyRBGSAReplicasOverride(t *testing.T) { + testScheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(testScheme) + _ = workloadsv1alpha2.AddToScheme(testScheme) + + const ( + rbgName = "test-rbg" + ns = "default" + roleName = "leader" + ) + adapterName := scale.GenerateScalingAdapterName(rbgName, roleName) + + makeRBG := func(roleReplicas *int32, scalingAdapterEnable bool) *workloadsv1alpha2.RoleBasedGroup { + role := workloadsv1alpha2.RoleSpec{ + Name: roleName, + Replicas: roleReplicas, + } + if scalingAdapterEnable { + role.ScalingAdapter = &workloadsv1alpha2.ScalingAdapter{Enable: true} + } + return &workloadsv1alpha2.RoleBasedGroup{ + ObjectMeta: metav1.ObjectMeta{Name: rbgName, Namespace: ns}, + Spec: workloadsv1alpha2.RoleBasedGroupSpec{ + Roles: []workloadsv1alpha2.RoleSpec{role}, + }, + } + } + + makeAdapter := func(phase constants.AdapterPhase, replicas *int32) *workloadsv1alpha2.RoleBasedGroupScalingAdapter { + return &workloadsv1alpha2.RoleBasedGroupScalingAdapter{ + ObjectMeta: metav1.ObjectMeta{Name: adapterName, Namespace: ns}, + Spec: workloadsv1alpha2.RoleBasedGroupScalingAdapterSpec{ + Replicas: replicas, + }, + Status: workloadsv1alpha2.RoleBasedGroupScalingAdapterStatus{ + Phase: phase, + }, + } + } + + tests := []struct { + name string + rbg *workloadsv1alpha2.RoleBasedGroup + seedAdapter *workloadsv1alpha2.RoleBasedGroupScalingAdapter + expectedReplicas *int32 + expectEvent bool + wantErr bool + }{ + { + name: "scalingAdapter disabled: role.Replicas untouched", + rbg: makeRBG(ptr.To[int32](3), false), + seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)), + expectedReplicas: ptr.To[int32](3), + }, + { + name: "RBGSA missing: role.Replicas untouched", + rbg: makeRBG(ptr.To[int32](1), true), + seedAdapter: nil, + expectedReplicas: ptr.To[int32](1), + }, + { + name: "RBGSA unbound: role.Replicas untouched", + rbg: makeRBG(ptr.To[int32](1), true), + seedAdapter: makeAdapter(constants.AdapterPhaseNotBound, ptr.To[int32](10)), + expectedReplicas: ptr.To[int32](1), + }, + { + name: "RBGSA bound but spec.replicas nil: role.Replicas untouched", + rbg: makeRBG(ptr.To[int32](1), true), + seedAdapter: makeAdapter(constants.AdapterPhaseBound, nil), + expectedReplicas: ptr.To[int32](1), + }, + { + name: "RBGSA bound, replicas equal: no change", + rbg: makeRBG(ptr.To[int32](10), true), + seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)), + expectedReplicas: ptr.To[int32](10), + }, + { + name: "RBGSA bound, replicas differ: override to adapter value", + rbg: makeRBG(ptr.To[int32](1), true), + seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)), + expectedReplicas: ptr.To[int32](10), + expectEvent: true, + }, + { + name: "RBGSA bound, role.Replicas nil: override to adapter value", + rbg: makeRBG(nil, true), + seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](7)), + expectedReplicas: ptr.To[int32](7), + expectEvent: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + builder := fake.NewClientBuilder().WithScheme(testScheme) + if tt.seedAdapter != nil { + builder = builder.WithObjects(tt.seedAdapter) + } + fakeClient := builder.Build() + recorder := record.NewFakeRecorder(2) + r := &RoleBasedGroupReconciler{ + client: fakeClient, + apiReader: fakeClient, + scheme: testScheme, + recorder: recorder, + } + + err := r.applyRBGSAReplicasOverride(context.Background(), tt.rbg) + if (err != nil) != tt.wantErr { + t.Fatalf("applyRBGSAReplicasOverride() err=%v wantErr=%v", err, tt.wantErr) + } + + got := tt.rbg.Spec.Roles[0].Replicas + switch { + case tt.expectedReplicas == nil && got != nil: + t.Fatalf("expected nil replicas, got %d", *got) + case tt.expectedReplicas != nil && got == nil: + t.Fatalf("expected %d replicas, got nil", *tt.expectedReplicas) + case tt.expectedReplicas != nil && got != nil && *tt.expectedReplicas != *got: + t.Fatalf("expected %d replicas, got %d", *tt.expectedReplicas, *got) + } + + if tt.expectEvent { + select { + case ev := <-recorder.Events: + if !strings.Contains(ev, ReplicasOverriddenByAdapter) { + t.Errorf("expected event with reason %q, got %q", ReplicasOverriddenByAdapter, ev) + } + if !strings.Contains(ev, "Warning") { + t.Errorf("expected Warning event, got %q", ev) + } + default: + t.Errorf("expected an Event but none was recorded") + } + } else { + select { + case ev := <-recorder.Events: + t.Errorf("expected no Event, got %q", ev) + default: + } + } + }) + } +} + +func TestRoleBasedGroupReconciler_applyRBGSAReplicasOverride_GetError(t *testing.T) { + testScheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(testScheme) + _ = workloadsv1alpha2.AddToScheme(testScheme) + + rbg := &workloadsv1alpha2.RoleBasedGroup{ + ObjectMeta: metav1.ObjectMeta{Name: "rbg", Namespace: "default"}, + Spec: workloadsv1alpha2.RoleBasedGroupSpec{ + Roles: []workloadsv1alpha2.RoleSpec{ + { + Name: "leader", + Replicas: ptr.To[int32](3), + ScalingAdapter: &workloadsv1alpha2.ScalingAdapter{Enable: true}, + }, + }, + }, + } + + getErr := fmt.Errorf("transient apiserver error") + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithInterceptorFuncs(interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if _, ok := obj.(*workloadsv1alpha2.RoleBasedGroupScalingAdapter); ok { + return getErr + } + return c.Get(ctx, key, obj, opts...) + }, + }). + Build() + + r := &RoleBasedGroupReconciler{ + client: fakeClient, + apiReader: fakeClient, + scheme: testScheme, + recorder: record.NewFakeRecorder(1), + } + + err := r.applyRBGSAReplicasOverride(context.Background(), rbg) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "get scaling adapter") { + t.Fatalf("expected wrapped error mentioning 'get scaling adapter', got %v", err) + } +} + func TestRoleBasedGroupReconciler_CleanupOrphanedScalingAdapters(t *testing.T) { testScheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(testScheme)