From 04f48d007d361072feaf09be272edeae2374e6c9 Mon Sep 17 00:00:00 2001 From: Xavier Lange Date: Wed, 17 Jun 2026 13:16:02 -0400 Subject: [PATCH 1/2] test(e2e): add kind-based stress harness for scale/churn Adds a build-tagged stress target (`//go:build stress` + `make test-stress`) that exercises the operator at 1/3/7-member scale and under scale-churn, crash-during-scale, and pod-recovery, reusing the existing kind bootstrap and e2e primitives. The fast `make test-e2e` suite is unaffected (stress tag excluded). Green tests (pass on current main): TestStressBringUp, TestStressScaleChurn, TestStressSingleEditJump, TestStressCrashDuringScale, TestStressPodRecoveryAtScale. Skip-gated bug-proofs (flip to passing alongside their fix PR): TestStressVersionUpgrade, TestStressEvenSizeRejected, TestStressLeaderlessScaleIn. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Xavier Lange --- Makefile | 9 + test/e2e/stress_helpers_test.go | 270 ++++++++++++++++ test/e2e/stress_test.go | 533 ++++++++++++++++++++++++++++++++ 3 files changed, 812 insertions(+) create mode 100644 test/e2e/stress_helpers_test.go create mode 100644 test/e2e/stress_test.go diff --git a/Makefile b/Makefile index 05d9ad1d..616003fa 100644 --- a/Makefile +++ b/Makefile @@ -91,6 +91,15 @@ test-e2e: generate fmt vet kind gofail-enable ## Run the e2e tests. Expected an ETCD_VERSION="$(E2E_ETCD_VERSION)" PATH="$(LOCALBIN):$(PATH)" go test ./test/e2e/ -v $(MAKE) gofail-disable +# The stress suite is build-tagged (//go:build stress) so the fast e2e suite +# above never compiles or runs it. It reuses the same Kind bootstrap + deployed +# operator, but exercises larger clusters (1/3/7 members) under churn, so it +# needs a longer timeout. +.PHONY: test-stress +test-stress: generate fmt vet kind gofail-enable ## Run the stress tests. Expected an isolated environment using Kind. 
+ ETCD_VERSION="$(E2E_ETCD_VERSION)" PATH="$(LOCALBIN):$(PATH)" go test ./test/e2e/ -tags stress -run 'TestStress' -timeout 40m -v + $(MAKE) gofail-disable + .PHONY: gofail-enable gofail-enable: gofail $(GOFAIL) enable ./internal/controller/ diff --git a/test/e2e/stress_helpers_test.go b/test/e2e/stress_helpers_test.go new file mode 100644 index 00000000..4ddd1d1a --- /dev/null +++ b/test/e2e/stress_helpers_test.go @@ -0,0 +1,270 @@ +//go:build stress + +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "testing" + "time" + + "sigs.k8s.io/e2e-framework/klient/wait" + "sigs.k8s.io/e2e-framework/pkg/envconf" +) + +// These helpers are intentionally composed from the primitives already living +// in helpers_test.go (createEtcdClusterWithPVC, waitForNoLearners, +// getEtcdMemberListPB, getClusterEndpointHashKVs, execInPod, ...). They add the +// higher-level invariants the stress suite asserts (full health, leader +// identity, hashkv consistency, a continuous quorum watcher, and a timed +// bring-up) without re-implementing any of the low-level etcdctl plumbing. + +// endpointHealthAllHealthy runs `etcdctl endpoint health --cluster` from the +// given pod and returns whether every reported endpoint is healthy, along with +// the healthy/total counts. It never fails the test itself so callers (e.g. the +// quorum watcher) can poll it. 
+func endpointHealthAllHealthy(t *testing.T, c *envconf.Config, podName string) (healthy, total int, ok bool) {
+	t.Helper()
+	cmd := []string{"etcdctl", "endpoint", "health", "--cluster"}
+	stdout, _, err := execInPod(t, c, podName, namespace, cmd)
+	if err != nil {
+		// Treat an exec/etcdctl error as "not currently healthy" rather than a
+		// hard failure; transient unavailability during churn is expected.
+		return 0, 0, false
+	}
+	for _, line := range strings.Split(strings.TrimSpace(stdout), "\n") {
+		line = strings.TrimSpace(line)
+		if line == "" {
+			continue
+		}
+		total++
+		if strings.Contains(line, "is healthy") {
+			healthy++
+		}
+	}
+	return healthy, total, total > 0 && healthy == total
+}
+
+// waitForClusterHealthy blocks until the cluster reachable through podName has
+// exactly `size` voting members (no learners) AND every endpoint reports
+// healthy, or the timeout elapses.
+func waitForClusterHealthy(t *testing.T, c *envconf.Config, podName string, size int, timeout time.Duration) {
+	t.Helper()
+	// Phase 1: steady-state membership (size members, no learners); it is handed the full `timeout`.
+	waitForNoLearners(t, c, podName, size, timeout)
+
+	// Phase 2: every endpoint healthy; it is handed `timeout` again, so the two phases are bounded separately.
+	err := wait.For(func(ctx context.Context) (bool, error) {
+		healthy, total, ok := endpointHealthAllHealthy(t, c, podName)
+		if !ok {
+			return false, nil
+		}
+		return total == size && healthy == size, nil
+	}, wait.WithTimeout(timeout), wait.WithInterval(5*time.Second))
+	if err != nil {
+		t.Fatalf("cluster %s never reached %d healthy endpoints: %v", podName, size, err)
+	}
+}
+
+// endpointStatusEntry mirrors the shape of `etcdctl endpoint status --cluster -w json`.
+type endpointStatusEntry struct {
+	Endpoint string `json:"Endpoint"`
+	Status   struct {
+		Header struct {
+			// member_id is the ID of the member that answered this endpoint.
+			MemberID uint64 `json:"member_id"`
+		} `json:"header"`
+		// leader is the member ID the responding member currently believes is leader.
+		Leader uint64 `json:"leader"`
+	} `json:"Status"`
+}
+
+// getEtcdLeader parses `etcdctl endpoint status --cluster -w json` and returns
+// the leader's member name and its StatefulSet pod ordinal. The member name for
+// etcd pods is the pod name (e.g. "etcd-cluster-2"), so the ordinal is the
+// suffix after the final '-'.
+func getEtcdLeader(t *testing.T, c *envconf.Config, podName string) (leaderName string, ordinal int) {
+	t.Helper()
+	cmd := []string{"etcdctl", "endpoint", "status", "--cluster", "-w", "json"}
+	stdout, stderr, err := execInPod(t, c, podName, namespace, cmd)
+	if err != nil {
+		t.Fatalf("Failed to get endpoint status from %s: %v, stderr: %s", podName, err, stderr)
+	}
+
+	var entries []endpointStatusEntry
+	if err := json.Unmarshal([]byte(stdout), &entries); err != nil {
+		t.Fatalf("Failed to parse endpoint status JSON: %v. Raw: %s", err, stdout)
+	}
+	if len(entries) == 0 {
+		t.Fatalf("endpoint status returned no entries from %s", podName)
+	}
+
+	// Entries are expected to agree on the leader ID; only the first non-zero one is used (no cross-check).
+	var leaderID uint64
+	for _, e := range entries {
+		if e.Status.Leader != 0 {
+			leaderID = e.Status.Leader
+			break
+		}
+	}
+	if leaderID == 0 {
+		t.Fatalf("no leader reported by endpoint status from %s (cluster leaderless?)", podName)
+	}
+
+	// Map the leader member ID back to a member name via the member list.
+	for name, id := range getEtcdMembersName2IDMapping(t, c, podName) {
+		if id == leaderID {
+			leaderName = name
+			break
+		}
+	}
+	if leaderName == "" {
+		t.Fatalf("leader ID %d not found in member list from %s", leaderID, podName)
+	}
+
+	ordinal = podOrdinal(t, leaderName)
+	return leaderName, ordinal
+}
+
+// podOrdinal extracts the StatefulSet ordinal from a pod/member name like
+// "etcd-cluster-3" -> 3.
+func podOrdinal(t *testing.T, name string) int {
+	t.Helper()
+	idx := strings.LastIndex(name, "-")
+	if idx < 0 || idx == len(name)-1 {
+		t.Fatalf("cannot derive ordinal from name %q", name)
+	}
+	var ord int
+	if _, err := fmt.Sscanf(name[idx+1:], "%d", &ord); err != nil {
+		t.Fatalf("cannot parse ordinal from name %q: %v", name, err)
+	}
+	return ord
+}
+
+// assertHashKVConsistent collects the responses from getClusterEndpointHashKVs
+// and reports an error unless they all carry one hash; no responses is fatal.
+func assertHashKVConsistent(t *testing.T, c *envconf.Config, podName string) {
+	t.Helper()
+	responses := getClusterEndpointHashKVs(t, c, podName)
+	if len(responses) == 0 {
+		t.Fatalf("hashkv returned no responses from %s", podName)
+	}
+	hashes := make(map[uint32]struct{})
+	for _, r := range responses {
+		hashes[r.Hash] = struct{}{}
+	}
+	if len(hashes) != 1 {
+		t.Errorf("hashkv divergence across %d members reachable from %s: %d distinct hashes",
+			len(responses), podName, len(hashes))
+	}
+}
+
+// quorumWatcher polls endpoint health every ~2s in the background and records
+// any window during which the cluster was not fully healthy (a proxy for
+// quorum-loss / write-stall). Call the returned stop func when the churn under
+// test is complete; it stops the watcher and fails the test if any unhealthy
+// window was observed.
+//
+// The watcher tolerates a single transient unhealthy poll (one pod momentarily
+// restarting is normal during scale steps); it only fails on a *sustained*
+// unhealthy window (>= unhealthyTolerance consecutive bad polls; any shorter
+// streak is tolerated), which is what a real quorum loss looks like.
+func quorumWatcher(ctx context.Context, t *testing.T, c *envconf.Config, podName string) (stop func()) {
+	t.Helper()
+
+	const pollInterval = 2 * time.Second
+	const unhealthyTolerance = 3 // consecutive bad polls (~6s) before we call it a quorum-loss window
+
+	wctx, cancel := context.WithCancel(ctx)
+	var (
+		wg             sync.WaitGroup
+		mu             sync.Mutex
+		windows        int // number of distinct sustained unhealthy windows
+		worstStreak    int // longest consecutive unhealthy streak observed
+		totalUnhealthy int // total unhealthy polls (informational)
+	)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		ticker := time.NewTicker(pollInterval)
+		defer ticker.Stop()
+
+		streak := 0
+		inWindow := false
+		for {
+			select {
+			case <-wctx.Done():
+				return
+			case <-ticker.C:
+				_, _, ok := endpointHealthAllHealthy(t, c, podName)
+				mu.Lock()
+				if ok {
+					streak = 0
+					inWindow = false // a healthy poll closes the current window
+				} else {
+					streak++
+					totalUnhealthy++
+					if streak > worstStreak {
+						worstStreak = streak
+					}
+					if streak >= unhealthyTolerance && !inWindow { // count each sustained window once
+						windows++
+						inWindow = true
+					}
+				}
+				mu.Unlock()
+			}
+		}
+	}()
+
+	return func() {
+		cancel()  // ask the polling goroutine to exit
+		wg.Wait() // the goroutine has returned, so the counters read below are final
+		mu.Lock()
+		defer mu.Unlock()
+		if windows > 0 {
+			t.Errorf("quorumWatcher(%s): observed %d sustained quorum-loss window(s) "+
+				"(worst streak %d polls @ %s, %d unhealthy polls total)",
+				podName, windows, worstStreak, pollInterval, totalUnhealthy)
+		} else {
+			t.Logf("quorumWatcher(%s): no quorum-loss window (worst streak %d poll(s), %d unhealthy polls total)",
+				podName, worstStreak, totalUnhealthy)
+		}
+	}
+}
+
+// timeToHealthy creates a cluster of the given size and blocks until it is fully
+// healthy, logging the elapsed wall-clock time. This is the "efficient spin-up"
+// baseline the plan calls for: the log line is the number to watch as the
+// blocking-reconcile de-block work lands later.
+func timeToHealthy(ctx context.Context, t *testing.T, c *envconf.Config, name string, size int, timeout time.Duration) time.Duration {
+	t.Helper()
+	start := time.Now() // taken before creation, so the result covers create + STS readiness + health
+	createEtcdClusterWithPVC(ctx, t, c, name, size)
+	waitForSTSReadiness(t, c, name, size)
+	podName := fmt.Sprintf("%s-0", name)
+	waitForClusterHealthy(t, c, podName, size, timeout)
+	elapsed := time.Since(start)
+	t.Logf("timeToHealthy: cluster %q size=%d reached full health in %s", name, size, elapsed)
+	return elapsed
+}
diff --git a/test/e2e/stress_test.go b/test/e2e/stress_test.go
new file mode 100644
index 00000000..2ced277b
--- /dev/null
+++ b/test/e2e/stress_test.go
@@ -0,0 +1,533 @@
+//go:build stress
+
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/e2e-framework/klient/wait"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
+
+	ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1"
+)
+
+// The stress suite reuses the package-level testenv (one kind cluster + one
+// deployed operator for the whole run, set up in TestMain). Unlike the fast e2e
+// tests it drives churn imperatively rather than through the features builder,
+// because the invariants (continuous quorum watch, one-learner-at-a-time
+// progression) are easier to express as straight-line Go.
+//
+// Each test gets its own EtcdCluster name and tears it down at the end unless
+// ETCD_E2E_SKIP_TEARDOWN=true (honored by the shared skipTeardown var).
+
+// stressTeardown deletes the named cluster, unless skipTeardown is set, in
+// which case it only logs that the cluster is being left in place.
+func stressTeardown(ctx context.Context, t *testing.T, c *envconf.Config, name string) {
+	t.Helper()
+	if !skipTeardown {
+		cleanupEtcdCluster(ctx, t, c, name)
+		return
+	}
+	t.Logf("ETCD_E2E_SKIP_TEARDOWN=true, leaving cluster %q in place", name)
+}
+
+// healthyTimeout picks the per-step health timeout from the cluster size:
+// 12 minutes for 7 or more members, 6 minutes otherwise. The author's stated
+// rationale is that a 7-member bootstrap adds its six extra members serially.
+func healthyTimeout(size int) time.Duration {
+	if size < 7 {
+		return 6 * time.Minute
+	}
+	return 12 * time.Minute
+}
+
+// writeKeyset stores n stress-key-<i> -> stress-val-<i> pairs via verifyDataOperations.
+func writeKeyset(t *testing.T, c *envconf.Config, name string, n int) {
+	t.Helper()
+	for i := 0; i < n; i++ {
+		key, val := fmt.Sprintf("stress-key-%d", i), fmt.Sprintf("stress-val-%d", i)
+		verifyDataOperations(t, c, name, key, val)
+	}
+}
+
+// assertKeyset reads the keyset back from pod-0 and fails on any mismatch.
+func assertKeyset(t *testing.T, c *envconf.Config, name string, n int) {
+	t.Helper()
+	firstPod := fmt.Sprintf("%s-0", name)
+	for idx := 0; idx < n; idx++ {
+		k := fmt.Sprintf("stress-key-%d", idx)
+		expected := fmt.Sprintf("stress-val-%d", idx)
+		getCmd := []string{"etcdctl", "get", k, "--print-value-only"}
+		out, errOut, err := execInPod(t, c, firstPod, namespace, getCmd)
+		if err != nil {
+			t.Fatalf("failed to read %s back: %v, stderr: %s", k, err, errOut)
+		}
+		if actual := strings.TrimSpace(out); actual != expected {
+			t.Errorf("keyset mismatch for %s: want %q got %q", k, expected, actual)
+		}
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Section 3: green tests (expected to pass on current main; upstream-PR fodder)
+// ---------------------------------------------------------------------------
+
+// TestStressBringUp creates one cluster per size (1, 3 and 7 members) in its own
+// subtest, waits for full health via timeToHealthy (which logs the elapsed
+// time), then checks hashkv agreement before and after a 10-key round trip.
+func TestStressBringUp(t *testing.T) {
+	cfg := testEnv.EnvConf()
+	ctx := context.Background()
+
+	for _, members := range []int{1, 3, 7} {
+		members := members
+		t.Run(fmt.Sprintf("size-%d", members), func(t *testing.T) {
+			cluster := fmt.Sprintf("etcd-stress-bringup-%d", members)
+			defer stressTeardown(ctx, t, cfg, cluster)
+
+			timeToHealthy(ctx, t, cfg, cluster, members, healthyTimeout(members))
+
+			firstPod := fmt.Sprintf("%s-0", cluster)
+			assertHashKVConsistent(t, cfg, firstPod)
+
+			writeKeyset(t, cfg, cluster, 10)
+			assertKeyset(t, cfg, cluster, 10)
+			assertHashKVConsistent(t, cfg, firstPod)
+		})
+	}
+}
+
+// TestStressScaleChurn drives 1 -> 3 -> 7 -> 3 -> 1 with a quorum watcher
+// running throughout. After every step it asserts steady-state membership (no
+// stuck learners), hashkv consistency, and that the keyset written up-front is
+// still intact.
+func TestStressScaleChurn(t *testing.T) {
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-churn"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+
+	// Start at 1 and seed the keyset.
+	timeToHealthy(ctx, t, cfg, name, 1, healthyTimeout(1))
+	writeKeyset(t, cfg, name, 10)
+
+	stop := quorumWatcher(ctx, t, cfg, podName)
+	defer stop()
+
+	for _, size := range []int{3, 7, 3, 1} {
+		t.Logf("scaling cluster %q to %d", name, size)
+		scaleEtcdCluster(ctx, t, cfg, name, size)
+		waitForSTSReadiness(t, cfg, name, size)
+		waitForClusterHealthy(t, cfg, podName, size, healthyTimeout(size))
+
+		// One-member-at-a-time progression: by the time we are healthy there
+		// must be no learners and exactly `size` voting members.
+		ml := getEtcdMemberListPB(t, cfg, podName)
+		if len(ml.Members) != size {
+			t.Errorf("after scaling to %d, member list has %d members", size, len(ml.Members))
+		}
+		for _, m := range ml.Members {
+			if m.IsLearner {
+				t.Errorf("after scaling to %d, found stuck learner %s (%d)", size, m.Name, m.ID)
+			}
+		}
+
+		assertHashKVConsistent(t, cfg, podName)
+		assertKeyset(t, cfg, name, 10)
+	}
+}
+
+// TestStressSingleEditJump performs a single 1 -> 7 edit and asserts the
+// operator never admits more than one learner at a time while converging.
+func TestStressSingleEditJump(t *testing.T) {
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-edit-jump"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+
+	timeToHealthy(ctx, t, cfg, name, 1, healthyTimeout(1))
+	writeKeyset(t, cfg, name, 5)
+
+	// Sample the learner count while converging: members are expected to be
+	// added one at a time (promote, then add the next), so it must stay <= 1.
+	wctx, cancel := context.WithCancel(ctx)
+	defer cancel() // stops the sampler even when a helper below exits the test via t.Fatalf
+	maxLearners := startLearnerSampler(wctx, t, cfg, podName)
+
+	scaleEtcdCluster(ctx, t, cfg, name, 7)
+	waitForSTSReadiness(t, cfg, name, 7)
+	waitForClusterHealthy(t, cfg, podName, 7, healthyTimeout(7))
+
+	cancel()
+	if got := maxLearners(); got > 1 {
+		t.Errorf("expected at most 1 concurrent learner during 1->7 jump, observed %d", got)
+	}
+
+	assertHashKVConsistent(t, cfg, podName)
+	assertKeyset(t, cfg, name, 5)
+}
+
+// startLearnerSampler polls the member list every 2s and tracks the largest
+// learner count seen until ctx is cancelled. The returned func blocks until the
+// sampler stops and yields that maximum; call it once, and only after cancelling ctx.
+func startLearnerSampler(ctx context.Context, t *testing.T, c *envconf.Config, podName string) func() int {
+	t.Helper()
+	done := make(chan int, 1)
+	go func() {
+		maxSeen := 0 // not named "max", which would shadow the builtin
+		ticker := time.NewTicker(2 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				done <- maxSeen
+				return
+			case <-ticker.C:
+				ml := getEtcdMemberListPB(t, c, podName) // NOTE(review): if this helper can t.Fatalf, that is invalid off the test goroutine - confirm
+				learners := 0
+				for _, m := range ml.Members {
+					if m.IsLearner {
+						learners++
+					}
+				}
+				if learners > maxSeen {
+					maxSeen = learners
+				}
+			}
+		}
+	}()
+	return func() int { return <-done }
+}
+
+// TestStressCrashDuringScale arms the member-add / member-delete failpoints
+// during 3 -> 7 and 7 -> 3 respectively, beyond the size-3 coverage in
+// TestScaling, and asserts the operator recovers and converges.
+func TestStressCrashDuringScale(t *testing.T) {
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-crash-scale"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+
+	timeToHealthy(ctx, t, cfg, name, 3, healthyTimeout(3))
+	writeKeyset(t, cfg, name, 5)
+
+	// 3 -> 7 with the operator panicking right after each member add.
+	withFailpoint(t, cfg, "exceptionAfterMemberAdd", "panic", func() {
+		scaleEtcdCluster(ctx, t, cfg, name, 7)
+		waitForSTSReadiness(t, cfg, name, 7)
+		waitForClusterHealthy(t, cfg, podName, 7, healthyTimeout(7))
+	})
+	assertNoLearnersAt(t, cfg, podName, 7)
+	assertHashKVConsistent(t, cfg, podName)
+
+	// 7 -> 3 with the operator panicking right after each member delete.
+	withFailpoint(t, cfg, "exceptionAfterMemberDelete", "panic", func() {
+		scaleEtcdCluster(ctx, t, cfg, name, 3)
+		waitForSTSReadiness(t, cfg, name, 3)
+		waitForClusterHealthy(t, cfg, podName, 3, healthyTimeout(3))
+	})
+	assertNoLearnersAt(t, cfg, podName, 3)
+	assertHashKVConsistent(t, cfg, podName)
+	assertKeyset(t, cfg, name, 5)
+}
+
+// assertNoLearnersAt verifies exactly `size` voting members with no learners.
+func assertNoLearnersAt(t *testing.T, c *envconf.Config, podName string, size int) {
+	t.Helper()
+	ml := getEtcdMemberListPB(t, c, podName)
+	if len(ml.Members) != size {
+		t.Errorf("expected %d members, got %d", size, len(ml.Members))
+	}
+	for _, m := range ml.Members {
+		if m.IsLearner {
+			t.Errorf("found stuck learner %s (%d) at size %d", m.Name, m.ID, size)
+		}
+	}
+}
+
+// withFailpoint enables a gofail failpoint on the operator pod, runs fn, and
+// always disables the failpoint afterward (mirrors the Setup/Teardown pattern
+// in TestScaling).
+func withFailpoint(t *testing.T, c *envconf.Config, failpoint, term string, fn func()) {
+	t.Helper()
+	operator, err := getEtcdOperatorPod(t, c.Client())
+	if err != nil {
+		t.Fatalf("unable to get etcd-operator pod: %v", err)
+	}
+	if err := enableGoFailPoint(t, c, operator, failpoint, term); err != nil {
+		t.Fatalf("unable to enable failpoint %s: %v", failpoint, err)
+	}
+	defer func() {
+		// Re-fetch the pod: a panic failpoint restarts the operator, so the
+		// pod object captured above may be stale.
+		op, err := getEtcdOperatorPod(t, c.Client())
+		if err != nil {
+			t.Errorf("unable to get etcd-operator pod for failpoint cleanup: %v", err)
+			return
+		}
+		if err := disableGoFailPoint(t, c, op, failpoint); err != nil {
+			t.Errorf("unable to disable failpoint %s: %v", failpoint, err)
+		}
+	}()
+	fn()
+}
+
+// TestStressPodRecoveryAtScale deletes a member pod at size 7 and asserts the
+// member ID is stable and data is replicated to the recovered pod (extends the
+// size-3 TestPodRecovery).
+func TestStressPodRecoveryAtScale(t *testing.T) {
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-recovery-7"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+	targetPodName := fmt.Sprintf("%s-4", name)
+
+	timeToHealthy(ctx, t, cfg, name, 7, healthyTimeout(7))
+
+	verifyPodUsesPVC(t, cfg, targetPodName, "etcd-data-"+name)
+
+	initialMembers := getEtcdMembersName2IDMapping(t, cfg, podName)
+	wantID, ok := initialMembers[targetPodName]
+	if !ok {
+		t.Fatalf("member %s not present before deletion: %v", targetPodName, initialMembers)
+	}
+
+	var pod corev1.Pod
+	if err := cfg.Client().Resources().Get(ctx, targetPodName, namespace, &pod); err != nil {
+		t.Fatalf("failed to get pod %s: %v", targetPodName, err)
+	}
+	deletedUID := pod.UID
+	if err := cfg.Client().Resources().Delete(ctx, &pod); err != nil {
+		t.Fatalf("failed to delete pod %s: %v", targetPodName, err)
+	}
+
+	// Wait for the pod to be recreated with a fresh UID and running.
+	if err := wait.For(func(ctx context.Context) (bool, error) {
+		var p corev1.Pod
+		if err := cfg.Client().Resources().Get(ctx, targetPodName, namespace, &p); err != nil {
+			return false, nil
+		}
+		return p.UID != deletedUID && p.Status.Phase == corev1.PodRunning, nil
+	}, wait.WithTimeout(5*time.Minute), wait.WithInterval(10*time.Second)); err != nil {
+		t.Fatalf("pod %s failed to be recreated: %v", targetPodName, err)
+	}
+
+	waitForClusterHealthy(t, cfg, podName, 7, healthyTimeout(7))
+
+	verifyPodUsesPVC(t, cfg, targetPodName, "etcd-data-"+name)
+
+	finalMembers := getEtcdMembersName2IDMapping(t, cfg, podName)
+	if len(finalMembers) != 7 {
+		t.Errorf("member count changed after recovery: want 7, got %d", len(finalMembers))
+	}
+	if gotID, ok := finalMembers[targetPodName]; !ok {
+		t.Errorf("member %s missing after recovery", targetPodName)
+	} else if gotID != wantID {
+		t.Errorf("member ID for %s changed across recovery: want %d got %d", targetPodName, wantID, gotID)
+	}
+
+	// Data written through verifyDataOperations must be readable from the recovered pod.
+	verifyDataOperations(t, cfg, name, "recovery-at-scale", "value")
+	stdout, stderr, err := execInPod(t, cfg, targetPodName, namespace,
+		[]string{"etcdctl", "get", "recovery-at-scale", "--print-value-only"})
+	if err != nil {
+		t.Fatalf("failed to read replicated data from %s: %v, stderr: %s", targetPodName, err, stderr)
+	}
+	if got := strings.TrimSpace(stdout); got != "value" {
+		t.Errorf("replication to recovered pod %s failed: want %q got %q", targetPodName, "value", got)
+	}
+	assertHashKVConsistent(t, cfg, podName)
+}
+
+// ---------------------------------------------------------------------------
+// Section 4: bug-proofs (t.Skip-gated; flip to passing alongside their fix PR)
+// ---------------------------------------------------------------------------
+
+// TestStressVersionUpgrade proves the silent no-op upgrade: bumping
+// .spec.version must change the StatefulSet image and roll the pods while
+// keeping quorum. Skipped until the version-upgrade path lands.
+func TestStressVersionUpgrade(t *testing.T) {
+	t.Skip("unblocks after PR17")
+
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-upgrade"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+
+	// Create at a known patch version, then bump the patch up.
+	const fromVersion = "v3.6.1"
+	const toVersion = "v3.6.4"
+
+	etcdCluster := &ecv1alpha1.EtcdCluster{
+		ObjectMeta: metaObjectMeta(name),
+		Spec: ecv1alpha1.EtcdClusterSpec{
+			Size:        3,
+			Version:     fromVersion,
+			StorageSpec: tinyStorage(ctx, t, cfg),
+		},
+	}
+	if err := cfg.Client().Resources().Create(ctx, etcdCluster); err != nil {
+		t.Fatalf("failed to create cluster at %s: %v", fromVersion, err)
+	}
+	waitForSTSReadiness(t, cfg, name, 3)
+	waitForClusterHealthy(t, cfg, podName, 3, healthyTimeout(3))
+
+	imageBefore := stsContainerImage(ctx, t, cfg, name)
+	if !strings.HasSuffix(imageBefore, fromVersion) {
+		t.Fatalf("expected initial image to end with %s, got %q", fromVersion, imageBefore)
+	}
+
+	// Bump the version and watch quorum throughout the rolling restart.
+	stop := quorumWatcher(ctx, t, cfg, podName)
+	defer stop() // deferred (as in TestStressScaleChurn) so the watcher is stopped even on t.Fatalf
+	var ec ecv1alpha1.EtcdCluster
+	if err := cfg.Client().Resources().Get(ctx, name, namespace, &ec); err != nil {
+		t.Fatalf("failed to get cluster for upgrade: %v", err)
+	}
+	ec.Spec.Version = toVersion
+	if err := cfg.Client().Resources().Update(ctx, &ec); err != nil {
+		t.Fatalf("failed to update version to %s: %v", toVersion, err)
+	}
+
+	// The STS image must actually change.
+	if err := wait.For(func(ctx context.Context) (bool, error) {
+		return strings.HasSuffix(stsContainerImage(ctx, t, cfg, name), toVersion), nil
+	}, wait.WithTimeout(healthyTimeout(3)), wait.WithInterval(10*time.Second)); err != nil {
+		t.Fatalf("STS image never updated to %s (silent no-op upgrade?): %v", toVersion, err)
+	}
+
+	waitForSTSReadiness(t, cfg, name, 3)
+	waitForClusterHealthy(t, cfg, podName, 3, healthyTimeout(3))
+
+	assertHashKVConsistent(t, cfg, podName)
+}
+
+// TestStressEvenSizeRejected proves even sizes (2, 4) must be rejected at apply
+// time once the odd-size CEL validation lands.
+func TestStressEvenSizeRejected(t *testing.T) {
+	t.Skip("unblocks after PR14 odd-size CEL")
+
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	for _, size := range []int{2, 4} {
+		size := size
+		t.Run(fmt.Sprintf("size-%d", size), func(t *testing.T) {
+			name := fmt.Sprintf("etcd-stress-even-%d", size)
+
+			etcdCluster := &ecv1alpha1.EtcdCluster{
+				ObjectMeta: metaObjectMeta(name),
+				Spec: ecv1alpha1.EtcdClusterSpec{
+					Size:    size,
+					Version: etcdVersion,
+				},
+			}
+			err := cfg.Client().Resources().Create(ctx, etcdCluster)
+			if err == nil {
+				defer stressTeardown(ctx, t, cfg, name) // clean up only when a cluster was actually created
+				t.Fatalf("expected even size %d to be rejected at apply, but create succeeded", size)
+			}
+		})
+	}
+}
+
+// TestStressLeaderlessScaleIn proves that removing the member that currently
+// holds leadership does not cause a prolonged write-stall / election storm.
+// Skipped until the leadership-transfer-before-removal fix lands.
+func TestStressLeaderlessScaleIn(t *testing.T) {
+	t.Skip("unblocks after PR04+PR12")
+
+	ctx := context.Background()
+	cfg := testEnv.EnvConf()
+
+	name := "etcd-stress-leaderless"
+	defer stressTeardown(ctx, t, cfg, name)
+	podName := fmt.Sprintf("%s-0", name)
+
+	timeToHealthy(ctx, t, cfg, name, 3, healthyTimeout(3))
+	writeKeyset(t, cfg, name, 5)
+
+	// Identify the leader's ordinal. Scaling in is expected to remove the
+	// highest ordinals first, so the run is only meaningful when the leader
+	// sits on the highest-ordinal pod; any other placement skips the test.
+	_, leaderOrdinal := getEtcdLeader(t, cfg, podName)
+	if leaderOrdinal != 2 {
+		t.Skipf("leader is on ordinal %d, not the highest (2); rerun to exercise leader removal", leaderOrdinal)
+	}
+
+	stop := quorumWatcher(ctx, t, cfg, podName)
+	defer stop() // reports any sustained quorum-loss window, and still runs if a helper below calls t.Fatalf
+	scaleEtcdCluster(ctx, t, cfg, name, 1)
+	waitForSTSReadiness(t, cfg, name, 1)
+	waitForClusterHealthy(t, cfg, podName, 1, healthyTimeout(1))
+
+	assertKeyset(t, cfg, name, 5)
+}
+
+// --- small shared builders used by the Section 4 tests ---
+
+func metaObjectMeta(name string) metav1.ObjectMeta {
+	return metav1.ObjectMeta{
+		Name:      name,
+		Namespace: namespace,
+	}
+}
+
+// tinyStorage builds the same 64Mi PVC spec createEtcdClusterWithPVC uses, for
+// tests that need to set fields (e.g. Version) the helper does not expose.
+func tinyStorage(ctx context.Context, t *testing.T, c *envconf.Config) *ecv1alpha1.StorageSpec {
+	t.Helper()
+	return &ecv1alpha1.StorageSpec{
+		AccessModes:       corev1.ReadWriteOnce,
+		StorageClassName:  getAvailableStorageClass(ctx, t, c),
+		VolumeSizeRequest: resource.MustParse("64Mi"),
+		VolumeSizeLimit:   resource.MustParse("64Mi"),
+	}
+}
+
+// stsContainerImage returns the etcd container image set on the cluster's STS.
+func stsContainerImage(ctx context.Context, t *testing.T, c *envconf.Config, name string) string {
+	t.Helper()
+	var sts appsv1.StatefulSet
+	if err := c.Client().Resources().Get(ctx, name, namespace, &sts); err != nil {
+		t.Fatalf("failed to get STS %s: %v", name, err)
+	}
+	if len(sts.Spec.Template.Spec.Containers) == 0 {
+		t.Fatalf("STS %s has no containers", name)
+	}
+	return sts.Spec.Template.Spec.Containers[0].Image // assumes etcd is the first container - TODO confirm against the operator's STS template
+}
From f76dbb277956595efd6f5bc66d9c2669f2d67b52 Mon Sep 17 00:00:00 2001
From: Xavier Lange
Date: Wed, 17 Jun 2026 14:03:30 -0400
Subject: [PATCH 2/2] test(e2e): make quorumWatcher measure quorum, not all-endpoint health

quorumWatcher polled `etcdctl endpoint health --cluster`, which reports
health for every member in pod-0's member list. During scale churn a
member that is mid-join (a not-yet-serving learner) or mid-removal
transiently reports unhealthy under --cluster even though quorum is fully
intact. That tripped the watcher's 3-consecutive-bad-poll threshold and
failed TestStressScaleChurn with a phantom "1 sustained quorum-loss
window" (13 unhealthy polls), despite every scale step converging and the
keyset staying intact -- i.e. quorum was never actually lost.

Switch the watcher to a dedicated endpointHealthQuorum check that runs
`etcdctl endpoint health` against pod-0's *local* endpoint only. A healthy
result there means etcd committed a proposal through Raft, which requires
quorum -- the true write-stall / quorum-loss signal -- and is immune to
transient joining/leaving members. endpointHealthAllHealthy is left
unchanged for waitForClusterHealthy, where all-endpoints-healthy is the
correct convergence gate.

With the fix TestStressScaleChurn passes with 0 unhealthy polls.
Co-Authored-By: Claude Opus 4.8
Signed-off-by: Xavier Lange
---
 test/e2e/stress_helpers_test.go | 48 ++++++++++++++++++++++++++++-----
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/test/e2e/stress_helpers_test.go b/test/e2e/stress_helpers_test.go
index 4ddd1d1a..2baabc1a 100644
--- a/test/e2e/stress_helpers_test.go
+++ b/test/e2e/stress_helpers_test.go
@@ -64,6 +64,37 @@ func endpointHealthAllHealthy(t *testing.T, c *envconf.Config, podName string) (
 	return healthy, total, total > 0 && healthy == total
 }
 
+// endpointHealthQuorum reports whether the cluster reachable through podName
+// currently has quorum, by running `etcdctl endpoint health` against podName's
+// *local* endpoint only (no --cluster). A healthy result means etcd committed a
+// proposal through Raft, which requires a quorum of voting members; so this is a
+// true quorum signal.
+//
+// This is deliberately NOT the same as endpointHealthAllHealthy (which uses
+// --cluster and requires *every* member to be healthy). During scale churn a
+// member that is mid-join (a not-yet-serving learner) or mid-removal will
+// transiently report unhealthy under --cluster even though quorum is fully
+// intact; counting those as quorum loss is a false positive. The quorum watcher
+// must only flag genuine inability to commit, which is exactly what the local
+// endpoint health check measures.
+func endpointHealthQuorum(t *testing.T, c *envconf.Config, podName string) bool {
+	t.Helper()
+	cmd := []string{"etcdctl", "endpoint", "health"}
+	stdout, _, err := execInPod(t, c, podName, namespace, cmd)
+	if err != nil {
+		// Any exec/etcdctl error is reported as (transiently) no-quorum. It may
+		// also just mean the pod or exec channel was unavailable; either way the
+		// watcher's consecutive-streak tolerance is what smooths it out.
+		return false
+	}
+	for _, line := range strings.Split(strings.TrimSpace(stdout), "\n") {
+		if strings.Contains(line, "is healthy") {
+			return true
+		}
+	}
+	return false
+}
+
 // waitForClusterHealthy blocks until the cluster reachable through podName has
 // exactly `size` voting members (no learners) AND every endpoint reports
 // healthy, or the timeout elapses.
@@ -178,11 +209,16 @@ func assertHashKVConsistent(t *testing.T, c *envconf.Config, podName string) {
 	}
 }
 
-// quorumWatcher polls endpoint health every ~2s in the background and records
-// any window during which the cluster was not fully healthy (a proxy for
-// quorum-loss / write-stall). Call the returned stop func when the churn under
-// test is complete; it stops the watcher and fails the test if any unhealthy
-// window was observed.
+// quorumWatcher polls quorum health every ~2s in the background and records any
+// window during which the cluster could not commit through Raft (i.e. lost
+// quorum / write-stalled). Call the returned stop func when the churn under test
+// is complete; it stops the watcher and fails the test if any such window was
+// observed.
+//
+// It checks quorum via podName's local endpoint (endpointHealthQuorum), NOT
+// whole-cluster all-healthy: during scale churn a mid-join learner or a member
+// being removed transiently reports unhealthy under --cluster while quorum is
+// fully intact, and counting that as quorum loss is a false positive.
 //
 // The watcher tolerates a single transient unhealthy poll (one pod momentarily
 // restarting is normal during scale steps); it only fails on a *sustained*
@@ -216,7 +252,7 @@ func quorumWatcher(ctx context.Context, t *testing.T, c *envconf.Config, podName
 			case <-wctx.Done():
 				return
 			case <-ticker.C:
-				_, _, ok := endpointHealthAllHealthy(t, c, podName)
+				ok := endpointHealthQuorum(t, c, podName)
 				mu.Lock()
 				if ok {
 					streak = 0