Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/pulsar-resources-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ spec:
- --leader-elect
- --resync-period={{ .Values.features.resyncPeriod | default 10 }}
- --retry-count={{ .Values.features.retryCount | default 5 }}
{{- if and .Values.features.maxConcurrentReconciles (gt (int .Values.features.maxConcurrentReconciles) 1) }}
- --max-concurrent-reconciles={{ .Values.features.maxConcurrentReconciles }}
{{- end }}
command:
- /manager
env:
Expand Down
2 changes: 2 additions & 0 deletions charts/pulsar-resources-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ features:
resyncPeriod: 10
# The number of retries in case of error.
retryCount: 5
# The maximum number of concurrent reconciles per controller.
maxConcurrentReconciles: 1

# -- It will override the value of label `app.kubernetes.io/name` on pod
nameOverride: ""
Expand Down
14 changes: 10 additions & 4 deletions controllers/rolebinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -152,9 +153,13 @@ func (r *RoleBindingReconciler) handleRoleBindingDeletion(ctx context.Context, r
// Delete the RoleBinding from the API server
err := rbClient.DeleteRoleBinding(ctx, roleBinding)
if err != nil {
logger.Error(err, "Failed to delete RoleBinding from API server")
r.updateRoleBindingStatus(ctx, roleBinding, err, "DeletionError", "Failed to delete RoleBinding from API server")
return ctrl.Result{}, err
if apierrors.IsNotFound(err) {
logger.Info("RoleBinding not found on API server, skipping deletion", "name", roleBinding.Name)
} else {
logger.Error(err, "Failed to delete RoleBinding from API server")
r.updateRoleBindingStatus(ctx, roleBinding, err, "DeletionError", "Failed to delete RoleBinding from API server")
return ctrl.Result{}, err
}
}

// Remove the finalizer
Expand Down Expand Up @@ -288,8 +293,9 @@ func (r *RoleBindingReconciler) findResourcesForConnection(ctx context.Context,
}

// SetupWithManager sets up the controller with the Manager.
func (r *RoleBindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *RoleBindingReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&resourcev1alpha1.RoleBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&resourcev1alpha1.StreamNativeCloudConnection{},
handler.EnqueueRequestsFromMapFunc(r.findResourcesForConnection)).
Expand Down
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ func main() {
var probeAddr string
var resyncPeriod int
var retryCount int
var maxConcurrentReconciles int
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8443", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&resyncPeriod, "resync-period", 10, "resyncPeriod is the base frequency the informers are resynced.")
flag.IntVar(&retryCount, "retry-count", 5, "The number of retries in case of error.")
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 1, "The maximum number of concurrent reconciles per controller.")
opts := k8szap.Options{}
opts.BindFlags(flag.CommandLine)
flag.Parse()
Expand Down Expand Up @@ -116,15 +118,14 @@ func main() {
os.Exit(1)
}

// TODO get MaxConcurrentReconciles from cmd params
if err = (&controllers.PulsarConnectionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"),
Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"),
PulsarAdminCreator: admin.NewPulsarAdmin,
Retryer: utils.NewReconcileRetryer(retryCount, utils.NewEventSource(ctrl.Log.WithName("eventSource"))),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PulsarConnection")
os.Exit(1)
}
Expand Down Expand Up @@ -206,7 +207,7 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConnectionManager: connectionManager,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RoleBinding")
os.Exit(1)
}
Expand Down
Loading