Skip to content

Commit 07ff07d

Browse files
authored
fix(raft): stream receive snapshot to disk + emit EKVT token (memory safety) (#747)
2 parents 6a5e493 + 3df8983 commit 07ff07d

5 files changed

Lines changed: 650 additions & 23 deletions

File tree

internal/raftengine/etcd/grpc_transport.go

Lines changed: 166 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"io"
77
"log/slog"
8+
"os"
89
"sync"
910
"time"
1011

@@ -378,11 +379,54 @@ func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) erro
378379
return err
379380
}
380381
if err := t.handle(stream.Context(), msg); err != nil {
382+
// If receive finalized the snapshot as a .fsm file (token in
383+
// Snapshot.Data), the engine refused to apply it — likely a
384+
// transient context cancel or raft error. Remove the on-disk
385+
// file so retries at later indexes don't leak orphan .fsm
386+
// payloads into fsmSnapDir until the next startup runs
387+
// cleanupStaleFSMSnaps. Same-index retries are already safe
388+
// because os.Rename atomically replaces the prior file.
389+
t.removeOrphanedFSMSnapshot(msg)
381390
return err
382391
}
383392
return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{}))
384393
}
385394

395+
// removeOrphanedFSMSnapshot deletes the .fsm file that
396+
// receiveSnapshotStream finalized for `msg`, if any. Used by
397+
// SendSnapshot when the engine apply (`t.handle`) fails after the
398+
// receive succeeded — the engine has NOT applied the snapshot (apply is
399+
// synchronous to t.handle, so a non-nil return means applied_index was
400+
// not advanced), so the file is unreferenced and safe to remove.
401+
//
402+
// Best-effort: a cleanup failure here is logged but not returned because
403+
// the original apply error is the actionable signal; orphans get swept
404+
// by cleanupStaleFSMSnaps at the next engine restart even if Remove
405+
// races with another process.
406+
func (t *GRPCTransport) removeOrphanedFSMSnapshot(msg raftpb.Message) {
407+
if msg.Snapshot == nil || !isSnapshotToken(msg.Snapshot.Data) {
408+
return
409+
}
410+
tok, err := decodeSnapshotToken(msg.Snapshot.Data)
411+
if err != nil {
412+
return
413+
}
414+
t.mu.RLock()
415+
fsmSnapDir := t.fsmSnapDir
416+
t.mu.RUnlock()
417+
if fsmSnapDir == "" {
418+
return
419+
}
420+
path := fsmSnapPath(fsmSnapDir, tok.Index)
421+
if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
422+
slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
423+
"path", path,
424+
"index", tok.Index,
425+
"err", rmErr,
426+
)
427+
}
428+
}
429+
386430
func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error) {
387431
if req == nil {
388432
return &pb.EtcdRaftAck{}, nil
@@ -683,55 +727,154 @@ func (t *GRPCTransport) handle(ctx context.Context, msg raftpb.Message) error {
683727
return errors.WithStack(handler(ctx, msg))
684728
}
685729

686-
func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
687-
var metadata raftpb.Message
688-
seenMetadata := false
730+
// snapshotSpoolPlacement returns (spoolDir, fsmSnapDir) under the transport
731+
// lock. When fsmSnapDir is wired, the spool itself is placed inside it so
732+
// FinalizeAsFSMFile's rename stays intra-filesystem and cannot fail with
733+
// EXDEV. Standard engine wiring puts both under cfg.DataDir, but the
734+
// receive code should not assume that. The legacy fallback path
735+
// (fsmSnapDir == "") keeps the spool in spoolDir because it never renames
736+
// — Bytes() materializes the payload in place.
737+
func (t *GRPCTransport) snapshotSpoolPlacement() (placement, fsmSnapDir string) {
689738
t.mu.RLock()
690-
spoolDir := t.spoolDir
691-
t.mu.RUnlock()
739+
defer t.mu.RUnlock()
740+
fsmSnapDir = t.fsmSnapDir
741+
if fsmSnapDir != "" {
742+
return fsmSnapDir, fsmSnapDir
743+
}
744+
return t.spoolDir, ""
745+
}
692746

693-
spool, err := newSnapshotSpool(spoolDir)
747+
func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
748+
spoolPlacement, fsmSnapDir := t.snapshotSpoolPlacement()
749+
spool, err := newSnapshotSpool(spoolPlacement)
694750
if err != nil {
695751
return raftpb.Message{}, err
696752
}
697753
defer func() {
698-
_ = spool.Close()
754+
// Log rather than swallow: a Close failure here points at a
755+
// half-written spool file we couldn't clean up (disk full,
756+
// permission flip mid-stream, …). Once FinalizeAsFSMFile has
757+
// transferred ownership, Close is a no-op so this only fires on
758+
// the unhappy paths that actually need an operator to look.
759+
if closeErr := spool.Close(); closeErr != nil {
760+
slog.Warn("snapshot spool close failed",
761+
"spool_dir", spoolPlacement,
762+
"err", closeErr,
763+
)
764+
}
699765
}()
700766

767+
msg, payloadBytes, err := drainSnapshotChunks(stream, spool, fsmSnapDir)
768+
if err != nil {
769+
return raftpb.Message{}, err
770+
}
771+
index := uint64(0)
772+
if msg.Snapshot != nil {
773+
index = msg.Snapshot.Metadata.Index
774+
}
775+
slog.Info("etcd raft snapshot stream received",
776+
"index", index,
777+
"from", msg.From,
778+
"payload_bytes", payloadBytes,
779+
"format", snapshotDataFormatLabel(msg.Snapshot),
780+
)
781+
return msg, nil
782+
}
783+
784+
// drainSnapshotChunks consumes the SendSnapshot stream into spool, computes
785+
// CRC32C over the payload bytes as they hit disk, and on the final chunk
786+
// hands off to finalizeReceivedSnapshot — which decides between the
787+
// streaming-token path (rename to fsmSnapDir/<index>.fsm + 17-byte token
788+
// in Snapshot.Data) and the legacy materialize fallback. Extracted from
789+
// receiveSnapshotStream so that function stays under cyclop's complexity
790+
// budget.
791+
func drainSnapshotChunks(
792+
stream pb.EtcdRaft_SendSnapshotServer,
793+
spool *snapshotSpool,
794+
fsmSnapDir string,
795+
) (raftpb.Message, int64, error) {
796+
var metadata raftpb.Message
797+
seenMetadata := false
798+
// Wrap spool with crc32CWriter so the CRC accumulates as bytes hit
799+
// disk. The CRC is only meaningful when we have an fsmSnapDir to
800+
// finalize into; the legacy fallback path discards it. Cost is
801+
// hashing speed (~GB/s on modern x86 with SSE 4.2 PCLMULQDQ), well
802+
// above gRPC stream throughput so the wrapper is invisible in
803+
// profiles.
804+
crcWriter := newCRC32CWriter(spool)
805+
701806
var payloadBytes int64
702807
for {
703808
chunk, err := stream.Recv()
704809
if err != nil {
705810
if errors.Is(err, io.EOF) {
706-
return raftpb.Message{}, errors.WithStack(errSnapshotStreamShort)
811+
return raftpb.Message{}, 0, errors.WithStack(errSnapshotStreamShort)
707812
}
708-
return raftpb.Message{}, errors.WithStack(err)
813+
return raftpb.Message{}, 0, errors.WithStack(err)
709814
}
710815
payloadBytes += int64(len(chunk.Chunk))
711-
seen, err := appendSnapshotChunk(&metadata, spool, chunk, seenMetadata)
816+
seen, err := appendSnapshotChunk(&metadata, crcWriter, chunk, seenMetadata)
712817
if err != nil {
713-
return raftpb.Message{}, err
818+
return raftpb.Message{}, 0, err
714819
}
715820
seenMetadata = seen
716821
if chunk.Final {
717-
msg, err := buildSnapshotMessage(metadata, spool, seenMetadata)
822+
msg, err := finalizeReceivedSnapshot(metadata, spool, crcWriter.Sum32(), fsmSnapDir, seenMetadata)
718823
if err != nil {
719-
return raftpb.Message{}, err
720-
}
721-
index := uint64(0)
722-
if msg.Snapshot != nil {
723-
index = msg.Snapshot.Metadata.Index
824+
return raftpb.Message{}, 0, err
724825
}
725-
slog.Info("etcd raft snapshot stream received",
726-
"index", index,
727-
"from", msg.From,
728-
"payload_bytes", payloadBytes,
729-
)
730-
return msg, nil
826+
return msg, payloadBytes, nil
731827
}
732828
}
733829
}
734830

831+
// finalizeReceivedSnapshot picks between the streaming-token path (when an
832+
// fsmSnapDir is wired and the snapshot's metadata index is non-zero) and the
833+
// legacy in-memory path. The streaming path renames the spool file in place
834+
// to fsmSnapPath(fsmSnapDir, index), embeds a 17-byte EKVT token in
835+
// Snapshot.Data, and lets restoreSnapshotState read the payload off disk via
836+
// io.Reader — heap usage stays flat regardless of FSM size, eliminating the
837+
// 1.35-GiB-FSM × 2.5-GiB-container OOM hazard observed in the 2026-05-08
838+
// incident. The legacy path is preserved for tests and legacy receivers
839+
// that have not wired a snapshot directory.
840+
func finalizeReceivedSnapshot(
841+
metadata raftpb.Message,
842+
spool *snapshotSpool,
843+
crc32c uint32,
844+
fsmSnapDir string,
845+
seenMetadata bool,
846+
) (raftpb.Message, error) {
847+
if !seenMetadata || metadata.Snapshot == nil {
848+
return raftpb.Message{}, errors.WithStack(errSnapshotMetadataNil)
849+
}
850+
index := metadata.Snapshot.Metadata.Index
851+
if fsmSnapDir != "" && index > 0 {
852+
if err := spool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c); err != nil {
853+
return raftpb.Message{}, err
854+
}
855+
metadata.Snapshot.Data = encodeSnapshotToken(index, crc32c)
856+
return metadata, nil
857+
}
858+
// Legacy fallback: full materialization. Used by tests that don't wire an
859+
// fsmSnapDir and by the index=0 edge case (no canonical filename to
860+
// rename to).
861+
return buildSnapshotMessage(metadata, spool, seenMetadata)
862+
}
863+
864+
// snapshotDataFormatLabel exists purely for the structured log line on the
865+
// receiver — it lets an operator distinguish a streaming-token receive
866+
// (small heap, payload on disk) from a legacy materialization (heap holds
867+
// the full payload) at a glance, without grepping for byte counts.
868+
func snapshotDataFormatLabel(snap *raftpb.Snapshot) string {
869+
if snap == nil {
870+
return "nil"
871+
}
872+
if isSnapshotToken(snap.Data) {
873+
return "token"
874+
}
875+
return "inline"
876+
}
877+
735878
func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb.EtcdRaftSnapshotChunk, seenMetadata bool) (bool, error) {
736879
if len(chunk.Metadata) > 0 {
737880
if seenMetadata {

0 commit comments

Comments
 (0)