Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ $(eval $(call install-sh,standard,operator-controller-standard.yaml))
.PHONY: test
test: manifests generate fmt lint test-unit test-e2e test-regression #HELP Run all tests.

E2E_TIMEOUT ?= 15m
E2E_TIMEOUT ?= 20m
Comment thread
tmshort marked this conversation as resolved.
GODOG_ARGS ?=
.PHONY: e2e
e2e: #EXHELP Run the e2e tests.
Expand Down Expand Up @@ -316,7 +316,7 @@ test-experimental-e2e: COVERAGE_NAME := experimental-e2e
test-experimental-e2e: export MANIFEST := $(EXPERIMENTAL_RELEASE_MANIFEST)
test-experimental-e2e: export INSTALL_DEFAULT_CATALOGS := false
test-experimental-e2e: PROMETHEUS_VALUES := helm/prom_experimental.yaml
test-experimental-e2e: E2E_TIMEOUT := 20m
test-experimental-e2e: E2E_TIMEOUT := 25m
test-experimental-e2e: run-internal prometheus e2e e2e-coverage kind-clean #HELP Run experimental e2e test suite on local kind cluster

.PHONY: prometheus
Expand Down
4 changes: 4 additions & 0 deletions helm/experimental.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# to pull in resources or additions
options:
operatorController:
deployment:
replicas: 2
features:
enabled:
- SingleOwnNamespaceInstallSupport
Expand All @@ -20,6 +22,8 @@ options:
# Use with {{- if has "FeatureGate" .Values.options.catalogd.features.enabled }}
# to pull in resources or additions
catalogd:
deployment:
replicas: 2
features:
enabled:
- APIV1MetasHandler
Expand Down
122 changes: 95 additions & 27 deletions internal/catalogd/serverutil/serverutil.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package serverutil

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
Expand All @@ -13,7 +15,7 @@ import (
"github.com/klauspost/compress/gzhttp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/healthz"

catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
Expand All @@ -27,49 +29,115 @@ type CatalogServerConfig struct {
LocalStorage storage.Instance
}

func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFileWatcher *certwatcher.CertWatcher) error {
listener, err := net.Listen("tcp", cfg.CatalogAddr)
// AddCatalogServerToManager adds the catalog HTTP server to the manager and registers
// a readiness check that passes once the server has started serving. Because
// NeedLeaderElection returns false, Start() is called on every pod immediately, so all
// replicas bind the catalog port and become ready. Non-leader pods serve requests but
// return 404 (empty local cache); callers are expected to retry.
func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, cw *certwatcher.CertWatcher) error {
shutdownTimeout := 30 * time.Second
r := &catalogServerRunnable{
cfg: cfg,
cw: cw,
server: &http.Server{
Addr: cfg.CatalogAddr,
Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Minute,
},
shutdownTimeout: shutdownTimeout,
ready: make(chan struct{}),
}

if err := mgr.Add(r); err != nil {
return fmt.Errorf("error adding catalog server to manager: %w", err)
}

// Register a readiness check that passes once Start() has been called and the
// server is actively serving. All pods reach Start() (NeedLeaderElection=false),
// so all replicas become ready and receive traffic; non-leaders return 404 until
// they win the leader lease and populate their local cache.
if err := mgr.AddReadyzCheck("catalog-server", r.readyzCheck()); err != nil {
return fmt.Errorf("error adding catalog server readiness check: %w", err)
}

return nil
}

// catalogServerRunnable serves the catalog HTTP API. It binds the catalog port
// lazily inside Start(); because NeedLeaderElection reports false (see below),
// Start() runs on every pod, so all replicas hold the listen socket and serve
// requests regardless of leadership.
type catalogServerRunnable struct {
	cfg CatalogServerConfig
	// cw supplies serving certificates when CertFile/KeyFile are configured.
	cw     *certwatcher.CertWatcher
	server *http.Server
	// shutdownTimeout bounds graceful drain when the manager context is cancelled.
	shutdownTimeout time.Duration
	// ready is closed by Start() once the server is about to begin serving.
	ready chan struct{}
}

// NeedLeaderElection reports false so that the manager calls Start() on every
// pod immediately, regardless of leadership. This is required for rolling
// updates: if Start() were gated on leadership, a new pod could never win the
// leader lease (still held by the old pod) and therefore could never pass the
// catalog-server readiness check, deadlocking the rollout.
//
// Non-leader pods serve the catalog HTTP port but have an empty local cache
// (only the leader's reconciler downloads catalog content), so requests to a
// non-leader return 404. Callers are expected to retry.
func (r *catalogServerRunnable) NeedLeaderElection() bool {
	return false
}
Comment thread
tmshort marked this conversation as resolved.

func (r *catalogServerRunnable) Start(ctx context.Context) error {
listener, err := net.Listen("tcp", r.cfg.CatalogAddr)
if err != nil {
return fmt.Errorf("error creating catalog server listener: %w", err)
}

if cfg.CertFile != "" && cfg.KeyFile != "" {
// Use the passed certificate watcher instead of creating a new one
if r.cfg.CertFile != "" && r.cfg.KeyFile != "" {
config := &tls.Config{
GetCertificate: tlsFileWatcher.GetCertificate,
GetCertificate: r.cw.GetCertificate,
MinVersion: tls.VersionTLS12,
}
listener = tls.NewListener(listener, config)
}

shutdownTimeout := 30 * time.Second
catalogServer := manager.Server{
Name: "catalogs",
OnlyServeWhenLeader: true,
Server: &http.Server{
Addr: cfg.CatalogAddr,
Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
ReadTimeout: 5 * time.Second,
// TODO: Revert this to 10 seconds if/when the API
// evolves to have significantly smaller responses
WriteTimeout: 5 * time.Minute,
},
ShutdownTimeout: &shutdownTimeout,
Listener: listener,
}
// Signal readiness before blocking on Serve so the readiness probe passes promptly.
close(r.ready)

err = mgr.Add(&catalogServer)
if err != nil {
return fmt.Errorf("error adding catalog server to manager: %w", err)
}
go func() {
<-ctx.Done()
shutdownCtx := context.Background()
if r.shutdownTimeout > 0 {
var cancel context.CancelFunc
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, r.shutdownTimeout)
defer cancel()
}
if err := r.server.Shutdown(shutdownCtx); err != nil {
// Shutdown errors (e.g. context deadline exceeded) are not actionable;
// the process is terminating regardless.
_ = err
Comment thread
tmshort marked this conversation as resolved.
}
}()

if err := r.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}

// readyzCheck returns a healthz.Checker that passes once Start() has closed the
// ready channel, i.e. the server has bound its listener and is about to serve.
// Before that point it reports an error so the pod stays not-ready.
func (r *catalogServerRunnable) readyzCheck() healthz.Checker {
	return func(_ *http.Request) error {
		select {
		case <-r.ready:
			return nil
		default:
			// errors.New over fmt.Errorf: the message is a constant string.
			return errors.New("catalog server not yet started")
		}
	}
}

func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler {
return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) {
// extract parameters used in apache common log format, but then log using `logr` to remain consistent
// with other loggers used in this codebase.
username := "-"
if params.URL.User != nil {
if name := params.URL.User.Username(); name != "" {
Expand Down
4 changes: 2 additions & 2 deletions manifests/experimental-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2621,7 +2621,7 @@ metadata:
namespace: olmv1-system
spec:
minReadySeconds: 5
replicas: 1
replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
Expand Down Expand Up @@ -2772,7 +2772,7 @@ metadata:
name: operator-controller-controller-manager
namespace: olmv1-system
spec:
replicas: 1
replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
Expand Down
4 changes: 2 additions & 2 deletions manifests/experimental.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2541,7 +2541,7 @@ metadata:
namespace: olmv1-system
spec:
minReadySeconds: 5
replicas: 1
replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
Expand Down Expand Up @@ -2679,7 +2679,7 @@ metadata:
name: operator-controller-controller-manager
namespace: olmv1-system
spec:
replicas: 1
replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
Expand Down
19 changes: 19 additions & 0 deletions test/e2e/features/ha.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Feature: HA failover for catalogd

When catalogd is deployed with multiple replicas, the remaining pods must
elect a new leader and resume serving catalogs if the leader pod is lost.

Background:
Given OLM is available
And an image registry is available

@CatalogdHA
Scenario: Catalogd resumes serving catalogs after leader pod failure
Given a catalog "test" with packages:
| package | version | channel | replaces | contents |
| test | 1.0.0 | stable | | CRD, Deployment, ConfigMap |
And catalogd is ready to reconcile resources
And catalog "test" is reconciled
When the catalogd leader pod is force-deleted
Then a new catalogd leader is elected
And catalog "test" reports Serving as True with Reason Available
63 changes: 63 additions & 0 deletions test/e2e/steps/ha_steps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package steps

import (
"context"
"fmt"
"strings"

"k8s.io/component-base/featuregate"
)

// catalogdHAFeature gates scenarios that require a multi-node cluster.
// It is set to true in BeforeSuite when the cluster has at least 2 nodes,
// which is the case for the experimental e2e suite (kind-config-2node.yaml)
// but not the standard single-node suite. The name matches the @CatalogdHA
// tag used in the feature files.
const catalogdHAFeature featuregate.Feature = "CatalogdHA"

// CatalogdLeaderPodIsForceDeleted simulates abrupt leader loss by force-deleting
// the catalogd leader pod recorded in sc.leaderPods["catalogd"] (populated by a
// prior "catalogd is ready to reconcile resources" step). A force-delete behaves
// like a process crash: the lease stops being renewed and the surviving pod
// acquires leadership once the lease expires.
//
// Note: stopping the kind node container is not used here because both nodes in
// the experimental 2-node cluster are control-plane nodes running etcd — stopping
// either would break etcd quorum and make the API server unreachable for the
// remainder of the test.
func CatalogdLeaderPodIsForceDeleted(ctx context.Context) error {
	sc := scenarioCtx(ctx)
	pod := sc.leaderPods["catalogd"]
	if pod == "" {
		return fmt.Errorf("catalogd leader pod not found in scenario context; run 'catalogd is ready to reconcile resources' first")
	}

	logger.Info("Force-deleting catalogd leader pod", "pod", pod)
	_, err := k8sClient("delete", "pod", pod, "-n", olmNamespace,
		"--force", "--grace-period=0")
	if err != nil {
		return fmt.Errorf("failed to force-delete catalogd leader pod %q: %w", pod, err)
	}
	return nil
}

// NewCatalogdLeaderIsElected polls the catalogd leader-election lease until its
// holder identity names a pod other than the previously recorded leader, then
// stores the new leader pod name in sc.leaderPods["catalogd"].
func NewCatalogdLeaderIsElected(ctx context.Context) error {
	sc := scenarioCtx(ctx)
	previous := sc.leaderPods["catalogd"]

	waitFor(ctx, func() bool {
		identity, err := k8sClient("get", "lease", leaseNames["catalogd"], "-n", olmNamespace,
			"-o", "jsonpath={.spec.holderIdentity}")
		if err != nil || identity == "" {
			return false
		}
		// The holder identity has the form "<pod>_<suffix>"; keep the pod name.
		candidate, _, _ := strings.Cut(strings.TrimSpace(identity), "_")
		if candidate == previous {
			return false
		}
		sc.leaderPods["catalogd"] = candidate
		logger.Info("New catalogd leader elected", "pod", candidate)
		return true
	})
	return nil
}
11 changes: 11 additions & 0 deletions test/e2e/steps/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"regexp"
"strconv"
"strings"
"sync"

"github.com/cucumber/godog"
Expand Down Expand Up @@ -90,6 +91,7 @@ var (
features.HelmChartSupport: false,
features.BoxcutterRuntime: false,
features.DeploymentConfig: false,
catalogdHAFeature: false,
}
logger logr.Logger
)
Expand Down Expand Up @@ -131,6 +133,14 @@ func BeforeSuite() {
logger = textlogger.NewLogger(textlogger.NewConfig())
}

// Enable HA scenarios when the cluster has at least 2 nodes. This runs
// unconditionally so that upgrade scenarios (which install OLM in a Background
// step and return early below) still get the gate set correctly.
if out, err := k8sClient("get", "nodes", "--no-headers", "-o", "name"); err == nil &&
len(strings.Fields(strings.TrimSpace(out))) >= 2 {
featureGates[catalogdHAFeature] = true
}

olm, err := detectOLMDeployment()
if err != nil {
logger.Info("OLM deployments not found; skipping feature gate detection (upgrade scenarios will install OLM in Background)")
Expand All @@ -152,6 +162,7 @@ func BeforeSuite() {
}
}
}

logger.Info(fmt.Sprintf("Enabled feature gates: %v", featureGates))
}

Expand Down
3 changes: 3 additions & 0 deletions test/e2e/steps/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func RegisterSteps(sc *godog.ScenarioContext) {
sc.Step(`^(?i)the "([^"]+)" component is configured with HTTPS_PROXY "([^"]+)"$`, ConfigureDeploymentWithHTTPSProxy)
sc.Step(`^(?i)the "([^"]+)" component is configured with HTTPS_PROXY pointing to a recording proxy$`, StartRecordingProxyAndConfigureDeployment)
sc.Step(`^(?i)the recording proxy received a CONNECT request for the catalogd service$`, RecordingProxyReceivedCONNECTForCatalogd)

sc.Step(`^(?i)the catalogd leader pod is force-deleted$`, CatalogdLeaderPodIsForceDeleted)
sc.Step(`^(?i)a new catalogd leader is elected$`, NewCatalogdLeaderIsElected)
}

func init() {
Expand Down
Loading