Skip to content

Commit 74761c7

Browse files
authored
chore: backport (01/20/2026) (#1247)
2 parents 65e3d17 + 9548159 commit 74761c7

24 files changed

Lines changed: 1954 additions & 886 deletions

cmd/memberagent/main.go

Lines changed: 34 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -77,21 +77,19 @@ var (
7777
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
7878
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
7979
// TODO(weiweng): only keep enableV1Alpha1APIs for backward compatibility with helm charts. Remove soon.
80-
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
81-
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
82-
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
83-
region = flag.String("region", "", "The region where the member cluster resides.")
84-
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
85-
watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates")
86-
watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile")
87-
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
88-
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
89-
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
90-
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
91-
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
92-
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
93-
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
94-
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
80+
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
81+
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
82+
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
83+
region = flag.String("region", "", "The region where the member cluster resides.")
84+
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
85+
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
86+
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
87+
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
88+
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
89+
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
90+
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
91+
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
92+
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
9593

9694
// Work applier requeue rate limiter settings.
9795
workApplierRequeueRateLimiterAttemptsWithFixedDelay = flag.Int("work-applier-requeue-rate-limiter-attempts-with-fixed-delay", 1, "If set, the work applier will requeue work objects with a fixed delay for the specified number of attempts before switching to exponential backoff.")
@@ -102,6 +100,12 @@ var (
102100
workApplierRequeueRateLimiterExponentialBaseForFastBackoff = flag.Float64("work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff", 1.5, "If set, the work applier will start to back off fast at this factor after it completes the slow backoff stage, until it reaches the fast backoff delay cap. Its value should be larger than the base value for the slow backoff stage.")
103101
workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds = flag.Float64("work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds", 900, "If set, the work applier will not back off longer than this value in seconds when it is in the fast backoff stage.")
104102
workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs = flag.Bool("work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs", true, "If set, the rate limiter will skip the slow backoff stage and start fast backoff immediately for work objects that are available or have diff reported.")
103+
104+
// Work applier priority queue settings.
105+
enableWorkApplierPriorityQueue = flag.Bool("enable-work-applier-priority-queue", false, "If set, the work applier will use a priority queue to process work objects.")
106+
workApplierPriorityLinearEquationCoeffA = flag.Int("work-applier-priority-linear-equation-coeff-a", -3, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient A in the equation.")
107+
workApplierPriorityLinearEquationCoeffB = flag.Int("work-applier-priority-linear-equation-coeff-b", 100, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient B in the equation.")
108+
105109
// Azure property provider feature gates.
106110
isAzProviderCostPropertiesEnabled = flag.Bool("use-cost-properties-in-azure-provider", true, "If set, the Azure property provider will expose cost properties in the member cluster.")
107111
isAzProviderAvailableResPropertiesEnabled = flag.Bool("use-available-res-properties-in-azure-provider", true, "If set, the Azure property provider will expose available resources properties in the member cluster.")
@@ -133,6 +137,13 @@ func main() {
133137
klog.ErrorS(errors.New("either enable-v1alpha1-apis or enable-v1beta1-apis is required"), "Invalid APIs flags")
134138
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
135139
}
140+
// TO-DO (chenyu1): refactor the validation logic.
141+
if workApplierPriorityLinearEquationCoeffA == nil || *workApplierPriorityLinearEquationCoeffA >= 0 {
142+
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffA is set incorrectly; must use a value less than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffA")
143+
}
144+
if workApplierPriorityLinearEquationCoeffB == nil || *workApplierPriorityLinearEquationCoeffB <= 0 {
145+
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffB is set incorrectly; must use a value greater than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffB")
146+
}
136147

137148
hubURL := os.Getenv("HUB_SERVER_URL")
138149

@@ -373,7 +384,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
373384
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
374385
return err
375386
}
376-
// create the work controller, so we can pass it to the internal member cluster reconciler
387+
// Set up the work applier. Note that it is referenced by the InternalMemberCluster controller.
377388

378389
// Set up the requeue rate limiter for the work applier.
379390
//
@@ -413,7 +424,8 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
413424
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
414425
)
415426

416-
workController := workapplier.NewReconciler(
427+
workApplier := workapplier.NewReconciler(
428+
"work-applier",
417429
hubMgr.GetClient(),
418430
targetNS,
419431
spokeDynamicClient,
@@ -426,12 +438,13 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
426438
// Use the default worker count (4) for parallelized manifest processing.
427439
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
428440
time.Minute*time.Duration(*deletionWaitTime),
429-
*watchWorkWithPriorityQueue,
430-
*watchWorkReconcileAgeMinutes,
431441
requeueRateLimiter,
442+
*enableWorkApplierPriorityQueue,
443+
workApplierPriorityLinearEquationCoeffA,
444+
workApplierPriorityLinearEquationCoeffB,
432445
)
433446

434-
if err = workController.SetupWithManager(hubMgr); err != nil {
447+
if err = workApplier.SetupWithManager(hubMgr); err != nil {
435448
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")
436449
return err
437450
}
@@ -459,7 +472,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
459472
ctx,
460473
hubMgr.GetClient(),
461474
memberMgr.GetConfig(), memberMgr.GetClient(),
462-
workController,
475+
workApplier,
463476
pp)
464477
if err != nil {
465478
klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler")

pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {
379379

380380
// This controller is created for testing purposes only; no reconciliation loop is actually
381381
// run.
382-
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
382+
workApplier1 = workapplier.NewReconciler("work-applier-1", hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)
383383

384384
propertyProvider1 = &manuallyUpdatedProvider{}
385385
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {
402402

403403
// This controller is created for testing purposes only; no reconciliation loop is actually
404404
// run.
405-
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
405+
workApplier2 = workapplier.NewReconciler("work-applier-2", hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)
406406

407407
member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
408408
Expect(err).NotTo(HaveOccurred())

pkg/controllers/membercluster/v1beta1/membercluster_controller.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -426,6 +426,14 @@ func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *clusterv1beta1.Mem
426426
Name: roleName,
427427
},
428428
}
429+
// For User and Group kind, the APIGroup is defaulted to rbac.authorization.k8s.io if not set.
430+
// Reference: https://pkg.go.dev/k8s.io/api/rbac/v1#Subject
431+
for i := range expectedRoleBinding.Subjects {
432+
subj := &expectedRoleBinding.Subjects[i]
433+
if subj.APIGroup == "" && (subj.Kind == rbacv1.GroupKind || subj.Kind == rbacv1.UserKind) {
434+
subj.APIGroup = rbacv1.GroupName
435+
}
436+
}
429437

430438
// Creates role binding if not found.
431439
var currentRoleBinding rbacv1.RoleBinding

pkg/controllers/membercluster/v1beta1/membercluster_controller_test.go

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -355,6 +355,11 @@ func TestSyncRole(t *testing.T) {
355355

356356
func TestSyncRoleBinding(t *testing.T) {
357357
identity := rbacv1.Subject{
358+
APIGroup: "rbac.authorization.k8s.io",
359+
Kind: "User",
360+
Name: "MemberClusterIdentity",
361+
}
362+
identityWithoutGroup := rbacv1.Subject{
358363
Kind: "User",
359364
Name: "MemberClusterIdentity",
360365
}
@@ -428,6 +433,40 @@ func TestSyncRoleBinding(t *testing.T) {
428433
roleName: "fleet-role-mc1",
429434
wantedError: "",
430435
},
436+
"identity without APIGroup should not trigger roleBinding reconcile": {
437+
r: &Reconciler{
438+
Client: &test.MockClient{
439+
MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error {
440+
roleRef := rbacv1.RoleRef{
441+
APIGroup: rbacv1.GroupName,
442+
Kind: "Role",
443+
Name: "fleet-role-mc1",
444+
}
445+
o := obj.(*rbacv1.RoleBinding)
446+
*o = rbacv1.RoleBinding{
447+
TypeMeta: metav1.TypeMeta{
448+
Kind: "RoleBinding",
449+
APIVersion: rbacv1.SchemeGroupVersion.String(),
450+
},
451+
ObjectMeta: metav1.ObjectMeta{
452+
Name: "fleet-rolebinding-mc1",
453+
Namespace: namespace1,
454+
},
455+
Subjects: []rbacv1.Subject{identity}, // Returned roleBinding has APIGroup set.
456+
RoleRef: roleRef,
457+
}
458+
return nil
459+
},
460+
},
461+
},
462+
memberCluster: &clusterv1beta1.MemberCluster{
463+
ObjectMeta: metav1.ObjectMeta{Name: "mc1"},
464+
Spec: clusterv1beta1.MemberClusterSpec{Identity: identityWithoutGroup},
465+
},
466+
namespaceName: namespace1,
467+
roleName: "fleet-role-mc1",
468+
wantedError: "",
469+
},
431470
"role binding but with diff": {
432471
r: &Reconciler{
433472
Client: &test.MockClient{

pkg/controllers/updaterun/controller.go

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24-
"strconv"
2524
"time"
2625

2726
"github.com/prometheus/client_golang/prometheus"
@@ -51,9 +50,10 @@ import (
5150
var (
5251
// errStagedUpdatedAborted is the error when the updateRun is aborted.
5352
errStagedUpdatedAborted = fmt.Errorf("cannot continue the updateRun")
54-
// errInitializedFailed is the error when the updateRun fails to initialize.
55-
// It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step.
56-
errInitializedFailed = fmt.Errorf("%w: failed to initialize the updateRun", errStagedUpdatedAborted)
53+
// errValidationFailed is the error when the updateRun fails validation.
54+
// It is a wrapped error of errStagedUpdatedAborted, because we perform validation during initialization
55+
// and subsequent reconciliations where initialization is skipped.
56+
errValidationFailed = fmt.Errorf("%w: failed to validate the updateRun", errStagedUpdatedAborted)
5757
)
5858

5959
// Reconciler reconciles an updateRun object.
@@ -125,8 +125,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
125125
var initErr error
126126
if toBeUpdatedBindings, toBeDeletedBindings, initErr = r.initialize(ctx, updateRun); initErr != nil {
127127
klog.ErrorS(initErr, "Failed to initialize the updateRun", "updateRun", runObjRef)
128-
// errInitializedFailed cannot be retried.
129-
if errors.Is(initErr, errInitializedFailed) {
128+
// errStagedUpdatedAborted cannot be retried.
129+
if errors.Is(initErr, errStagedUpdatedAborted) {
130130
return runtime.Result{}, r.recordInitializationFailed(ctx, updateRun, initErr.Error())
131131
}
132132
return runtime.Result{}, initErr
@@ -488,26 +488,26 @@ func handleApprovalRequestDelete(obj client.Object, q workqueue.TypedRateLimitin
488488
// emitUpdateRunStatusMetric emits the update run status metric based on status conditions in the updateRun.
489489
func emitUpdateRunStatusMetric(updateRun placementv1beta1.UpdateRunObj) {
490490
generation := updateRun.GetGeneration()
491-
genStr := strconv.FormatInt(generation, 10)
491+
state := updateRun.GetUpdateRunSpec().State
492492

493493
updateRunStatus := updateRun.GetUpdateRunStatus()
494494
succeedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
495495
if succeedCond != nil && succeedCond.ObservedGeneration == generation {
496-
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
496+
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
497497
string(placementv1beta1.StagedUpdateRunConditionSucceeded), string(succeedCond.Status), succeedCond.Reason).SetToCurrentTime()
498498
return
499499
}
500500

501501
progressingCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
502502
if progressingCond != nil && progressingCond.ObservedGeneration == generation {
503-
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
503+
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
504504
string(placementv1beta1.StagedUpdateRunConditionProgressing), string(progressingCond.Status), progressingCond.Reason).SetToCurrentTime()
505505
return
506506
}
507507

508508
initializedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
509509
if initializedCond != nil && initializedCond.ObservedGeneration == generation {
510-
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
510+
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
511511
string(placementv1beta1.StagedUpdateRunConditionInitialized), string(initializedCond.Status), initializedCond.Reason).SetToCurrentTime()
512512
return
513513
}

0 commit comments

Comments
 (0)