Skip to content

Commit 6a5e493

Browse files
authored
fix(raft): bump snapshot spool cap to 16 GiB + env override (#746)
## Summary Receive-side snapshot spool was hardcoded to 1 GiB. Production FSM snapshots at 1.35 GiB exceeded that ceiling: `snapshotSpool.Write` returned `errSnapshotPayloadTooLarge` mid-stream, the gRPC `SendSnapshot` stream broke, and etcd raft retried the snapshot indefinitely. This PR raises the default to 16 GiB and adds an `ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES` env override. ## Production incident — 2026-05-08 Two followers (192.168.0.211 and 192.168.0.213) fell behind the leader's log during an earlier OOM cascade. The leader truncated past their match indices, so catch-up required a full FSM snapshot. Each transfer attempt: 1. Leader streams 1.35 GiB FSM via `streamFSMSnapshot` (no send-side cap) 2. Receiver writes chunks into `snapshotSpool` 3. At ~1 GiB the spool returns `errSnapshotPayloadTooLarge` 4. Receive returns error → gRPC stream closed → leader sees EOF 5. etcd raft fires `Progress.PendingSnapshot` retry → loop Symptoms observed: - Follower 213 frozen at `applied=26,459,962` (over 1.16M entries behind, never moved for 4+ hours) - Leader 210 sustained ~100 MB/s outbound for hours - Host disks at 73-99% util, ~125 MB/s sustained - Container 211 receive dir contained `elastickv-etcd-snapshot-<random>` files whose IDs changed every probe — visual confirmation of the receive-then-discard loop - Goroutine 1573 on leader stuck in `streamFSMSnapshot` → `sendSnapshotChunk` → gRPC `writeQuota.get` (HTTP/2 flow-control), waiting for receiver acks that never came because the receive had already errored out Cluster impact: 4/5 voters caught up was sufficient for write quorum, so the cluster stayed up; but two followers were perpetually stale and the leader's CPU + disk were burned on the futile retries. ## Fix ```go const defaultMaxSnapshotPayloadBytes int64 = 16 << 30 // 16 GiB ``` - **16 GiB** is sized as ~12× the production-observed FSM size, well past the runway. - **Per-spool capture**: `maxSize` is resolved at `newSnapshotSpool` time and read-only thereafter, so a test (or future env flip) cannot tear an in-flight receive. - **`ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES`** env override for operators on extreme-data deployments. Invalid values fall back to the default with a `slog.Warn` (fail-soft so a typo doesn't zero the cap and break every receive). The cap still exists — defense against a misbehaving / compromised peer streaming unbounded data into the spool dir survives — but at a magnitude that is realistic. ## Self-review (5 lenses) 1. **Data loss** — none. The cap was rejecting valid snapshots; raising it lets receivers accept FSM transfers they should already have been accepting. No persisted state changes. 2. **Concurrency** — `maxSize` captured at construction, read-only thereafter. No new locks. The env resolver is plain `os.Getenv` + `ParseInt`; no shared state. 3. **Performance** — one `Getenv` + `ParseInt` per snapshot creation. Snapshots are infrequent (hours-scale on a stable cluster), so negligible. The 16 GiB default does NOT pre-allocate; the spool grows on disk only as bytes arrive. 4. **Data consistency** — snapshot integrity unchanged. The fix only widens the reception envelope; the same chunk-validation, metadata, and final-flag handling apply. 5. **Test coverage**: - `TestSnapshotSpool_DefaultCapAcceptsRealisticFSM` writes 1.5 GiB through `Write` (skipped under `-short` to keep `make test` fast). - `TestSnapshotSpool_OverrideViaEnv` exercises a lowered-cap value to confirm the env knob actually moves the cap and the `errSnapshotPayloadTooLarge` sentinel still surfaces past it. - `TestSnapshotSpool_OverrideInvalidFallsBack` pins fail-soft on malformed env input so a typo doesn't zero the cap. ## Test plan - [x] `go test -race -count=1 -short ./internal/raftengine/etcd` — 11.4s, all green - [x] `go test -race -count=1 -run TestSnapshotSpool_DefaultCapAcceptsRealisticFSM ./internal/raftengine/etcd` — 1.96s, green (1.5 GiB write succeeds) - [ ] After merge: deploy to 192.168.0.x cluster, verify 213 receives a fresh snapshot and `applied_index` advances to match the leader ## Follow-up (separate PRs) - `snapshotSpool.Bytes()` materializes the entire payload as `[]byte` for `RawNode.Step`. With 16 GiB allowed this is a real OOM risk on memory-constrained nodes. Streaming snapshot apply (the FSM-side path bypassing `raftpb` materialization) is the next step. - Make the leader respect a follower-advertised receive cap so a cluster running mixed binaries can negotiate a safe value. - 211/213 formal recovery: now that this PR unblocks snapshot completion, plan the operational steps to re-add 211 (currently stopped, data wiped) via a Learner path. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Snapshot payload size limit is now configurable via `ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES` environment variable (default: 16 GiB). * Invalid environment values gracefully fall back to default configuration. * **Bug Fixes** * Enhanced error messages when snapshots exceed limits, displaying requested size versus configured limit. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 5f12d5d + 903a5f7 commit 6a5e493

2 files changed

Lines changed: 150 additions & 18 deletions

File tree

internal/raftengine/etcd/snapshot_spool.go

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,82 @@ package etcd
22

33
import (
44
"io"
5+
"log/slog"
56
"os"
67
"path/filepath"
8+
"strconv"
9+
"strings"
710

811
"github.com/cockroachdb/errors"
912
)
1013

11-
var (
12-
// The current raftpb snapshot APIs still materialize payloads as []byte, so
13-
// the prototype cannot stream snapshots end-to-end yet. Keep the payload on
14-
// disk while assembling it and fail fast before unbounded growth.
15-
maxSnapshotPayloadBytes int64 = 1 << 30 // 1 GiB
14+
// defaultMaxSnapshotPayloadBytes is the receive-side cap on a single snapshot
15+
// stream's spooled payload. Production hit a 1 GiB ceiling here that was
16+
// silently rejecting real-world FSM transfers (1.35 GiB+), so the receiver
17+
// returned errSnapshotPayloadTooLarge mid-stream, the gRPC stream broke,
18+
// and etcd raft retried — indefinitely, since each retry hit the same wall.
19+
// Followers stuck at stale applied indices, leader sustained ~100 MB/s
20+
// outbound, host disks saturated for hours.
21+
//
22+
// 16 GiB is sized as ~12× the production-observed FSM size so the limit
23+
// does not drift back into the runway as data grows. The cap still exists
24+
// so a misbehaving / compromised peer cannot stream unbounded data into
25+
// the spool dir; operators can raise it further via
26+
// ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES if a real FSM ever exceeds
27+
// even this default.
28+
const defaultMaxSnapshotPayloadBytes int64 = 16 << 30 // 16 GiB
1629

17-
errSnapshotPayloadTooLarge = errors.New("etcd raft snapshot payload exceeds limit")
18-
)
30+
const maxSnapshotPayloadBytesEnvVar = "ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES"
31+
32+
// resolveMaxSnapshotPayloadBytes evaluates the env override once per spool
33+
// creation. Snapshots are infrequent enough that one Getenv + ParseInt per
34+
// spool is invisible in profiles, and resolving at construction means tests
35+
// that flip the env via t.Setenv don't have to mutate process-wide globals.
36+
func resolveMaxSnapshotPayloadBytes() int64 {
37+
v := strings.TrimSpace(os.Getenv(maxSnapshotPayloadBytesEnvVar))
38+
if v == "" {
39+
return defaultMaxSnapshotPayloadBytes
40+
}
41+
n, err := strconv.ParseInt(v, 10, 64)
42+
if err != nil || n <= 0 {
43+
slog.Warn("invalid ELASTICKV_RAFT_MAX_SNAPSHOT_PAYLOAD_BYTES; using default",
44+
"value", v, "default_bytes", defaultMaxSnapshotPayloadBytes)
45+
return defaultMaxSnapshotPayloadBytes
46+
}
47+
return n
48+
}
49+
50+
var errSnapshotPayloadTooLarge = errors.New("etcd raft snapshot payload exceeds limit")
1951

2052
const snapshotSpoolPattern = "elastickv-etcd-snapshot-*"
2153

2254
type snapshotSpool struct {
23-
file *os.File
24-
path string
25-
size int64
55+
file *os.File
56+
path string
57+
size int64
58+
maxSize int64
2659
}
2760

2861
func newSnapshotSpool(dir string) (*snapshotSpool, error) {
2962
file, err := os.CreateTemp(dir, snapshotSpoolPattern)
3063
if err != nil {
3164
return nil, errors.WithStack(err)
3265
}
33-
return &snapshotSpool{file: file, path: file.Name()}, nil
66+
return &snapshotSpool{
67+
file: file,
68+
path: file.Name(),
69+
maxSize: resolveMaxSnapshotPayloadBytes(),
70+
}, nil
3471
}
3572

3673
func (s *snapshotSpool) Write(p []byte) (int, error) {
37-
if int64(len(p))+s.size > maxSnapshotPayloadBytes {
38-
return 0, errors.Wrapf(errSnapshotPayloadTooLarge, "%d > %d", int64(len(p))+s.size, maxSnapshotPayloadBytes)
74+
// Subtraction-based comparison so the cap check stays correct even when
75+
// s.maxSize is set to a value near math.MaxInt64 via the env override:
76+
// `int64(len(p))+s.size > s.maxSize` would overflow into a negative number
77+
// at large maxSize and let the write through. `int64(len(p)) > s.maxSize-s.size`
78+
// stays in [0, maxSize] and rejects the same payloads correctly.
79+
if int64(len(p)) > s.maxSize-s.size {
80+
return 0, errors.Wrapf(errSnapshotPayloadTooLarge, "adding %d bytes to current %d would exceed limit %d", len(p), s.size, s.maxSize)
3981
}
4082
n, err := s.file.Write(p)
4183
s.size += int64(n)
@@ -49,13 +91,17 @@ func (s *snapshotSpool) Bytes() ([]byte, error) {
4991
if _, err := s.file.Seek(0, io.SeekStart); err != nil {
5092
return nil, errors.WithStack(err)
5193
}
52-
// Read incrementally instead of sizing a buffer from s.size so malformed
53-
// inputs stay bounded by maxSnapshotPayloadBytes and file-backed I/O.
54-
data, err := io.ReadAll(s.file)
55-
if err != nil {
94+
// Pre-allocate from the bytes we have already accepted past Write's
95+
// per-call cap check, instead of letting io.ReadAll grow the buffer
96+
// through several power-of-two doublings (a 1.35 GiB receive would
97+
// trigger ~30 reallocs and copy the running total each time). s.size
98+
// is the truth-of-record for what's on disk because Write only
99+
// increments it on successful os.File.Write returns.
100+
buf := make([]byte, s.size)
101+
if _, err := io.ReadFull(s.file, buf); err != nil {
56102
return nil, errors.WithStack(err)
57103
}
58-
return data, nil
104+
return buf, nil
59105
}
60106

61107
func (s *snapshotSpool) Reader() (io.Reader, error) {

internal/raftengine/etcd/snapshot_spool_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,100 @@
11
package etcd
22

33
import (
4+
"bytes"
45
"fmt"
56
"os"
67
"path/filepath"
8+
"strconv"
79
"testing"
810

11+
"github.com/cockroachdb/errors"
912
"github.com/stretchr/testify/require"
1013
)
1114

15+
// TestSnapshotSpool_DefaultCapAcceptsRealisticFSM pins the regression behind
16+
// the 2026-05-08 incident: with the prior 1 GiB hardcoded cap, any real-world
17+
// FSM (production observed 1.35 GiB) failed mid-stream with
18+
// errSnapshotPayloadTooLarge, breaking the gRPC snapshot stream and locking
19+
// the leader/follower into a retransmit loop. The default cap must accept at
20+
// least 1.5 GiB without env override.
21+
func TestSnapshotSpool_DefaultCapAcceptsRealisticFSM(t *testing.T) {
22+
if testing.Short() {
23+
t.Skip("skipping: writes 1.5 GiB to a temp file")
24+
}
25+
dir := t.TempDir()
26+
spool, err := newSnapshotSpool(dir)
27+
require.NoError(t, err)
28+
t.Cleanup(func() { _ = spool.Close() })
29+
30+
// 1.5 GiB exceeds the legacy 1 GiB ceiling and matches realistic
31+
// production FSM sizes within the same order of magnitude.
32+
const target = int64(1536) << 20 // 1.5 GiB
33+
const chunk = 8 << 20 // 8 MiB writes mirror the gRPC snapshot chunk size order
34+
buf := bytes.Repeat([]byte{0xAB}, chunk)
35+
36+
var written int64
37+
for written < target {
38+
toWrite := chunk
39+
if remaining := target - written; remaining < int64(chunk) {
40+
toWrite = int(remaining)
41+
}
42+
n, err := spool.Write(buf[:toWrite])
43+
require.NoError(t, err, "write at offset %d unexpectedly failed", written)
44+
require.Equal(t, toWrite, n)
45+
written += int64(n)
46+
}
47+
require.Equal(t, target, spool.size)
48+
49+
// Round-trip through the materialization path (the io.ReadAll →
50+
// io.ReadFull refactor) to lock down behaviour at 1.5 GiB. The
51+
// returned slice MUST match s.size exactly: a short read here would
52+
// indicate the pre-allocation drifted out of sync with what Write
53+
// actually persisted to the spool file.
54+
got, err := spool.Bytes()
55+
require.NoError(t, err)
56+
require.Equal(t, int(target), len(got), "Bytes() returned %d, want %d", len(got), target)
57+
// Spot-check first/last bytes match the 0xAB fill; full byte-equality
58+
// would double the test's memory cost without adding signal.
59+
require.Equal(t, byte(0xAB), got[0])
60+
require.Equal(t, byte(0xAB), got[len(got)-1])
61+
}
62+
63+
// TestSnapshotSpool_OverrideViaEnv confirms the env knob actually moves the
64+
// cap. Tests deliberately *lower* it (cheap to write past) instead of
65+
// raising — the upper-bound test above already proves a generous cap works.
66+
func TestSnapshotSpool_OverrideViaEnv(t *testing.T) {
67+
const spoolCap = int64(4096)
68+
t.Setenv(maxSnapshotPayloadBytesEnvVar, strconv.FormatInt(spoolCap, 10))
69+
70+
spool, err := newSnapshotSpool(t.TempDir())
71+
require.NoError(t, err)
72+
t.Cleanup(func() { _ = spool.Close() })
73+
74+
require.Equal(t, spoolCap, spool.maxSize)
75+
76+
// Write up to the cap — succeeds.
77+
_, err = spool.Write(bytes.Repeat([]byte{0x01}, int(spoolCap)))
78+
require.NoError(t, err)
79+
80+
// One byte past — fails with the documented sentinel so callers can
81+
// errors.Is against errSnapshotPayloadTooLarge for telemetry.
82+
_, err = spool.Write([]byte{0x02})
83+
require.Error(t, err)
84+
require.True(t, errors.Is(err, errSnapshotPayloadTooLarge), "got %v", err)
85+
}
86+
87+
// TestSnapshotSpool_OverrideInvalidFallsBack pins the resolver's
88+
// fail-soft behaviour: a malformed env value must NOT zero the cap (which
89+
// would make every receive fail) — it falls back to the default.
90+
func TestSnapshotSpool_OverrideInvalidFallsBack(t *testing.T) {
91+
t.Setenv(maxSnapshotPayloadBytesEnvVar, "not-a-number")
92+
spool, err := newSnapshotSpool(t.TempDir())
93+
require.NoError(t, err)
94+
t.Cleanup(func() { _ = spool.Close() })
95+
require.Equal(t, defaultMaxSnapshotPayloadBytes, spool.maxSize)
96+
}
97+
1298
func TestCleanupStaleSnapshotSpools(t *testing.T) {
1399
dir := t.TempDir()
14100

0 commit comments

Comments
 (0)