@@ -30,6 +30,7 @@ import (
3030 "k8s.io/apimachinery/pkg/runtime"
3131 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3232 ctrl "sigs.k8s.io/controller-runtime"
33+ "sigs.k8s.io/controller-runtime/pkg/healthz"
3334 "sigs.k8s.io/controller-runtime/pkg/log/zap"
3435 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3536 mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
@@ -49,21 +50,34 @@ func init() {
4950 utilRuntimeMust (corev1alpha1 .AddToScheme (scheme ))
5051}
5152
53+ // options holds the konnector's runtime flags.
54+ type options struct {
55+ metricsAddr string
56+ probeAddr string
57+ leaderElect bool
58+ leaderElectionID string
59+ }
60+
5261func main () {
53- var metricsAddr string
54- flag .StringVar (& metricsAddr , "metrics-bind-address" , ":8085" , "address the metric endpoint binds to" )
62+ var o options
63+ flag .StringVar (& o .metricsAddr , "metrics-bind-address" , ":8085" , "address the metric endpoint binds to" )
64+ flag .StringVar (& o .probeAddr , "health-probe-bind-address" , ":8081" , "address the health/readiness probe endpoint binds to" )
65+ flag .BoolVar (& o .leaderElect , "leader-elect" , false ,
66+ "enable leader election, ensuring only one active konnector replica (required for HA / multiple replicas)" )
67+ flag .StringVar (& o .leaderElectionID , "leader-election-id" , "konnector.kube-bind.io" ,
68+ "name of the Lease used for leader election" )
5569 opts := zap.Options {Development : true }
5670 opts .BindFlags (flag .CommandLine )
5771 flag .Parse ()
5872 ctrl .SetLogger (zap .New (zap .UseFlagOptions (& opts )))
5973
60- if err := run (metricsAddr ); err != nil {
74+ if err := run (o ); err != nil {
6175 ctrl .Log .Error (err , "konnector exited with error" )
6276 os .Exit (1 )
6377 }
6478}
6579
66- func run (metricsAddr string ) error {
80+ func run (o options ) error {
6781 ctx := ctrl .SetupSignalHandler ()
6882 log := ctrl .Log .WithName ("konnector" )
6983
@@ -73,13 +87,25 @@ func run(metricsAddr string) error {
7387 }
7488
7589 // Local (consumer) manager: owns the core CRDs and the consumer-side caches.
90+ // Leader election gates all consumer-side controllers, so standby replicas
91+ // engage no provider clusters and run no syncers until they win the lease.
7692 localMgr , err := ctrl .NewManager (cfg , ctrl.Options {
77- Scheme : scheme ,
78- Metrics : metricsserver.Options {BindAddress : metricsAddr },
93+ Scheme : scheme ,
94+ Metrics : metricsserver.Options {BindAddress : o .metricsAddr },
95+ HealthProbeBindAddress : o .probeAddr ,
96+ LeaderElection : o .leaderElect ,
97+ LeaderElectionID : o .leaderElectionID ,
98+ LeaderElectionReleaseOnCancel : true ,
7999 })
80100 if err != nil {
81101 return err
82102 }
103+ if err := localMgr .AddHealthzCheck ("healthz" , healthz .Ping ); err != nil {
104+ return err
105+ }
106+ if err := localMgr .AddReadyzCheck ("readyz" , healthz .Ping ); err != nil {
107+ return err
108+ }
83109
84110 // Connection provider: each ready Connection becomes an engaged provider cluster.
85111 connProvider , err := provider .New (localMgr , provider.Options {})
0 commit comments