Skip to content

Commit 3df8983

Browse files
committed
fix(raft): remove orphan .fsm on apply failure (PR #747 r4)
Round-3 review on commit 944ab86 from chatgpt-codex (P2): > The streaming branch finalizes and renames the spool into > fsmSnapDir before the message is handed to SendSnapshot's > t.handle path, but FinalizeAsFSMFile clears the spool path so > deferred spool.Close() cannot clean it up afterward. If > t.handle returns an error (for example during transient > engine/raft failures), SendSnapshot returns failure to the > sender while leaving the newly written .fsm file behind with > no corresponding applied snapshot, which can accumulate > orphaned large files across retries with different snapshot > indexes. Confirmed: - After receiveSnapshotStream succeeds, msg.Snapshot.Data is a 17-byte EKVT token and the .fsm file lives at fsmSnapPath(fsmSnapDir, index). - SendSnapshot then calls t.handle(ctx, msg). The engine's applySnapshot is synchronous to t.handle, so a non-nil return guarantees applied_index was NOT advanced — the .fsm file is unreferenced. - Same-index retries are safe (os.Rename atomically replaces), but the leader can take a fresh snapshot at a higher index before the apply finally succeeds, and each failed attempt at a different index leaves an orphan .fsm. - cleanupStaleFSMSnaps only runs at startup (prepareDataDirs), so during a long-lived process, orphans accumulate to disk-size pressure. Fix: SendSnapshot calls a new removeOrphanedFSMSnapshot helper on the apply-failure branch. The helper: 1. Decodes the EKVT token from msg.Snapshot.Data; bails if the message used the legacy inline path (no .fsm file to clean). 2. Reads fsmSnapDir under t.mu; bails if it's unset (legacy receivers also use the inline path so there's nothing on disk). 3. os.Remove(fsmSnapPath(...)) — best-effort, IsNotExist is tolerated, all other errors are slog.Warn'd. The original apply error is the actionable signal returned to the sender; a failed Remove is a secondary concern that startup cleanup still picks up. 
Test added (TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile): - Wires SetFSMSnapDir + SetHandler-that-fails. - Drives a real testStateMachine-framed payload through SendSnapshot. - Asserts SendSnapshot surfaces the apply error. - Asserts fsmSnapPath(fsmSnapDir, index) does NOT exist after the call — orphan cleanup fired. Caller audit (semantic change requires it): - SendSnapshot (this function): the only gRPC service handler, no in-tree non-test callers. Modified. - removeOrphanedFSMSnapshot: only called from SendSnapshot. - t.handle (other callers): line 438 in Send (regular RPC for non-snapshot messages, no .fsm involvement) is untouched. - client.SendSnapshot (lines 267, 455, 481): leader-side client invocations are a different function, unaffected. The cleanup is gated by isSnapshotToken(msg.Snapshot.Data) so the legacy in-memory fallback path (when fsmSnapDir is unset) sees no behavior change — there's no .fsm file to remove on that path because Bytes() materializes inline. Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.3s, all green. golangci-lint run --enable-only cyclop ./internal/raftengine/etcd/... -- 0 issues.
1 parent 944ab86 commit 3df8983

2 files changed

Lines changed: 113 additions & 0 deletions

File tree

internal/raftengine/etcd/grpc_transport.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"io"
77
"log/slog"
8+
"os"
89
"sync"
910
"time"
1011

@@ -378,11 +379,54 @@ func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) erro
378379
return err
379380
}
380381
if err := t.handle(stream.Context(), msg); err != nil {
382+
// If receive finalized the snapshot as a .fsm file (token in
383+
// Snapshot.Data), the engine refused to apply it — likely a
384+
// transient context cancel or raft error. Remove the on-disk
385+
// file so retries at later indexes don't leak orphan .fsm
386+
// payloads into fsmSnapDir until the next startup runs
387+
// cleanupStaleFSMSnaps. Same-index retries are already safe
388+
// because os.Rename atomically replaces the prior file.
389+
t.removeOrphanedFSMSnapshot(msg)
381390
return err
382391
}
383392
return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{}))
384393
}
385394

395+
// removeOrphanedFSMSnapshot deletes the .fsm file that
396+
// receiveSnapshotStream finalized for `msg`, if any. Used by
397+
// SendSnapshot when the engine apply (`t.handle`) fails after the
398+
// receive succeeded — the engine has NOT applied the snapshot (apply is
399+
// synchronous to t.handle, so a non-nil return means applied_index was
400+
// not advanced), so the file is unreferenced and safe to remove.
401+
//
402+
// Best-effort: a cleanup failure here is logged but not returned because
403+
// the original apply error is the actionable signal; orphans get swept
404+
// by cleanupStaleFSMSnaps at the next engine restart even if Remove
405+
// races with another process.
406+
func (t *GRPCTransport) removeOrphanedFSMSnapshot(msg raftpb.Message) {
407+
if msg.Snapshot == nil || !isSnapshotToken(msg.Snapshot.Data) {
408+
return
409+
}
410+
tok, err := decodeSnapshotToken(msg.Snapshot.Data)
411+
if err != nil {
412+
return
413+
}
414+
t.mu.RLock()
415+
fsmSnapDir := t.fsmSnapDir
416+
t.mu.RUnlock()
417+
if fsmSnapDir == "" {
418+
return
419+
}
420+
path := fsmSnapPath(fsmSnapDir, tok.Index)
421+
if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
422+
slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
423+
"path", path,
424+
"index", tok.Index,
425+
"err", rmErr,
426+
)
427+
}
428+
}
429+
386430
func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error) {
387431
if req == nil {
388432
return &pb.EtcdRaftAck{}, nil

internal/raftengine/etcd/grpc_transport_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,75 @@ func TestReceiveSnapshotStream_SpoolPlacedInFSMSnapDir(t *testing.T) {
282282
require.NoError(t, statErr, "renamed .fsm file should exist at canonical path")
283283
}
284284

285+
// TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile pins the
286+
// orphan-cleanup behaviour from PR #747 round-4 (Codex P2): when the
287+
// receive path successfully finalizes the snapshot as
288+
// fsmSnapDir/<index>.fsm but the engine's apply (t.handle) then fails
289+
// — transient context cancel, raft error, etc. — the finalized .fsm
290+
// file MUST be removed. Otherwise retries at later snapshot indexes
291+
// accumulate orphan .fsm payloads in fsmSnapDir until startup runs
292+
// cleanupStaleFSMSnaps. Same-index retries are already safe via
293+
// os.Rename's atomic-replace, so this test exercises the cross-index
294+
// case where the orphan would actually persist.
295+
func TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile(t *testing.T) {
296+
const index = uint64(77)
297+
298+
// Build a real testStateMachine payload + framed bytes so receive
299+
// finalizes a syntactically valid .fsm file.
300+
senderFSM := &testStateMachine{}
301+
senderFSM.Apply([]byte("entry-for-orphan-cleanup-test"))
302+
snap, err := senderFSM.Snapshot()
303+
require.NoError(t, err)
304+
var buf bytes.Buffer
305+
_, err = snap.WriteTo(&buf)
306+
require.NoError(t, err)
307+
require.NoError(t, snap.Close())
308+
payload := buf.Bytes()
309+
310+
metadata := raftpb.Message{
311+
Type: raftpb.MsgSnap,
312+
From: 1,
313+
To: 2,
314+
Snapshot: &raftpb.Snapshot{
315+
Metadata: raftpb.SnapshotMetadata{Index: index, Term: 1},
316+
},
317+
}
318+
raw, err := metadata.Marshal()
319+
require.NoError(t, err)
320+
321+
fsmSnapDir := t.TempDir()
322+
323+
transport := NewGRPCTransport(nil)
324+
transport.SetSpoolDir(t.TempDir())
325+
transport.SetFSMSnapDir(fsmSnapDir)
326+
327+
// Wire a handler that always fails so SendSnapshot exercises the
328+
// orphan-cleanup branch.
329+
applyErr := errors.New("simulated apply failure")
330+
transport.SetHandler(func(_ context.Context, _ raftpb.Message) error {
331+
return applyErr
332+
})
333+
334+
stream := &testSendSnapshotServer{
335+
chunks: []*pb.EtcdRaftSnapshotChunk{
336+
{Metadata: raw},
337+
{Chunk: payload, Final: true},
338+
},
339+
}
340+
341+
err = transport.SendSnapshot(stream)
342+
require.Error(t, err)
343+
require.ErrorIs(t, err, applyErr, "SendSnapshot must surface the apply failure")
344+
345+
// THE point: the .fsm file at the canonical path MUST have been
346+
// removed. Without the cleanup, leader retries at later indexes
347+
// would accumulate one .fsm per failed apply until next startup.
348+
finalPath := fsmSnapPath(fsmSnapDir, index)
349+
_, statErr := os.Stat(finalPath)
350+
require.True(t, os.IsNotExist(statErr),
351+
"orphan .fsm file at %s must be removed after apply failure (got stat err: %v)", finalPath, statErr)
352+
}
353+
285354
// TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir pins the
286355
// behaviour when fsmSnapDir is unset: receive still works, just via the
287356
// pre-PR materialization path. Tests that don't wire an fsmSnapDir (most

0 commit comments

Comments
 (0)