Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/controller/workloads/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ const (
// InvalidGangSchedulingAnnotations is emitted when group-gang-scheduling and
// role-instance-gang-scheduling annotations are set simultaneously on the same RBG.
InvalidGangSchedulingAnnotations = "InvalidGangSchedulingAnnotations"
// ReplicasOverriddenByAdapter is emitted when an external mutator (e.g. a
// `helm upgrade` whose chart no longer carries spec.roles[].replicas) sets
// spec.roles[i].replicas to a value that disagrees with a bound
// RoleBasedGroupScalingAdapter. The controller treats the adapter's value
// as authoritative and emits this event each reconcile that performs an
// override (so it fires once per drift episode, then stops as the adapter
// controller writes the value back).
ReplicasOverriddenByAdapter = "ReplicasOverriddenByAdapter"
)

// rbg-scaling-adapter events
Expand Down
66 changes: 66 additions & 0 deletions internal/controller/workloads/rolebasedgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (r *RoleBasedGroupReconciler) Reconcile(ctx context.Context, req ctrl.Reque
logger.Info("Finished reconciling", "duration", time.Since(start))
}()

if err := r.applyRBGSAReplicasOverride(ctx, rbg); err != nil {
return ctrl.Result{}, err
}

// Step 0: Pre-check validations
if err := r.preCheck(ctx, rbg); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -799,6 +803,68 @@ func buildScalingAdapterLabels(roleSpec *workloadsv1alpha2.RoleSpec, rbgName, ro
return merged
}

// applyRBGSAReplicasOverride mutates rbg in-memory so that, for each role
// with a bound RoleBasedGroupScalingAdapter, role.Replicas reflects the
// adapter's spec.replicas. Without this override a client-side merge from
// helm (or any other external mutator) that drops spec.roles[i].replicas
// would let the apiserver fill the field with the CRD default of 1, the
// rest of Reconcile would scale the underlying workload to 1, and pods
// would churn until the adapter controller wrote the correct value back.
//
// Callers that intentionally manage replicas without an autoscaler are
// unaffected: when scalingAdapter.enable is false, when the adapter is not
// yet bound, or when adapter.spec.replicas is nil, the role's own value is
// kept.
func (r *RoleBasedGroupReconciler) applyRBGSAReplicasOverride(
ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup,
) error {
logger := log.FromContext(ctx)
for i := range rbg.Spec.Roles {
role := &rbg.Spec.Roles[i]
if !scale.IsScalingAdapterEnable(role) {
continue
}
adapter := &workloadsv1alpha2.RoleBasedGroupScalingAdapter{}
adapterName := scale.GenerateScalingAdapterName(rbg.Name, role.Name)
if err := r.client.Get(ctx, types.NamespacedName{
Namespace: rbg.Namespace, Name: adapterName,
}, adapter); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return fmt.Errorf("get scaling adapter %s/%s: %w", rbg.Namespace, adapterName, err)
}
Comment on lines +827 to +836
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyRBGSAReplicasOverride reads the scaling adapter using r.client.Get, which may be served from the controller-runtime cache. If the cache is stale (or the informer hasn’t observed the latest update yet), this can override role.Replicas to an out-of-date adapter value and potentially scale workloads incorrectly. Consider using r.apiReader.Get (similar to updateRBGStatus, which explicitly bypasses cache for freshness) for this read so the adapter is treated as the authoritative source of truth.

Copilot uses AI. Check for mistakes.
if adapter.Status.Phase != constants.AdapterPhaseBound {
continue
}
if adapter.Spec.Replicas == nil {
continue
}
if role.Replicas != nil && *role.Replicas == *adapter.Spec.Replicas {
continue
}
var observed any = "<nil>"
if role.Replicas != nil {
observed = *role.Replicas
}
logger.Info(
"Overriding role.Replicas from bound RBGSA",
"role", role.Name,
"observed", observed,
"desired", *adapter.Spec.Replicas,
"adapter", adapterName,
)
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, ReplicasOverriddenByAdapter,
"Role %q replicas reset to RBGSA %q (desired=%d, observed=%v); "+
"external mutator (e.g. helm upgrade) likely dropped spec.roles[].replicas",
role.Name, adapterName, *adapter.Spec.Replicas, observed,
)
role.Replicas = ptr.To(*adapter.Spec.Replicas)
}
return nil
}

func (r *RoleBasedGroupReconciler) ReconcileScalingAdapter(
ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, roleSpec *workloadsv1alpha2.RoleSpec,
) error {
Expand Down
195 changes: 195 additions & 0 deletions internal/controller/workloads/rolebasedgroup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/rbgs/pkg/reconciler"
Expand Down Expand Up @@ -1018,6 +1019,200 @@ func TestRoleBasedGroupReconciler_ReconcileScalingAdapter_LabelUpdate(t *testing
}
}

func TestRoleBasedGroupReconciler_applyRBGSAReplicasOverride(t *testing.T) {
testScheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(testScheme)
_ = workloadsv1alpha2.AddToScheme(testScheme)

const (
rbgName = "test-rbg"
ns = "default"
roleName = "leader"
)
adapterName := scale.GenerateScalingAdapterName(rbgName, roleName)

makeRBG := func(roleReplicas *int32, scalingAdapterEnable bool) *workloadsv1alpha2.RoleBasedGroup {
role := workloadsv1alpha2.RoleSpec{
Name: roleName,
Replicas: roleReplicas,
}
if scalingAdapterEnable {
role.ScalingAdapter = &workloadsv1alpha2.ScalingAdapter{Enable: true}
}
return &workloadsv1alpha2.RoleBasedGroup{
ObjectMeta: metav1.ObjectMeta{Name: rbgName, Namespace: ns},
Spec: workloadsv1alpha2.RoleBasedGroupSpec{
Roles: []workloadsv1alpha2.RoleSpec{role},
},
}
}

makeAdapter := func(phase constants.AdapterPhase, replicas *int32) *workloadsv1alpha2.RoleBasedGroupScalingAdapter {
return &workloadsv1alpha2.RoleBasedGroupScalingAdapter{
ObjectMeta: metav1.ObjectMeta{Name: adapterName, Namespace: ns},
Spec: workloadsv1alpha2.RoleBasedGroupScalingAdapterSpec{
Replicas: replicas,
},
Status: workloadsv1alpha2.RoleBasedGroupScalingAdapterStatus{
Phase: phase,
},
}
}

tests := []struct {
name string
rbg *workloadsv1alpha2.RoleBasedGroup
seedAdapter *workloadsv1alpha2.RoleBasedGroupScalingAdapter
expectedReplicas *int32
expectEvent bool
wantErr bool
}{
{
name: "scalingAdapter disabled: role.Replicas untouched",
rbg: makeRBG(ptr.To[int32](3), false),
seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)),
expectedReplicas: ptr.To[int32](3),
},
{
name: "RBGSA missing: role.Replicas untouched",
rbg: makeRBG(ptr.To[int32](1), true),
seedAdapter: nil,
expectedReplicas: ptr.To[int32](1),
},
{
name: "RBGSA unbound: role.Replicas untouched",
rbg: makeRBG(ptr.To[int32](1), true),
seedAdapter: makeAdapter(constants.AdapterPhaseNotBound, ptr.To[int32](10)),
expectedReplicas: ptr.To[int32](1),
},
{
name: "RBGSA bound but spec.replicas nil: role.Replicas untouched",
rbg: makeRBG(ptr.To[int32](1), true),
seedAdapter: makeAdapter(constants.AdapterPhaseBound, nil),
expectedReplicas: ptr.To[int32](1),
},
{
name: "RBGSA bound, replicas equal: no change",
rbg: makeRBG(ptr.To[int32](10), true),
seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)),
expectedReplicas: ptr.To[int32](10),
},
{
name: "RBGSA bound, replicas differ: override to adapter value",
rbg: makeRBG(ptr.To[int32](1), true),
seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](10)),
expectedReplicas: ptr.To[int32](10),
expectEvent: true,
},
{
name: "RBGSA bound, role.Replicas nil: override to adapter value",
rbg: makeRBG(nil, true),
seedAdapter: makeAdapter(constants.AdapterPhaseBound, ptr.To[int32](7)),
expectedReplicas: ptr.To[int32](7),
expectEvent: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
builder := fake.NewClientBuilder().WithScheme(testScheme)
if tt.seedAdapter != nil {
builder = builder.WithObjects(tt.seedAdapter)
}
fakeClient := builder.Build()
recorder := record.NewFakeRecorder(2)
r := &RoleBasedGroupReconciler{
client: fakeClient,
apiReader: fakeClient,
scheme: testScheme,
recorder: recorder,
}

err := r.applyRBGSAReplicasOverride(context.Background(), tt.rbg)
if (err != nil) != tt.wantErr {
t.Fatalf("applyRBGSAReplicasOverride() err=%v wantErr=%v", err, tt.wantErr)
}

got := tt.rbg.Spec.Roles[0].Replicas
switch {
case tt.expectedReplicas == nil && got != nil:
t.Fatalf("expected nil replicas, got %d", *got)
case tt.expectedReplicas != nil && got == nil:
t.Fatalf("expected %d replicas, got nil", *tt.expectedReplicas)
case tt.expectedReplicas != nil && got != nil && *tt.expectedReplicas != *got:
t.Fatalf("expected %d replicas, got %d", *tt.expectedReplicas, *got)
}

if tt.expectEvent {
select {
case ev := <-recorder.Events:
if !strings.Contains(ev, ReplicasOverriddenByAdapter) {
t.Errorf("expected event with reason %q, got %q", ReplicasOverriddenByAdapter, ev)
}
if !strings.Contains(ev, "Warning") {
t.Errorf("expected Warning event, got %q", ev)
}
default:
t.Errorf("expected an Event but none was recorded")
}
} else {
select {
case ev := <-recorder.Events:
t.Errorf("expected no Event, got %q", ev)
default:
}
}
})
}
}

func TestRoleBasedGroupReconciler_applyRBGSAReplicasOverride_GetError(t *testing.T) {
testScheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(testScheme)
_ = workloadsv1alpha2.AddToScheme(testScheme)

rbg := &workloadsv1alpha2.RoleBasedGroup{
ObjectMeta: metav1.ObjectMeta{Name: "rbg", Namespace: "default"},
Spec: workloadsv1alpha2.RoleBasedGroupSpec{
Roles: []workloadsv1alpha2.RoleSpec{
{
Name: "leader",
Replicas: ptr.To[int32](3),
ScalingAdapter: &workloadsv1alpha2.ScalingAdapter{Enable: true},
},
},
},
}

getErr := fmt.Errorf("transient apiserver error")
fakeClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithInterceptorFuncs(interceptor.Funcs{
Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
if _, ok := obj.(*workloadsv1alpha2.RoleBasedGroupScalingAdapter); ok {
return getErr
}
return c.Get(ctx, key, obj, opts...)
},
}).
Build()

r := &RoleBasedGroupReconciler{
client: fakeClient,
apiReader: fakeClient,
scheme: testScheme,
recorder: record.NewFakeRecorder(1),
}

err := r.applyRBGSAReplicasOverride(context.Background(), rbg)
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "get scaling adapter") {
t.Fatalf("expected wrapped error mentioning 'get scaling adapter', got %v", err)
}
Comment on lines +1207 to +1213
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test claims (in the PR description) to verify that the returned error preserves the original via %w, but it only asserts on the error string. To actually validate wrapping, assert errors.Is(err, getErr) (or errors.Unwrap(err) == getErr) in addition to the message check.

Copilot uses AI. Check for mistakes.
}

func TestRoleBasedGroupReconciler_CleanupOrphanedScalingAdapters(t *testing.T) {
testScheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(testScheme)
Expand Down
Loading