11package serverutil
22
33import (
4+ "context"
45 "crypto/tls"
6+ "errors"
57 "fmt"
68 "io"
79 "net"
@@ -13,7 +15,7 @@ import (
1315 "github.com/klauspost/compress/gzhttp"
1416 ctrl "sigs.k8s.io/controller-runtime"
1517 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
16- "sigs.k8s.io/controller-runtime/pkg/manager "
18+ "sigs.k8s.io/controller-runtime/pkg/healthz "
1719
1820 catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
1921 "github.com/operator-framework/operator-controller/internal/catalogd/storage"
@@ -27,49 +29,114 @@ type CatalogServerConfig struct {
2729 LocalStorage storage.Instance
2830}
2931
30- func AddCatalogServerToManager (mgr ctrl.Manager , cfg CatalogServerConfig , tlsFileWatcher * certwatcher.CertWatcher ) error {
31- listener , err := net .Listen ("tcp" , cfg .CatalogAddr )
32+ // AddCatalogServerToManager adds the catalog HTTP server to the manager and registers
33+ // a readiness check that passes only when this pod is the leader and actively serving.
34+ // The listener is created lazily inside Start() so non-leader pods never bind the port,
35+ // which ensures the readiness check correctly excludes them from Service endpoints.
36+ func AddCatalogServerToManager (mgr ctrl.Manager , cfg CatalogServerConfig , cw * certwatcher.CertWatcher ) error {
37+ shutdownTimeout := 30 * time .Second
38+ r := & catalogServerRunnable {
39+ cfg : cfg ,
40+ cw : cw ,
41+ server : & http.Server {
42+ Addr : cfg .CatalogAddr ,
43+ Handler : storageServerHandlerWrapped (mgr .GetLogger ().WithName ("catalogd-http-server" ), cfg ),
44+ ReadTimeout : 5 * time .Second ,
45+ WriteTimeout : 5 * time .Minute ,
46+ },
47+ shutdownTimeout : shutdownTimeout ,
48+ ready : make (chan struct {}),
49+ }
50+
51+ if err := mgr .Add (r ); err != nil {
52+ return fmt .Errorf ("error adding catalog server to manager: %w" , err )
53+ }
54+
55+ // Register a readiness check that passes only once Start() has been called (i.e.
56+ // this pod holds the leader lease and the catalog server is actively serving).
57+ // Non-leader pods never reach Start(), so they remain not-ready and are excluded
58+ // from Service endpoints — preventing catalog traffic from hitting a pod that
59+ // isn't serving the catalog port.
60+ if err := mgr .AddReadyzCheck ("catalog-server" , r .readyzCheck ()); err != nil {
61+ return fmt .Errorf ("error adding catalog server readiness check: %w" , err )
62+ }
63+
64+ return nil
65+ }
66+
67+ // catalogServerRunnable is a leader-only Runnable that binds the catalog HTTP port
68+ // lazily inside Start(), so non-leader pods never hold the listen socket.
69+ type catalogServerRunnable struct {
70+ cfg CatalogServerConfig
71+ cw * certwatcher.CertWatcher
72+ server * http.Server
73+ shutdownTimeout time.Duration
74+ // ready is closed by Start() once the server is about to begin serving.
75+ ready chan struct {}
76+ }
77+
78+ // NeedLeaderElection returns false so the catalog server starts on every pod
79+ // immediately, regardless of leadership. This is required for rolling updates:
80+ // if Start() were gated on leadership, a new pod could not win the leader lease
81+ // (held by the still-running old pod) and therefore could never pass the
82+ // catalog-server readiness check, deadlocking the rollout.
83+ //
84+ // Non-leader pods serve the catalog HTTP port but have an empty local cache
85+ // (only the leader's reconciler downloads catalog content), so requests to a
86+ // non-leader return 404. Callers are expected to retry.
87+ func (r * catalogServerRunnable ) NeedLeaderElection () bool { return false }
88+
89+ func (r * catalogServerRunnable ) Start (ctx context.Context ) error {
90+ listener , err := net .Listen ("tcp" , r .cfg .CatalogAddr )
3291 if err != nil {
3392 return fmt .Errorf ("error creating catalog server listener: %w" , err )
3493 }
3594
36- if cfg .CertFile != "" && cfg .KeyFile != "" {
37- // Use the passed certificate watcher instead of creating a new one
95+ if r .cfg .CertFile != "" && r .cfg .KeyFile != "" {
3896 config := & tls.Config {
39- GetCertificate : tlsFileWatcher .GetCertificate ,
97+ GetCertificate : r . cw .GetCertificate ,
4098 MinVersion : tls .VersionTLS12 ,
4199 }
42100 listener = tls .NewListener (listener , config )
43101 }
44102
45- shutdownTimeout := 30 * time .Second
46- catalogServer := manager.Server {
47- Name : "catalogs" ,
48- OnlyServeWhenLeader : true ,
49- Server : & http.Server {
50- Addr : cfg .CatalogAddr ,
51- Handler : storageServerHandlerWrapped (mgr .GetLogger ().WithName ("catalogd-http-server" ), cfg ),
52- ReadTimeout : 5 * time .Second ,
53- // TODO: Revert this to 10 seconds if/when the API
54- // evolves to have significantly smaller responses
55- WriteTimeout : 5 * time .Minute ,
56- },
57- ShutdownTimeout : & shutdownTimeout ,
58- Listener : listener ,
59- }
103+ // Signal readiness before blocking on Serve so the readiness probe passes promptly.
104+ close (r .ready )
60105
61- err = mgr .Add (& catalogServer )
62- if err != nil {
63- return fmt .Errorf ("error adding catalog server to manager: %w" , err )
64- }
106+ go func () {
107+ <- ctx .Done ()
108+ shutdownCtx := context .Background ()
109+ if r .shutdownTimeout > 0 {
110+ var cancel context.CancelFunc
111+ shutdownCtx , cancel = context .WithTimeout (shutdownCtx , r .shutdownTimeout )
112+ defer cancel ()
113+ }
114+ if err := r .server .Shutdown (shutdownCtx ); err != nil {
115+ // Shutdown errors are logged by the manager; nothing actionable here.
116+ _ = err
117+ }
118+ }()
65119
120+ if err := r .server .Serve (listener ); err != nil && ! errors .Is (err , http .ErrServerClosed ) {
121+ return err
122+ }
66123 return nil
67124}
68125
126+ // readyzCheck returns a healthz.Checker that passes once Start() has been called.
127+ func (r * catalogServerRunnable ) readyzCheck () healthz.Checker {
128+ return func (_ * http.Request ) error {
129+ select {
130+ case <- r .ready :
131+ return nil
132+ default :
133+ return fmt .Errorf ("catalog server not yet started" )
134+ }
135+ }
136+ }
137+
69138func logrLoggingHandler (l logr.Logger , handler http.Handler ) http.Handler {
70139 return handlers .CustomLoggingHandler (nil , handler , func (_ io.Writer , params handlers.LogFormatterParams ) {
71- // extract parameters used in apache common log format, but then log using `logr` to remain consistent
72- // with other loggers used in this codebase.
73140 username := "-"
74141 if params .URL .User != nil {
75142 if name := params .URL .User .Username (); name != "" {
0 commit comments