-
Notifications
You must be signed in to change notification settings - Fork 2
feat(snapshot-skip B2): plumb metaAppliedIndex through raft-Apply + both snapshot persist sites #915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(snapshot-skip B2): plumb metaAppliedIndex through raft-Apply + both snapshot persist sites #915
Changes from 6 commits
2339a6f
525fc15
aa9b8ac
7cd72bd
f1e8748
2c42f7d
7001a93
b957e1a
5684857
a8e2cf5
3951337
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2680,6 +2680,22 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { | |
| if etcdraft.IsEmptySnap(snap) || e.persist == nil { | ||
| return nil | ||
| } | ||
| // Pin metaAppliedIndex to snap.Metadata.Index BEFORE SaveSnap so a | ||
| // successful snapshot persist always implies LastAppliedIndex >= | ||
| // snap.Metadata.Index — closes the HLC-lease-only / encryption-only | ||
| // fallback (PR #910 design §6). FSMs that do not expose | ||
| // raftengine.AppliedIndexWriter silently no-op; the skip | ||
| // optimisation falls back to full restore for them. pebble.Sync | ||
| // is forced on the writer side (see lsm_store.SetDurableAppliedIndex) | ||
| // regardless of ELASTICKV_FSM_SYNC_MODE — once SaveSnap returns, | ||
| // WAL compaction discards every log entry at or before | ||
| // snap.Metadata.Index, so there is no source to replay the meta | ||
| // key bump from. | ||
| if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { | ||
| if err := w.SetDurableAppliedIndex(snap.Metadata.Index); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| } | ||
| if err := e.persist.SaveSnap(snap); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
|
|
@@ -4044,6 +4060,24 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error | |
| return nil | ||
| } | ||
|
|
||
| // Pin metaAppliedIndex to `index` BEFORE the free-function | ||
| // persistLocalSnapshotPayload (which calls persist.SaveSnap at | ||
| // wal_store.go:524). This is the steady-state SnapshotCount-triggered | ||
| // snapshot path — the hot path the cold-start skip optimisation | ||
| // depends on. Without this hook the round-3 P2 fallback (HLC | ||
| // leases / encryption ops keep snapshot.Metadata.Index ahead of the | ||
| // last data-Apply index forever) recurs permanently. See PR #910 | ||
| // design §6 'HLC lease entries — checkpoint at snapshot persist' | ||
| // and the round-5 retraction documenting why | ||
| // persistCreatedSnapshot alone is insufficient. pebble.Sync is | ||
| // forced on the writer side regardless of ELASTICKV_FSM_SYNC_MODE | ||
| // (lsm_store.SetDurableAppliedIndex inline comment). | ||
| if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { | ||
| if err := w.SetDurableAppliedIndex(index); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| } | ||
|
Comment on lines
+4084
to
+4086
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
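In the steady-state snapshot path, the FSM snapshot has already been serialized in `payload` by the time this hook runs. [The rest of this comment was lost in extraction; the closing review comment on this page describes the same ordering for the config-snapshot path: the bump updates only the live store, not the already-serialized snapshot.] Useful? React with 👍 / 👎. |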
||
|
|
||
| _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) | ||
| switch { | ||
| case err == nil: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| package etcd | ||
|
|
||
| import ( | ||
| "io" | ||
| "sync" | ||
| "testing" | ||
|
|
||
| "github.com/bootjp/elastickv/internal/raftengine" | ||
| "github.com/coreos/go-semver/semver" | ||
| "github.com/stretchr/testify/require" | ||
| raftpb "go.etcd.io/raft/v3/raftpb" | ||
| ) | ||
|
|
||
| // applyIndexOrderRecorder is shared between the recording FSM and | ||
| // the recording persist storage so the test can assert the | ||
| // crash-ordering invariant (SetDurableAppliedIndex MUST run before | ||
| // persist.SaveSnap). Both record into a single ordered slice keyed | ||
| // by event kind; the test reads it back to verify the sequence. | ||
| type applyIndexOrderRecorder struct { | ||
| mu sync.Mutex | ||
| events []orderEvent | ||
| } | ||
|
|
||
| type orderEvent struct { | ||
| kind string // "bump" | "save" | ||
| index uint64 | ||
| } | ||
|
|
||
| func (r *applyIndexOrderRecorder) record(kind string, idx uint64) { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
| r.events = append(r.events, orderEvent{kind: kind, index: idx}) | ||
| } | ||
|
|
||
| func (r *applyIndexOrderRecorder) snapshot() []orderEvent { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
| out := make([]orderEvent, len(r.events)) | ||
| copy(out, r.events) | ||
| return out | ||
| } | ||
|
|
||
| // recordingAppliedIndexFSM implements StateMachine + | ||
| // raftengine.AppliedIndexWriter. Each successful SetDurableAppliedIndex | ||
| // call is recorded; with failNext set, one call returns failErr unrecorded. | ||
| type recordingAppliedIndexFSM struct { | ||
| rec *applyIndexOrderRecorder | ||
| failNext bool | ||
| failErr error | ||
| } | ||
|
|
||
| func (f *recordingAppliedIndexFSM) Apply(_ []byte) any { return nil } | ||
| func (f *recordingAppliedIndexFSM) Snapshot() (Snapshot, error) { return nil, io.EOF } | ||
| func (f *recordingAppliedIndexFSM) Restore(_ io.Reader) error { return nil } | ||
|
|
||
| func (f *recordingAppliedIndexFSM) SetDurableAppliedIndex(idx uint64) error { | ||
| if f.failNext { | ||
| f.failNext = false | ||
| return f.failErr | ||
| } | ||
| f.rec.record("bump", idx) | ||
| return nil | ||
| } | ||
|
|
||
| // recordingPersistStorage is a minimal etcdstorage.Storage stand-in | ||
| // that records SaveSnap calls into the shared recorder. The hook | ||
| // only calls SaveSnap + Release; the rest are stubs. | ||
| type recordingPersistStorage struct { | ||
| rec *applyIndexOrderRecorder | ||
| } | ||
|
|
||
| func (p *recordingPersistStorage) SaveSnap(snap raftpb.Snapshot) error { | ||
| p.rec.record("save", snap.Metadata.Index) | ||
| return nil | ||
| } | ||
|
|
||
| func (p *recordingPersistStorage) Save(_ raftpb.HardState, _ []raftpb.Entry) error { return nil } | ||
| func (p *recordingPersistStorage) Release(_ raftpb.Snapshot) error { return nil } | ||
| func (p *recordingPersistStorage) Sync() error { return nil } | ||
| func (p *recordingPersistStorage) Close() error { return nil } | ||
| func (p *recordingPersistStorage) MinimalEtcdVersion() *semver.Version { return nil } | ||
|
|
||
| // TestRecordingFSM_SatisfiesAppliedIndexWriter is a runtime type- | ||
| // assertion check: the recording FSM MUST satisfy the writer seam, | ||
| // otherwise the engine hook's own type assertion would skip it. | ||
| func TestRecordingFSM_SatisfiesAppliedIndexWriter(t *testing.T) { | ||
| rec := &applyIndexOrderRecorder{} | ||
| var f any = &recordingAppliedIndexFSM{rec: rec} | ||
| _, ok := f.(raftengine.AppliedIndexWriter) | ||
| require.True(t, ok, "recordingAppliedIndexFSM must implement raftengine.AppliedIndexWriter") | ||
| } | ||
|
|
||
| // TestPersistCreatedSnapshot_BumpsAppliedIndex exercises Site 1 of | ||
| // the persist hook. We invoke (*Engine).persistCreatedSnapshot | ||
| // directly; the engine MUST call SetDurableAppliedIndex | ||
| // (snap.Metadata.Index) BEFORE SaveSnap. | ||
| func TestPersistCreatedSnapshot_BumpsAppliedIndex(t *testing.T) { | ||
| rec := &applyIndexOrderRecorder{} | ||
| fsm := &recordingAppliedIndexFSM{rec: rec} | ||
| persist := &recordingPersistStorage{rec: rec} | ||
| e := &Engine{fsm: fsm, persist: persist} | ||
|
|
||
| snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 42, Term: 1}} | ||
| require.NoError(t, e.persistCreatedSnapshot(snap)) | ||
|
|
||
| require.Equal(t, []orderEvent{ | ||
| {kind: "bump", index: 42}, | ||
| {kind: "save", index: 42}, | ||
| }, rec.snapshot(), | ||
| "hook MUST call SetDurableAppliedIndex BEFORE SaveSnap") | ||
| } | ||
|
|
||
| // TestPersistCreatedSnapshot_NilFSMNoOp covers the legacy / test- | ||
| // fake case. Despite the name, the FSM is non-nil: it merely does NOT | ||
| // implement AppliedIndexWriter, so no bump is recorded and SaveSnap still runs. | ||
| func TestPersistCreatedSnapshot_NilFSMNoOp(t *testing.T) { | ||
| rec := &applyIndexOrderRecorder{} | ||
| persist := &recordingPersistStorage{rec: rec} | ||
| // testStateMachine (defined in engine_test.go) is the canonical | ||
| // non-AppliedIndexWriter FSM used by other tests in this package. | ||
| e := &Engine{fsm: &testStateMachine{}, persist: persist} | ||
|
|
||
| snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 17, Term: 1}} | ||
| require.NoError(t, e.persistCreatedSnapshot(snap)) | ||
|
|
||
| require.Equal(t, []orderEvent{ | ||
| {kind: "save", index: 17}, | ||
| }, rec.snapshot(), | ||
| "legacy FSM path: snapshot persist still happens, just without the meta-key bump") | ||
| } | ||
|
|
||
| // TestPersistCreatedSnapshot_BumpErrorAborts checks the ordering | ||
| // invariant under failure: if SetDurableAppliedIndex returns an | ||
| // error, the engine MUST surface it AND NOT call SaveSnap. This | ||
| // preserves the (metaAppliedIndex < snapshot pointer impossible) | ||
| // crash invariant from PR #910 design §6. | ||
| func TestPersistCreatedSnapshot_BumpErrorAborts(t *testing.T) { | ||
| rec := &applyIndexOrderRecorder{} | ||
| fsm := &recordingAppliedIndexFSM{rec: rec, failNext: true, failErr: io.ErrShortBuffer} | ||
| persist := &recordingPersistStorage{rec: rec} | ||
| e := &Engine{fsm: fsm, persist: persist} | ||
|
|
||
| snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 99, Term: 1}} | ||
| err := e.persistCreatedSnapshot(snap) | ||
| require.Error(t, err, "bump failure MUST be surfaced to caller") | ||
| require.Empty(t, rec.snapshot(), | ||
| "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,6 +129,40 @@ func (f *kvFSM) SetApplyIndex(idx uint64) { | |
| f.pendingApplyIdx = idx | ||
| } | ||
|
|
||
| // AppliedIndexReader implements raftengine.AppliedIndexReporter. It | ||
| // returns the underlying store itself when that store satisfies | ||
| // raftengine.AppliedIndexReader, and a nil interface otherwise. The | ||
| // original note says pebbleStore satisfies it and the in-memory | ||
| // mvccStore does not (NOTE(review): not visible here; confirm). nil | ||
| // means "not supported on this backend" and triggers the conservative | ||
| // full-restore fallback at the cold-start skip gate. See | ||
| // docs/design/2026_06_02_idempotent_snapshot_restore.md §3. | ||
| func (f *kvFSM) AppliedIndexReader() raftengine.AppliedIndexReader { | ||
| if r, ok := f.store.(raftengine.AppliedIndexReader); ok { | ||
| return r | ||
| } | ||
| return nil | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the cold-start skip gate queries the FSM through the optional Useful? React with 👍 / 👎. |
||
|
|
||
| // SetDurableAppliedIndex implements raftengine.AppliedIndexWriter by | ||
| // forwarding to the underlying store when it supports the writer | ||
| // seam. Called by the engine at every snapshot persist site BEFORE | ||
| // persist.SaveSnap so a successful snapshot persist implies | ||
| // LastAppliedIndex >= snap.Metadata.Index, closing the HLC-lease- | ||
| // only / encryption-only fallback (PR #910 design §6). | ||
| // | ||
| // Returns nil silently when the backing store does not implement | ||
| // the writer seam (in-memory mvccStore, test fakes) — the skip | ||
| // optimisation simply degrades to "fall back to full restore" for | ||
| // those FSMs. | ||
| func (f *kvFSM) SetDurableAppliedIndex(idx uint64) error { | ||
| w, ok := f.store.(raftengine.AppliedIndexWriter) | ||
| if !ok { | ||
| return nil | ||
| } | ||
| return w.SetDurableAppliedIndex(idx) | ||
| } | ||
|
|
||
| type FSM interface { | ||
| raftengine.StateMachine | ||
| } | ||
|
|
@@ -455,7 +489,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui | |
| } | ||
| // Raw requests always commit against the latest state; use commitTS as both | ||
| // the validation snapshot and the commit timestamp. | ||
| return errors.WithStack(f.store.ApplyMutationsRaft(ctx, muts, nil, commitTS, commitTS)) | ||
| return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, muts, nil, commitTS, commitTS, f.pendingApplyIdx)) | ||
| } | ||
|
|
||
| // extractDelPrefix checks if the mutations contain a DEL_PREFIX operation. | ||
|
|
@@ -472,7 +506,7 @@ func extractDelPrefix(muts []*pb.Mutation) (bool, []byte) { | |
| // handleDelPrefix delegates prefix deletion to the store. Transaction-internal | ||
| // keys are always excluded to preserve transactional integrity. | ||
| func (f *kvFSM) handleDelPrefix(ctx context.Context, prefix []byte, commitTS uint64) error { | ||
| return errors.WithStack(f.store.DeletePrefixAtRaft(ctx, prefix, txnCommonPrefix, commitTS)) | ||
| return errors.WithStack(f.store.DeletePrefixAtRaftAt(ctx, prefix, txnCommonPrefix, commitTS, f.pendingApplyIdx)) | ||
| } | ||
|
|
||
| var ErrNotImplemented = errors.New("not implemented") | ||
|
|
@@ -730,7 +764,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { | |
| return err | ||
| } | ||
|
|
||
| if err := f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { | ||
| if err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, r.ReadKeys, startTS, startTS, f.pendingApplyIdx); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| return nil | ||
|
|
@@ -794,7 +828,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com | |
| if err != nil { | ||
| return err | ||
| } | ||
| return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) | ||
| return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, r.ReadKeys, startTS, commitTS, f.pendingApplyIdx)) | ||
| } | ||
|
|
||
| // dedupProbeOnePhase decides whether handleOnePhaseTxnRequest should no-op | ||
|
|
@@ -898,7 +932,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start | |
| // The secondary-shard LatestCommitTS scan is intentionally deferred to the | ||
| // write-conflict path so the hot (first-time) commit path pays no extra cost. | ||
| func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error { | ||
| err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS) | ||
| err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, applyStartTS, commitTS, f.pendingApplyIdx) | ||
| if err == nil { | ||
| return nil | ||
| } | ||
|
|
@@ -915,7 +949,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut | |
| return errors.WithStack(lErr) | ||
| } | ||
| if exists && latestTS >= commitTS { | ||
| return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS)) | ||
| return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, commitTS, commitTS, f.pendingApplyIdx)) | ||
| } | ||
| } | ||
| return errors.WithStack(err) | ||
|
|
@@ -966,7 +1000,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u | |
| if len(storeMuts) == 0 { | ||
| return nil | ||
| } | ||
| return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, startTS, abortTS)) | ||
| return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, startTS, abortTS, f.pendingApplyIdx)) | ||
| } | ||
|
|
||
| func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -330,6 +330,16 @@ func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations [] | |
| return errors.WithStack(s.local.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) | ||
| } | ||
|
|
||
| // ApplyMutationsRaftAt forwards to the local store's raft-entry-index- | ||
| // aware variant so the underlying pebbleStore can bundle | ||
| // metaAppliedIndex with the mutation. See PR #910 design §2. | ||
| func (s *LeaderRoutedStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { | ||
| if s == nil || s.local == nil { | ||
| return errors.WithStack(store.ErrNotSupported) | ||
| } | ||
| return errors.WithStack(s.local.ApplyMutationsRaftAt(ctx, mutations, readKeys, startTS, commitTS, appliedIndex)) | ||
| } | ||
|
|
||
| func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { | ||
| if s == nil || s.local == nil { | ||
| return errors.WithStack(store.ErrNotSupported) | ||
|
|
@@ -345,6 +355,15 @@ func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byt | |
| return errors.WithStack(s.local.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS)) | ||
| } | ||
|
|
||
| // DeletePrefixAtRaftAt forwards to the local store's raft-entry- | ||
| // index-aware variant. See PR #910 design §2 "why both leaves". | ||
| func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { | ||
| if s == nil || s.local == nil { | ||
| return errors.WithStack(store.ErrNotSupported) | ||
| } | ||
| return errors.WithStack(s.local.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex)) | ||
| } | ||
|
Comment on lines
+360
to
+365
Contributor
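There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
[The reviewer's problem statement lost its inline code in extraction.] To fix this, add `LastAppliedIndex` and `SetDurableAppliedIndex` forwarding methods to `LeaderRoutedStore` alongside `DeletePrefixAtRaftAt`, as in the suggested code below:
func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error {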
if s == nil || s.local == nil {
return errors.WithStack(store.ErrNotSupported)
}
return errors.WithStack(s.local.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex))
}
func (s *LeaderRoutedStore) LastAppliedIndex() (uint64, bool, error) {
if s == nil || s.local == nil {
return 0, false, nil
}
if r, ok := s.local.(interface {
LastAppliedIndex() (uint64, bool, error)
}); ok {
return r.LastAppliedIndex()
}
return 0, false, nil
}
func (s *LeaderRoutedStore) SetDurableAppliedIndex(idx uint64) error {
if s == nil || s.local == nil {
return nil
}
if w, ok := s.local.(interface {
SetDurableAppliedIndex(uint64) error
}); ok {
return w.SetDurableAppliedIndex(idx)
}
return nil
} |
||
|
|
||
| func (s *LeaderRoutedStore) LastCommitTS() uint64 { | ||
| if s == nil || s.local == nil { | ||
| return 0 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the config-snapshot path reaches this hook,
`persistConfigSnapshotPayloadLocked` has already obtained `payload` from `snapshotPayload`, which writes/closes the `.fsm` file (or serializes the legacy bytes) before calling `persistCreatedSnapshot`. Bumping `_meta_applied_index` here only updates the live store, so a peer or restart that restores this saved config snapshot gets the older/missing meta key while the raft snapshot is at `snap.Metadata.Index`; after WAL entries up to that index are released, the advertised `LastAppliedIndex >= snapshot index` invariant does not hold for the snapshot artifact. This is separate from the steady-state snapshot site: the same ordering occurs through `persistConfigSnapshotPayloadLocked -> snapshotPayload -> persistCreatedSnapshot`. Useful? React with 👍 / 👎.