Skip to content

Commit aff926f

Browse files
committed
feat: add configurable max-concurrent-reconciles flag
Add --max-concurrent-reconciles CLI flag (default 1) to control the number of concurrent reconciles per controller. This is exposed via Helm values as features.maxConcurrentReconciles. Applied to PulsarConnection and RoleBinding controllers. Customers with many RoleBindings (e.g. 85+) can increase this to speed up reconciliation significantly.
1 parent 339a62a commit aff926f

4 files changed

Lines changed: 10 additions & 4 deletions

File tree

charts/pulsar-resources-operator/templates/deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ spec:
6262
- --leader-elect
6363
- --resync-period={{ .Values.features.resyncPeriod | default 10 }}
6464
- --retry-count={{ .Values.features.retryCount | default 5 }}
65+
- --max-concurrent-reconciles={{ .Values.features.maxConcurrentReconciles | default 1 }}
6566
command:
6667
- /manager
6768
env:

charts/pulsar-resources-operator/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ features:
4747
resyncPeriod: 10
4848
# The number of retries in case of error.
4949
retryCount: 5
50+
# The maximum number of concurrent reconciles per controller.
51+
maxConcurrentReconciles: 1
5052

5153
# -- It will override the value of label `app.kubernetes.io/name` on pod
5254
nameOverride: ""

controllers/rolebinding_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
ctrl "sigs.k8s.io/controller-runtime"
3030
"sigs.k8s.io/controller-runtime/pkg/builder"
3131
"sigs.k8s.io/controller-runtime/pkg/client"
32+
"sigs.k8s.io/controller-runtime/pkg/controller"
3233
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3334
"sigs.k8s.io/controller-runtime/pkg/handler"
3435
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -292,8 +293,9 @@ func (r *RoleBindingReconciler) findResourcesForConnection(ctx context.Context,
292293
}
293294

294295
// SetupWithManager sets up the controller with the Manager.
295-
func (r *RoleBindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
296+
func (r *RoleBindingReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
296297
return ctrl.NewControllerManagedBy(mgr).
298+
WithOptions(options).
297299
For(&resourcev1alpha1.RoleBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
298300
Watches(&resourcev1alpha1.StreamNativeCloudConnection{},
299301
handler.EnqueueRequestsFromMapFunc(r.findResourcesForConnection)).

main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,15 @@ func main() {
6464
var probeAddr string
6565
var resyncPeriod int
6666
var retryCount int
67+
var maxConcurrentReconciles int
6768
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8443", "The address the metric endpoint binds to.")
6869
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
6970
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
7071
"Enable leader election for controller manager. "+
7172
"Enabling this will ensure there is only one active controller manager.")
7273
flag.IntVar(&resyncPeriod, "resync-period", 10, "resyncPeriod is the base frequency the informers are resynced.")
7374
flag.IntVar(&retryCount, "retry-count", 5, "The number of retries in case of error.")
75+
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 1, "The maximum number of concurrent reconciles per controller.")
7476
opts := k8szap.Options{}
7577
opts.BindFlags(flag.CommandLine)
7678
flag.Parse()
@@ -116,15 +118,14 @@ func main() {
116118
os.Exit(1)
117119
}
118120

119-
// TODO get MaxConcurrentReconciles from cmd params
120121
if err = (&controllers.PulsarConnectionReconciler{
121122
Client: mgr.GetClient(),
122123
Scheme: mgr.GetScheme(),
123124
Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"),
124125
Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"),
125126
PulsarAdminCreator: admin.NewPulsarAdmin,
126127
Retryer: utils.NewReconcileRetryer(retryCount, utils.NewEventSource(ctrl.Log.WithName("eventSource"))),
127-
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
128+
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
128129
setupLog.Error(err, "unable to create controller", "controller", "PulsarConnection")
129130
os.Exit(1)
130131
}
@@ -206,7 +207,7 @@ func main() {
206207
Client: mgr.GetClient(),
207208
Scheme: mgr.GetScheme(),
208209
ConnectionManager: connectionManager,
209-
}).SetupWithManager(mgr); err != nil {
210+
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
210211
setupLog.Error(err, "unable to create controller", "controller", "RoleBinding")
211212
os.Exit(1)
212213
}

0 commit comments

Comments
 (0)