Commit e12d90b

tmshort and Claude authored
🐛 Fix catalogd ha readiness (#2674)
* fix(catalogd): bind catalog HTTP port lazily; add readiness check

  The catalog HTTP server has OnlyServeWhenLeader: true, so only the leader pod
  should serve catalog content. Previously, net.Listen was called eagerly at
  startup for all pods: the listen socket was bound on non-leaders even though
  http.Serve was never called, causing TCP connections to queue without being
  served. With replicas > 1 this made ~50% of catalog content requests fail
  silently.

  Replace manager.Server with a custom Runnable (catalogServerRunnable) in
  serverutil that:

  - Binds the catalog port lazily inside Start(), which is only called on the
    leader by controller-runtime's leader election machinery.
  - Closes a ready channel once the listener is established, and registers a
    channel-select readiness check via AddReadyzCheck so non-leader pods fail
    the /readyz probe and are excluded from Service endpoints.

  This keeps cmd/catalogd/main.go health/readiness setup identical to
  cmd/operator-controller/main.go (healthz.Ping for both liveness and
  readiness); the catalog-server readiness check is an implementation detail
  of serverutil.AddCatalogServerToManager.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(experimental): run catalogd and operator-controller with 2 replicas

  The experimental e2e suite uses a 2-node kind cluster, making it a natural
  fit to validate HA behaviour. Set replicas=2 for both components in
  helm/experimental.yaml so the experimental and experimental-e2e manifests
  exercise the multi-replica path end-to-end. This is safe for
  operator-controller (no leader-only HTTP servers) and for catalogd now that
  the catalog server starts on all pods via NeedLeaderElection=false,
  preventing the rolling-update deadlock that would arise if the server were
  leader-only.

  Also adds a @CatalogdHA experimental e2e scenario that force-deletes the
  catalogd leader pod and verifies that a new leader is elected and the
  catalog resumes serving. The scenario is gated on a 2-node cluster (detected
  in BeforeSuite and reflected in the featureGates map), so it is
  automatically skipped in the standard 1-node e2e suite. The experimental e2e
  timeout is bumped from 20m to 25m to accommodate leader re-election time
  (~163s worst case).

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: Todd Short <tshort@redhat.com>

---------

Signed-off-by: Todd Short <tshort@redhat.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
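For context, the mechanism both halves of this message hinge on is controller-runtime's LeaderElectionRunnable contract: a Runnable added via mgr.Add is started immediately on every replica when NeedLeaderElection() returns false, and only after the pod acquires the leader lease when it returns true. A minimal illustrative sketch of that contract (the exampleRunnable type is hypothetical and not part of this commit):

package example

import (
    "context"
    "fmt"

    "sigs.k8s.io/controller-runtime/pkg/manager"
)

// exampleRunnable is a hypothetical Runnable used only to show the interface shape.
type exampleRunnable struct{}

// Start is invoked by the manager. With NeedLeaderElection() == false it is called
// on every replica as soon as the manager starts, leader or not.
func (e *exampleRunnable) Start(ctx context.Context) error {
    fmt.Println("serving on this replica")
    <-ctx.Done()
    return nil
}

// NeedLeaderElection gates Start on leadership. Returning true would delay Start
// until the leader lease is acquired, which is the rollout deadlock described above.
func (e *exampleRunnable) NeedLeaderElection() bool { return false }

// Compile-time checks that the example satisfies both interfaces.
var (
    _ manager.Runnable               = (*exampleRunnable)(nil)
    _ manager.LeaderElectionRunnable = (*exampleRunnable)(nil)
)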
1 parent 0e70fe3 commit e12d90b

11 files changed

Lines changed: 206 additions & 42 deletions


Makefile

Lines changed: 2 additions & 2 deletions
@@ -254,7 +254,7 @@ $(eval $(call install-sh,standard,operator-controller-standard.yaml))
 .PHONY: test
 test: manifests generate fmt lint test-unit test-e2e test-regression #HELP Run all tests.
 
-E2E_TIMEOUT ?= 15m
+E2E_TIMEOUT ?= 20m
 GODOG_ARGS ?=
 .PHONY: e2e
 e2e: #EXHELP Run the e2e tests.
@@ -316,7 +316,7 @@ test-experimental-e2e: COVERAGE_NAME := experimental-e2e
 test-experimental-e2e: export MANIFEST := $(EXPERIMENTAL_RELEASE_MANIFEST)
 test-experimental-e2e: export INSTALL_DEFAULT_CATALOGS := false
 test-experimental-e2e: PROMETHEUS_VALUES := helm/prom_experimental.yaml
-test-experimental-e2e: E2E_TIMEOUT := 20m
+test-experimental-e2e: E2E_TIMEOUT := 25m
 test-experimental-e2e: run-internal prometheus e2e e2e-coverage kind-clean #HELP Run experimental e2e test suite on local kind cluster
 
 .PHONY: prometheus

helm/experimental.yaml

Lines changed: 4 additions & 0 deletions
@@ -7,6 +7,8 @@
   # to pull in resources or additions
 options:
   operatorController:
+    deployment:
+      replicas: 2
     features:
       enabled:
         - SingleOwnNamespaceInstallSupport
@@ -20,6 +22,8 @@ options:
   # Use with {{- if has "FeatureGate" .Values.options.catalogd.features.enabled }}
   # to pull in resources or additions
   catalogd:
+    deployment:
+      replicas: 2
     features:
       enabled:
         - APIV1MetasHandler

internal/catalogd/serverutil/serverutil.go

Lines changed: 96 additions & 27 deletions
@@ -1,7 +1,9 @@
 package serverutil
 
 import (
+    "context"
     "crypto/tls"
+    "errors"
     "fmt"
     "io"
     "net"
@@ -13,7 +15,7 @@ import (
     "github.com/klauspost/compress/gzhttp"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/certwatcher"
-    "sigs.k8s.io/controller-runtime/pkg/manager"
+    "sigs.k8s.io/controller-runtime/pkg/healthz"
 
     catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
     "github.com/operator-framework/operator-controller/internal/catalogd/storage"
@@ -27,49 +29,116 @@ type CatalogServerConfig struct {
     LocalStorage storage.Instance
 }
 
-func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFileWatcher *certwatcher.CertWatcher) error {
-    listener, err := net.Listen("tcp", cfg.CatalogAddr)
+// AddCatalogServerToManager adds the catalog HTTP server to the manager and registers
+// a readiness check that passes once the server has started serving. Because
+// NeedLeaderElection returns false, Start() is called on every pod immediately, so all
+// replicas bind the catalog port and become ready. Non-leader pods serve requests but
+// return 404 (empty local cache); callers are expected to retry.
+func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, cw *certwatcher.CertWatcher) error {
+    shutdownTimeout := 30 * time.Second
+    r := &catalogServerRunnable{
+        cfg: cfg,
+        cw:  cw,
+        server: &http.Server{
+            Addr:         cfg.CatalogAddr,
+            Handler:      storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
+            ReadTimeout:  5 * time.Second,
+            WriteTimeout: 5 * time.Minute,
+        },
+        shutdownTimeout: shutdownTimeout,
+        ready:           make(chan struct{}),
+    }
+
+    if err := mgr.Add(r); err != nil {
+        return fmt.Errorf("error adding catalog server to manager: %w", err)
+    }
+
+    // Register a readiness check that passes once Start() has been called and the
+    // server is actively serving. All pods reach Start() (NeedLeaderElection=false),
+    // so all replicas become ready and receive traffic; non-leaders return 404 until
+    // they win the leader lease and populate their local cache.
+    if err := mgr.AddReadyzCheck("catalog-server", r.readyzCheck()); err != nil {
+        return fmt.Errorf("error adding catalog server readiness check: %w", err)
+    }
+
+    return nil
+}
+
+// catalogServerRunnable is a Runnable that binds the catalog HTTP port on every pod.
+// Because NeedLeaderElection returns false, Start() is called on all replicas immediately;
+// non-leader pods serve the catalog port but return 404 (empty local cache).
+type catalogServerRunnable struct {
+    cfg             CatalogServerConfig
+    cw              *certwatcher.CertWatcher
+    server          *http.Server
+    shutdownTimeout time.Duration
+    // ready is closed by Start() once the server is about to begin serving.
+    ready chan struct{}
+}
+
+// NeedLeaderElection returns false so the catalog server starts on every pod
+// immediately, regardless of leadership. This is required for rolling updates:
+// if Start() were gated on leadership, a new pod could not win the leader lease
+// (held by the still-running old pod) and therefore could never pass the
+// catalog-server readiness check, deadlocking the rollout.
+//
+// Non-leader pods serve the catalog HTTP port but have an empty local cache
+// (only the leader's reconciler downloads catalog content), so requests to a
+// non-leader return 404. Callers are expected to retry.
+func (r *catalogServerRunnable) NeedLeaderElection() bool { return false }
+
+func (r *catalogServerRunnable) Start(ctx context.Context) error {
+    listener, err := net.Listen("tcp", r.cfg.CatalogAddr)
     if err != nil {
         return fmt.Errorf("error creating catalog server listener: %w", err)
     }
 
-    if cfg.CertFile != "" && cfg.KeyFile != "" {
-        // Use the passed certificate watcher instead of creating a new one
+    if r.cfg.CertFile != "" && r.cfg.KeyFile != "" {
         config := &tls.Config{
-            GetCertificate: tlsFileWatcher.GetCertificate,
+            GetCertificate: r.cw.GetCertificate,
             MinVersion:     tls.VersionTLS12,
         }
         listener = tls.NewListener(listener, config)
     }
 
-    shutdownTimeout := 30 * time.Second
-    catalogServer := manager.Server{
-        Name:                "catalogs",
-        OnlyServeWhenLeader: true,
-        Server: &http.Server{
-            Addr:        cfg.CatalogAddr,
-            Handler:     storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
-            ReadTimeout: 5 * time.Second,
-            // TODO: Revert this to 10 seconds if/when the API
-            // evolves to have significantly smaller responses
-            WriteTimeout: 5 * time.Minute,
-        },
-        ShutdownTimeout: &shutdownTimeout,
-        Listener:        listener,
-    }
+    // Signal readiness before blocking on Serve so the readiness probe passes promptly.
+    close(r.ready)
 
-    err = mgr.Add(&catalogServer)
-    if err != nil {
-        return fmt.Errorf("error adding catalog server to manager: %w", err)
-    }
+    go func() {
+        <-ctx.Done()
+        shutdownCtx := context.Background()
+        if r.shutdownTimeout > 0 {
+            var cancel context.CancelFunc
+            shutdownCtx, cancel = context.WithTimeout(shutdownCtx, r.shutdownTimeout)
+            defer cancel()
+        }
+        if err := r.server.Shutdown(shutdownCtx); err != nil {
+            // Shutdown errors (e.g. context deadline exceeded) are not actionable;
+            // the process is terminating regardless.
+            _ = err
+        }
+    }()
 
+    if err := r.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
+        return err
+    }
     return nil
 }
 
+// readyzCheck returns a healthz.Checker that passes once Start() has been called.
+func (r *catalogServerRunnable) readyzCheck() healthz.Checker {
+    return func(_ *http.Request) error {
+        select {
+        case <-r.ready:
+            return nil
+        default:
+            return fmt.Errorf("catalog server not yet started")
+        }
+    }
+}
+
 func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler {
     return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) {
-        // extract parameters used in apache common log format, but then log using `logr` to remain consistent
-        // with other loggers used in this codebase.
         username := "-"
         if params.URL.User != nil {
             if name := params.URL.User.Username(); name != "" {
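The commit message notes that cmd/catalogd/main.go keeps the same health/readiness wiring as cmd/operator-controller/main.go, with healthz.Ping backing both probes, while the catalog-server check above is registered internally by AddCatalogServerToManager. A rough sketch of how those pieces relate (assumed shape only; main.go is not part of this diff, and the probe address is a placeholder):

package main

import (
    "log"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/healthz"
)

func main() {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        // Placeholder probe address; /healthz and /readyz are served from here.
        HealthProbeBindAddress: ":8081",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Liveness and readiness both use healthz.Ping, per the commit message.
    if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
        log.Fatal(err)
    }
    if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
        log.Fatal(err)
    }

    // serverutil.AddCatalogServerToManager(mgr, cfg, certWatcher) would be called
    // here with a fully populated CatalogServerConfig; it registers the extra
    // "catalog-server" readiness check shown in the diff above. /readyz reports
    // ready only once every registered check passes, and each named check is
    // also exposed individually at /readyz/<name>.

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatal(err)
    }
}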

internal/operator-controller/catalogmetadata/client/client.go

Lines changed: 4 additions & 2 deletions
@@ -106,8 +106,10 @@ func (c *Client) PopulateCache(ctx context.Context, catalog *ocv1.ClusterCatalog
     defer resp.Body.Close()
 
     if resp.StatusCode != http.StatusOK {
-        errToCache := fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
-        return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.Ref, nil, errToCache)
+        // Do not cache non-200 responses (e.g. 404 from a non-leader catalogd pod).
+        // Returning the error directly lets the next reconcile retry a fresh HTTP
+        // request and eventually hit the leader.
+        return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
     }
 
     return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.Ref, resp.Body, nil)
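The reason for not caching the failure is that the caller's reconcile loop treats any returned error as transient. A hypothetical caller sketch, not taken from the repository, where populateFunc simply stands in for the client's PopulateCache call:

package example

import (
    "context"
    "io/fs"

    ctrl "sigs.k8s.io/controller-runtime"
)

// populateFunc stands in for (*client.Client).PopulateCache in this sketch.
type populateFunc func(ctx context.Context) (fs.FS, error)

// reconcileCatalog shows the retry shape: the non-200 error is now returned
// instead of being stored in the cache, so propagating it causes controller-runtime
// to requeue the request with backoff, and the next attempt issues a fresh HTTP
// request that may reach the leader pod.
func reconcileCatalog(ctx context.Context, populate populateFunc) (ctrl.Result, error) {
    catalogFS, err := populate(ctx)
    if err != nil {
        return ctrl.Result{}, err // requeued with backoff; nothing stale is cached
    }
    _ = catalogFS // resolve package/bundle metadata from the populated filesystem here
    return ctrl.Result{}, nil
}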

internal/operator-controller/catalogmetadata/client/client_test.go

Lines changed: 0 additions & 7 deletions
@@ -234,13 +234,6 @@ func TestClientPopulateCache(t *testing.T) {
                 }},
             }, nil
         },
-        putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) {
-            return func(source string, errToCache error) (fs.FS, error) {
-                assert.Empty(t, source)
-                assert.Error(t, errToCache)
-                return nil, errToCache
-            }
-        },
         assert: func(t *testing.T, fs fs.FS, err error) {
             assert.Nil(t, fs)
             assert.ErrorContains(t, err, "received unexpected response status code 500")

manifests/experimental-e2e.yaml

Lines changed: 2 additions & 2 deletions
@@ -2621,7 +2621,7 @@ metadata:
   namespace: olmv1-system
 spec:
   minReadySeconds: 5
-  replicas: 1
+  replicas: 2
   strategy:
     type: RollingUpdate
     rollingUpdate:
@@ -2772,7 +2772,7 @@ metadata:
   name: operator-controller-controller-manager
   namespace: olmv1-system
 spec:
-  replicas: 1
+  replicas: 2
   strategy:
     type: RollingUpdate
     rollingUpdate:

manifests/experimental.yaml

Lines changed: 2 additions & 2 deletions
@@ -2541,7 +2541,7 @@ metadata:
   namespace: olmv1-system
 spec:
   minReadySeconds: 5
-  replicas: 1
+  replicas: 2
   strategy:
     type: RollingUpdate
     rollingUpdate:
@@ -2679,7 +2679,7 @@ metadata:
   name: operator-controller-controller-manager
   namespace: olmv1-system
 spec:
-  replicas: 1
+  replicas: 2
   strategy:
     type: RollingUpdate
     rollingUpdate:

test/e2e/features/ha.feature

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+Feature: HA failover for catalogd
+
+  When catalogd is deployed with multiple replicas, the remaining pods must
+  elect a new leader and resume serving catalogs if the leader pod is lost.
+
+  Background:
+    Given OLM is available
+    And an image registry is available
+
+  @CatalogdHA
+  Scenario: Catalogd resumes serving catalogs after leader pod failure
+    Given a catalog "test" with packages:
+      | package | version | channel | replaces | contents                   |
+      | test    | 1.0.0   | stable  |          | CRD, Deployment, ConfigMap |
+    And catalogd is ready to reconcile resources
+    And catalog "test" is reconciled
+    When the catalogd leader pod is force-deleted
+    Then a new catalogd leader is elected
+    And catalog "test" reports Serving as True with Reason Available

test/e2e/steps/ha_steps.go

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+package steps
+
+import (
+    "context"
+    "fmt"
+    "strings"
+
+    "k8s.io/component-base/featuregate"
+)
+
+// catalogdHAFeature gates scenarios that require a multi-node cluster.
+// It is set to true in BeforeSuite when the cluster has at least 2 nodes,
+// which is the case for the experimental e2e suite (kind-config-2node.yaml)
+// but not the standard suite.
+const catalogdHAFeature featuregate.Feature = "CatalogdHA"
+
+// CatalogdLeaderPodIsForceDeleted force-deletes the catalogd leader pod to simulate leader loss.
+// The pod is identified from sc.leaderPods["catalogd"] (populated by a prior
+// "catalogd is ready to reconcile resources" step). Force-deletion is equivalent to
+// an abrupt process crash: the lease is no longer renewed and the surviving pod
+// acquires leadership after the lease expires.
+//
+// Note: stopping the kind node container is not used here because both nodes in the
+// experimental 2-node cluster are control-plane nodes that run etcd — stopping either
+// would break etcd quorum and make the API server unreachable for the rest of the test.
+func CatalogdLeaderPodIsForceDeleted(ctx context.Context) error {
+    sc := scenarioCtx(ctx)
+    leaderPod := sc.leaderPods["catalogd"]
+    if leaderPod == "" {
+        return fmt.Errorf("catalogd leader pod not found in scenario context; run 'catalogd is ready to reconcile resources' first")
+    }
+
+    logger.Info("Force-deleting catalogd leader pod", "pod", leaderPod)
+    if _, err := k8sClient("delete", "pod", leaderPod, "-n", olmNamespace,
+        "--force", "--grace-period=0"); err != nil {
+        return fmt.Errorf("failed to force-delete catalogd leader pod %q: %w", leaderPod, err)
+    }
+    return nil
+}
+
+// NewCatalogdLeaderIsElected polls the catalogd leader election lease until the holder
+// identity changes to a pod other than the deleted leader. It updates
+// sc.leaderPods["catalogd"] with the new leader pod name.
+func NewCatalogdLeaderIsElected(ctx context.Context) error {
+    sc := scenarioCtx(ctx)
+    oldLeader := sc.leaderPods["catalogd"]
+
+    waitFor(ctx, func() bool {
+        holder, err := k8sClient("get", "lease", leaseNames["catalogd"], "-n", olmNamespace,
+            "-o", "jsonpath={.spec.holderIdentity}")
+        if err != nil || holder == "" {
+            return false
+        }
+        newPod := strings.Split(strings.TrimSpace(holder), "_")[0]
+        if newPod == oldLeader {
+            return false
+        }
+        sc.leaderPods["catalogd"] = newPod
+        logger.Info("New catalogd leader elected", "pod", newPod)
+        return true
+    })
+    return nil
+}
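These step functions are presumably bound to the Gherkin phrases in ha.feature elsewhere in the suite; the registration file is not among the diffs shown on this page. A hypothetical godog registration sketch for the two new steps:

package steps

import "github.com/cucumber/godog"

// registerHASteps is a hypothetical helper showing how the new steps could be
// bound to the phrases used in ha.feature; the real suite performs this wiring
// in its existing scenario-initialization code.
func registerHASteps(sc *godog.ScenarioContext) {
    sc.Step(`^the catalogd leader pod is force-deleted$`, CatalogdLeaderPodIsForceDeleted)
    sc.Step(`^a new catalogd leader is elected$`, NewCatalogdLeaderIsElected)
}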

test/e2e/steps/hooks.go

Lines changed: 11 additions & 0 deletions
@@ -8,6 +8,7 @@ import (
     "os/exec"
     "regexp"
     "strconv"
+    "strings"
 
     "github.com/cucumber/godog"
     "github.com/go-logr/logr"
@@ -90,6 +91,7 @@ var (
         features.HelmChartSupport: false,
         features.BoxcutterRuntime: false,
         features.DeploymentConfig: false,
+        catalogdHAFeature:         false,
     }
     logger logr.Logger
 )
@@ -131,6 +133,14 @@ func BeforeSuite() {
         logger = textlogger.NewLogger(textlogger.NewConfig())
     }
 
+    // Enable HA scenarios when the cluster has at least 2 nodes. This runs
+    // unconditionally so that upgrade scenarios (which install OLM in a Background
+    // step and return early below) still get the gate set correctly.
+    if out, err := k8sClient("get", "nodes", "--no-headers", "-o", "name"); err == nil &&
+        len(strings.Fields(strings.TrimSpace(out))) >= 2 {
+        featureGates[catalogdHAFeature] = true
+    }
+
     olm, err := detectOLMDeployment()
     if err != nil {
         logger.Info("OLM deployments not found; skipping feature gate detection (upgrade scenarios will install OLM in Background)")
@@ -152,6 +162,7 @@ func BeforeSuite() {
             }
         }
     }
+
     logger.Info(fmt.Sprintf("Enabled feature gates: %v", featureGates))
 }
 