Commit 13a1b61

feat(snapshot-skip B2): plumb metaAppliedIndex through raft-Apply + both snapshot persist sites (#915)
## Summary

Implements **Branch 2** of the cold-start snapshot-restore skip optimisation designed in PR #910. After this lands the `metaAppliedIndex` Pebble meta key is durably written on every raft-Apply data mutation AND at every snapshot persist — but the skip gate itself (Branch 3) is NOT yet wired, so behaviour is observationally identical to `main` except for the new meta key in fsm.db.

Branch 2 is meant to soak in production for at least one release before Branch 3 enables the skip; this PR is intentionally a no-op-from-the-outside change with comprehensive plumbing.

## Reading order (6 commits, designed to review one-at-a-time)

| # | commit | scope |
|---|---|---|
| 1 | `2339a6f2` | raftengine: opt-in interfaces (`AppliedIndexReader` / `AppliedIndexWriter`) |
| 2 | `525fc152` | pebbleStore: `metaAppliedIndex` const + `LastAppliedIndex` + `SetDurableAppliedIndex` (with `pebble.Sync` UNCONDITIONALLY) |
| 3 | `aa9b8acc` | `MVCCStore` interface extension: `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads, threading appliedIndex through `applyMutationsWithOpts` + `deletePrefixAtWithOpts` |
| 4 | `7cd72bda` | kvFSM seam wiring: `AppliedIndexReader()` / `SetDurableAppliedIndex()` accessors + all 7 data-Apply leaves switched to `*RaftAt` with `f.pendingApplyIdx` |
| 5 | `f1e8748c` | engine hooks at BOTH snapshot persist sites: `persistCreatedSnapshot` + `e.persistLocalSnapshotPayload` call `SetDurableAppliedIndex` BEFORE `persist.SaveSnap` |
| 6 | `2c42f7d6` | tests (10 new tests across store + engine) |

## Design constraints honoured

All from `docs/design/2026_06_02_idempotent_snapshot_restore.md`:

- **§2 "Why both leaves"**: meta key bundle in BOTH `applyMutationsWithOpts` AND `deletePrefixAtWithOpts` so DEL_PREFIX entries don't silently leave `LastAppliedIndex` behind. Tested by `TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex`.
- **§3 `dbMu.RLock()`**: both `LastAppliedIndex` and `SetDurableAppliedIndex` acquire the read-lock, matching the lock-ordering discipline at `lsm_store.go:153 / :553 / :675`.
- **§4 fallback policy**: `AppliedIndexReader()` returns nil when the store doesn't implement the seam; `LastAppliedIndex` returns `(0, false, nil)` for missing OR truncated meta key. Branch 3 will then fall back to full restore conservatively.
- **§6 `ELASTICKV_FSM_SYNC_MODE=nosync` mode**: `SetDurableAppliedIndex` is **pinned to `pebble.Sync` unconditionally**. Rationale documented at length in the method's doc-comment — once `persist.SaveSnap` returns, WAL compaction discards every log entry ≤ `snap.Metadata.Index`, so there's no source to replay the meta key bump from. +1 fsync per snapshot persist (rare; default `SnapshotCount=10000`). Tested by `TestSetDurableAppliedIndex_UsesPebbleSync`.
- **§6 "HLC lease entries — checkpoint at snapshot persist"**: BOTH `persistCreatedSnapshot` (config snapshots) AND `e.persistLocalSnapshotPayload` (steady-state `SnapshotCount`-triggered hot path) call the hook. Both crash-ordering tested by `TestPersistCreatedSnapshot_*`.
- **§8 compatibility**: `StateMachine.Apply`'s public signature is unchanged. New interfaces are opt-in. Old call sites (`ApplyMutationsRaft` without `*At`) still work, just pass `appliedIndex=0` to opt out of the meta key bump.

## Test results

```
go vet ./...                                → 0 issues
go test ./store/ -short                     → ok 29.4s
go test ./kv/ -short                        → ok 10.4s
go test ./internal/raftengine/... -short    → ok 32.8s
go test ./store/ -run 'TestLastAppliedIndex|TestSetDurable...|TestApply...|TestDelete...' → ok 1.6s
go test ./internal/raftengine/etcd/ -run 'TestRecording|TestPersistCreatedSnapshot_' → ok 0.03s
```

10 new tests added (see commit `2c42f7d6` for the full inventory).

## What this does NOT do

- **Does NOT enable the skip gate.** `restoreSnapshotState` still always restores. Branch 3 wires the `fsmAlreadyAtIndex` check + `applyHeaderStateOnSkip` + the two-phase `SnapshotHeaderApplier` seam.
- **Does NOT change `HEALTH_TIMEOUT_SECONDS=300`.** Branch 4 lowers it once Branch 3 has soaked.
- **Does NOT touch the snapshot-install hot path** (`Engine.applySnapshot`) per Non-Goals in the design.

## Soak plan

Branch 2 should run in production for at least one release before Branch 3 opens. Operators can verify the meta key is being written via:

```bash
# Inspect a pebble fsm.db (read-only)
ldb --db=/var/lib/elastickv/n3/fsm.db get '_meta_applied_index' --hex
# Expected: 8 little-endian bytes equal to the current applied index
```

## Refs

- PR #910 (design) — round 1..7 design history + retraction sections explaining the design constraints this PR honours
- PR #909 — `HEALTH_TIMEOUT_SECONDS` band-aid that this series eventually obviates

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

* **New Features**
  * Durable tracking of Raft-applied indexes to ensure consistent snapshot/save ordering.
* **Bug Fixes**
  * Improved snapshot persistence reliability by pinning durable applied index before snapshot writes.
  * Stronger durability for writes bundled with Raft entry indices, reducing restore/recovery surprises.
* **Tests**
  * Added comprehensive tests covering applied-index ordering, failure handling, and persistence behavior.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 0ed19ec + 3951337 commit 13a1b61

13 files changed

Lines changed: 1127 additions & 49 deletions

docs/design/2026_06_02_idempotent_snapshot_restore.md

Lines changed: 43 additions & 19 deletions
@@ -236,21 +236,37 @@ type AppliedIndexReader interface {
 }
 ```

-`kvFSM` exposes its store through a typed accessor so wal_store.go can
-reach it without importing the concrete pebble type:
+`kvFSM` directly satisfies `raftengine.AppliedIndexReader` by
+forwarding to its underlying store (PR #915 round-5 — round-1 had a
+factory `AppliedIndexReader() AppliedIndexReader` method intended to
+be called through a separate `AppliedIndexReporter` interface, but
+that pattern would require the skip gate to know about the reporter
+shim; the direct interface satisfaction is simpler and a compile-time
+guard catches future signature drift):

 ```go
-type AppliedIndexReporter interface {
-	AppliedIndexReader() AppliedIndexReader
-}
-func (f *kvFSM) AppliedIndexReader() AppliedIndexReader {
-	if r, ok := f.store.(AppliedIndexReader); ok {
-		return r
+// kv/fsm.go
+func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) {
+	r, ok := f.store.(raftengine.AppliedIndexReader)
+	if !ok {
+		return 0, false, nil
 	}
-	return nil
+	idx, present, err := r.LastAppliedIndex()
+	if err != nil {
+		return 0, false, errors.WithStack(err)
+	}
+	return idx, present, nil
 }
+
+// kv/fsm_applied_index_iface_check.go (compile-time guard)
+var _ raftengine.AppliedIndexReader = (*kvFSM)(nil)
+var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil)
 ```

+The compile-time guard means any future rename or signature drift
+fails `go build` immediately — the soak investment is protected at
+the compiler level.
+
 ### 4. Conditional restore (with conservative error fallback)

 ```go
@@ -279,16 +295,20 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir

 // fsmAlreadyAtIndex returns true ONLY when we can prove the FSM is
 // already at or past `want`. Any uncertainty -- FSM doesn't expose
-// the reporter, store doesn't expose the reader, read error, or
-// missing meta key -- returns false so we fall back to the full
-// restore. A stale-but-incorrect skip is far worse than a wasteful
-// full restore; the fallback errs strictly toward restoring.
+// the reader interface, read error, or missing meta key -- returns
+// false so we fall back to the full restore. A stale-but-incorrect
+// skip is far worse than a wasteful full restore; the fallback errs
+// strictly toward restoring.
+//
+// Direct type-assert against raftengine.AppliedIndexReader (PR #915
+// round-5): kvFSM satisfies the interface directly via its
+// LastAppliedIndex method, so no separate AppliedIndexReporter shim
+// is needed. The compile-time guard in kv/fsm_applied_index_iface_check.go
+// keeps this stable.
 func fsmAlreadyAtIndex(fsm StateMachine, want uint64) bool {
-	reporter, ok := fsm.(AppliedIndexReporter)
+	r, ok := fsm.(raftengine.AppliedIndexReader)
 	if !ok { return false }
-	reader := reporter.AppliedIndexReader()
-	if reader == nil { return false }
-	have, present, err := reader.LastAppliedIndex()
+	have, present, err := r.LastAppliedIndex()
 	if err != nil || !present { return false }
 	return have >= want
 }
@@ -814,8 +834,12 @@ present.
 - `ApplyIndexAware` is **already** in `main`; this design only adds
   consumers.
 - The new opt-in interfaces (`AppliedIndexReader`,
-  `AppliedIndexReporter`, `SnapshotHeaderApplier`) are additive.
+  `AppliedIndexWriter`, `SnapshotHeaderApplier`) are additive.
   FSMs that don't implement them fall back to the current behaviour.
+  (Round-1 / round-2 of this doc mentioned an `AppliedIndexReporter`
+  factory-method shim; PR #915 round-5 superseded it by having
+  `kvFSM` satisfy `AppliedIndexReader` directly via its
+  `LastAppliedIndex` method.)
 - `metaAppliedIndexBytes` is new. Older fsm.db files don't have it.
   The `present=false` branch makes the first restart after upgrade
   fall back to full restore, populating the meta key from the next
@@ -863,7 +887,7 @@ restoreSnapshotState skipped (FSM at index %d, snapshot at %d, ceiling=%d, cutov
 | Branch | Content | Behaviour change |
 |---|---|---|
 | **B1** (this PR) | Design doc | None |
-| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()`, **`pebble.Sync` unconditionally**) + `kvFSM.AppliedIndexReader()` accessor + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. |
+| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()` + `applyMu.Lock()` RMW monotonic guard, **`pebble.Sync` unconditionally**) + `kvFSM.LastAppliedIndex()` directly satisfies `raftengine.AppliedIndexReader` (compile-time guard in `kv/fsm_applied_index_iface_check.go`) + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. |
 | **B3** | `restoreSnapshotState` skip gate + `applyHeaderStateOnSkip(snapPath, tok.CRC32C)` orchestrating size + footer-vs-tokenCRC + full-body-CRC verification using `internal/raftengine/etcd`'s existing helpers (matching `openAndRestoreFSMSnapshot`'s safety contract) + two-phase `SnapshotHeaderApplier` seam on `kvFSM` (`ParseSnapshotHeader(r io.Reader) (ceiling, cutover, err)` + pure `ApplySnapshotHeader(ceiling, cutover)`) + metrics + INFO log | **User-visible cold-start win.** |
 | **B4** | Lower `HEALTH_TIMEOUT_SECONDS` default once production data shows steady-state skip rate ≥ 90 % | Tighter ceiling; the env override remains honoured. |

go.mod

Lines changed: 1 addition & 1 deletion
@@ -13,6 +13,7 @@ require (
 	github.com/aws/smithy-go v1.25.1
 	github.com/cockroachdb/errors v1.13.0
 	github.com/cockroachdb/pebble/v2 v2.1.5
+	github.com/coreos/go-semver v0.3.1
 	github.com/emirpasic/gods v1.18.1
 	github.com/getsentry/sentry-go v0.46.2
 	github.com/goccy/go-json v0.10.6
@@ -58,7 +59,6 @@ require (
 	github.com/cockroachdb/redact v1.1.5 // indirect
 	github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect
 	github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
-	github.com/coreos/go-semver v0.3.1 // indirect
 	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/dustin/go-humanize v1.0.1 // indirect

internal/raftengine/etcd/engine.go

Lines changed: 70 additions & 13 deletions
@@ -2026,6 +2026,15 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error {
 	if err != nil {
 		return errors.Wrapf(err, "decode snapshot token index=%d", snapshot.Metadata.Index)
 	}
+	// B3/follow-up: also call SetDurableAppliedIndex(tok.Index) here
+	// after Restore so peer-after-InstallSnapshot populates the meta
+	// key. The local-snapshot persist path already bumps the live
+	// store (engine.persistLocalSnapshotPayload), but the receiving
+	// node's restored store inherits the pre-bump value embedded in
+	// the snapshot artifact. Design Non-Goals §
+	// docs/design/2026_06_02_idempotent_snapshot_restore.md:71-74
+	// scopes this out of Branch 2; see PR #915 round-4/5 codex P2 on
+	// engine.go:4077 for the rationale.
 	if err := openAndRestoreFSMSnapshot(e.fsm, fsmSnapPath(e.fsmSnapDir, tok.Index), tok.CRC32C); err != nil {
 		return errors.Wrapf(err, "restore fsm snapshot file index=%d crc=%08x", tok.Index, tok.CRC32C)
 	}
@@ -2704,10 +2713,38 @@ func (e *Engine) createConfigSnapshot(index uint64, confState raftpb.ConfState,
 	}
 }

+// bumpDurableAppliedIndexBeforeSave pins the FSM's durable applied
+// index to `index` BEFORE the engine calls persist.SaveSnap, so a
+// successful snapshot persist always implies LastAppliedIndex >=
+// snap.Metadata.Index — closes the HLC-lease-only / encryption-only
+// fallback (PR #910 design §6).
+//
+// FSMs that do not expose raftengine.AppliedIndexWriter silently
+// no-op; the skip optimisation falls back to full restore for them
+// (legacy test fakes, in-memory backends). pebble.Sync is forced on
+// the writer side regardless of ELASTICKV_FSM_SYNC_MODE — once
+// persist.SaveSnap returns, WAL compaction discards every log entry
+// at or before snap.Metadata.Index, so there is no source to replay
+// the meta key bump from.
+//
+// Used by BOTH snapshot persist sites: persistCreatedSnapshot (this
+// file) and e.persistLocalSnapshotPayload (the steady-state
+// SnapshotCount-triggered hot path).
+func (e *Engine) bumpDurableAppliedIndexBeforeSave(index uint64) error {
+	w, ok := e.fsm.(raftengine.AppliedIndexWriter)
+	if !ok {
+		return nil
+	}
+	return errors.WithStack(w.SetDurableAppliedIndex(index))
+}
+
 func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
 	if etcdraft.IsEmptySnap(snap) || e.persist == nil {
 		return nil
 	}
+	if err := e.bumpDurableAppliedIndexBeforeSave(snap.Metadata.Index); err != nil {
+		return err
+	}
 	if err := e.persist.SaveSnap(snap); err != nil {
 		return errors.WithStack(err)
 	}
@@ -4072,29 +4109,49 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
 		return nil
 	}

+	if err := e.bumpDurableAppliedIndexBeforeSave(index); err != nil {
+		return err
+	}
+
 	_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
+	return e.handleLocalSnapshotPersistResult(err)
+}
+
+// handleLocalSnapshotPersistResult collapses the post-SaveSnap error
+// switch into a single helper so persistLocalSnapshotPayload stays
+// under the cyclomatic-complexity budget. The three raft-side
+// 'snapshot already moved on' cases (ErrCompacted / ErrUnavailable /
+// ErrSnapOutOfDate) are all treated as no-ops; only the success path
+// runs the disk-side purge.
+func (e *Engine) handleLocalSnapshotPersistResult(err error) error {
 	switch {
 	case err == nil:
-		snapDir := filepath.Join(e.dataDir, snapDirName)
-		if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil {
-			slog.Warn("failed to purge old snap files", "error", purgeErr)
-		}
-		walDir := filepath.Join(e.dataDir, walDirName)
-		if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil {
-			slog.Warn("failed to purge old wal files", "error", purgeErr)
-		}
-		return nil
-	case errors.Is(err, etcdraft.ErrCompacted):
-		return nil
-	case errors.Is(err, etcdraft.ErrUnavailable):
+		e.purgeAfterLocalSnapshot()
 		return nil
-	case errors.Is(err, etcdraft.ErrSnapOutOfDate):
+	case errors.Is(err, etcdraft.ErrCompacted),
+		errors.Is(err, etcdraft.ErrUnavailable),
+		errors.Is(err, etcdraft.ErrSnapOutOfDate):
 		return nil
 	default:
 		return err
 	}
 }

+// purgeAfterLocalSnapshot runs the disk-side cleanup that follows a
+// successful local-snapshot persist: trim old .snap/.fsm files and
+// rotate ageing WAL segments. Both calls log on error but do not
+// propagate — failing to purge is non-fatal.
+func (e *Engine) purgeAfterLocalSnapshot() {
+	snapDir := filepath.Join(e.dataDir, snapDirName)
+	if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil {
+		slog.Warn("failed to purge old snap files", "error", purgeErr)
+	}
+	walDir := filepath.Join(e.dataDir, walDirName)
+	if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil {
+		slog.Warn("failed to purge old wal files", "error", purgeErr)
+	}
+}
+
 func encodeReadContext(id uint64) []byte {
 	out := make([]byte, envelopeHeaderSize)
 	out[0] = readContextVersion
