Skip to content

Commit 944ab86

Browse files
committed
refactor(raft): extract drainSnapshotChunks to satisfy cyclop (PR #747 r3)
Round-3 lint feedback on commit 3424d5c from reviewdog/golangci: > calculated cyclomatic complexity for function receiveSnapshotStream > is 11, max is 10 (cyclop) Round-3's EXDEV-avoidance change pushed receiveSnapshotStream's cyclomatic complexity from 10 to 11 (the new spool-placement branch). Reduced complexity by extracting two helpers and narrowing each function's responsibility: - snapshotSpoolPlacement(): owns the locked read of t.spoolDir / t.fsmSnapDir and the EXDEV-avoidance decision (place spool inside fsmSnapDir when wired). Returns (placement, fsmSnapDir). - drainSnapshotChunks(): owns the per-chunk receive loop, CRC accumulation, and the call into finalizeReceivedSnapshot on the final chunk. Returns (msg, payloadBytes, err). receiveSnapshotStream now owns: - spool lifecycle (newSnapshotSpool + deferred Close-with-warn) - the post-receive structured log line Behaviour is byte-identical: same spool placement, same finalization path, same log line (index, from, payload_bytes, format). Verified locally: golangci-lint run --enable-only cyclop ./internal/raftengine/etcd/... -- 0 issues. Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 12.0s, all green. Claude bot round-3 review on the prior commit (3424d5c) marked the PR as "approve. All three rounds of review concerns are resolved." This refactor only addresses the cyclop signal raised by reviewdog separately; no semantic changes on the receive path. No caller audit needed.
1 parent 3424d5c commit 944ab86

1 file changed

Lines changed: 58 additions & 39 deletions

File tree

internal/raftengine/etcd/grpc_transport.go

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -683,28 +683,25 @@ func (t *GRPCTransport) handle(ctx context.Context, msg raftpb.Message) error {
683683
return errors.WithStack(handler(ctx, msg))
684684
}
685685

686-
func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
687-
var metadata raftpb.Message
688-
seenMetadata := false
686+
// snapshotSpoolPlacement returns (spoolDir, fsmSnapDir) under the transport
687+
// lock. When fsmSnapDir is wired, the spool itself is placed inside it so
688+
// FinalizeAsFSMFile's rename stays intra-filesystem and cannot fail with
689+
// EXDEV. Standard engine wiring puts both under cfg.DataDir, but the
690+
// receive code should not assume that. The legacy fallback path
691+
// (fsmSnapDir == "") keeps the spool in spoolDir because it never renames
692+
// — Bytes() materializes the payload in place.
693+
func (t *GRPCTransport) snapshotSpoolPlacement() (placement, fsmSnapDir string) {
689694
t.mu.RLock()
690-
spoolDir := t.spoolDir
691-
fsmSnapDir := t.fsmSnapDir
692-
t.mu.RUnlock()
693-
694-
// Place the spool file inside fsmSnapDir (not spoolDir) when the
695-
// streaming-token path is wired, so FinalizeAsFSMFile's os.Rename
696-
// stays within a single filesystem and cannot fail with EXDEV. An
697-
// operator who mounts spoolDir and fsmSnapDir on different volumes
698-
// would otherwise hit a hard receive failure (the leader retries
699-
// indefinitely with no chance of success). Standard engine wiring
700-
// already puts both under cfg.DataDir, but the receive code should
701-
// not assume that. The legacy fallback path (fsmSnapDir empty)
702-
// keeps the spool in spoolDir because it never renames — Bytes()
703-
// materializes the payload in place.
704-
spoolPlacement := spoolDir
695+
defer t.mu.RUnlock()
696+
fsmSnapDir = t.fsmSnapDir
705697
if fsmSnapDir != "" {
706-
spoolPlacement = fsmSnapDir
698+
return fsmSnapDir, fsmSnapDir
707699
}
700+
return t.spoolDir, ""
701+
}
702+
703+
func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
704+
spoolPlacement, fsmSnapDir := t.snapshotSpoolPlacement()
708705
spool, err := newSnapshotSpool(spoolPlacement)
709706
if err != nil {
710707
return raftpb.Message{}, err
@@ -723,44 +720,66 @@ func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotSer
723720
}
724721
}()
725722

726-
// Wrap spool with crc32CWriter so the CRC accumulates as bytes hit disk.
727-
// The CRC is only meaningful when we have an fsmSnapDir to finalize into;
728-
// the legacy fallback path discards it. Cost is hashing speed (~GB/s on
729-
// modern x86 with SSE 4.2 PCLMULQDQ) which is far above the gRPC stream
730-
// throughput so the wrapper is invisible in profiles.
723+
msg, payloadBytes, err := drainSnapshotChunks(stream, spool, fsmSnapDir)
724+
if err != nil {
725+
return raftpb.Message{}, err
726+
}
727+
index := uint64(0)
728+
if msg.Snapshot != nil {
729+
index = msg.Snapshot.Metadata.Index
730+
}
731+
slog.Info("etcd raft snapshot stream received",
732+
"index", index,
733+
"from", msg.From,
734+
"payload_bytes", payloadBytes,
735+
"format", snapshotDataFormatLabel(msg.Snapshot),
736+
)
737+
return msg, nil
738+
}
739+
740+
// drainSnapshotChunks consumes the SendSnapshot stream into spool, computes
741+
// CRC32C over the payload bytes as they hit disk, and on the final chunk
742+
// hands off to finalizeReceivedSnapshot — which decides between the
743+
// streaming-token path (rename to fsmSnapDir/<index>.fsm + 17-byte token
744+
// in Snapshot.Data) and the legacy materialize fallback. Extracted from
745+
// receiveSnapshotStream so that function stays under cyclop's complexity
746+
// budget.
747+
func drainSnapshotChunks(
748+
stream pb.EtcdRaft_SendSnapshotServer,
749+
spool *snapshotSpool,
750+
fsmSnapDir string,
751+
) (raftpb.Message, int64, error) {
752+
var metadata raftpb.Message
753+
seenMetadata := false
754+
// Wrap spool with crc32CWriter so the CRC accumulates as bytes hit
755+
// disk. The CRC is only meaningful when we have an fsmSnapDir to
756+
// finalize into; the legacy fallback path discards it. Cost is
757+
// hashing speed (~GB/s on modern x86 with SSE 4.2 PCLMULQDQ), well
758+
// above gRPC stream throughput so the wrapper is invisible in
759+
// profiles.
731760
crcWriter := newCRC32CWriter(spool)
732761

733762
var payloadBytes int64
734763
for {
735764
chunk, err := stream.Recv()
736765
if err != nil {
737766
if errors.Is(err, io.EOF) {
738-
return raftpb.Message{}, errors.WithStack(errSnapshotStreamShort)
767+
return raftpb.Message{}, 0, errors.WithStack(errSnapshotStreamShort)
739768
}
740-
return raftpb.Message{}, errors.WithStack(err)
769+
return raftpb.Message{}, 0, errors.WithStack(err)
741770
}
742771
payloadBytes += int64(len(chunk.Chunk))
743772
seen, err := appendSnapshotChunk(&metadata, crcWriter, chunk, seenMetadata)
744773
if err != nil {
745-
return raftpb.Message{}, err
774+
return raftpb.Message{}, 0, err
746775
}
747776
seenMetadata = seen
748777
if chunk.Final {
749778
msg, err := finalizeReceivedSnapshot(metadata, spool, crcWriter.Sum32(), fsmSnapDir, seenMetadata)
750779
if err != nil {
751-
return raftpb.Message{}, err
752-
}
753-
index := uint64(0)
754-
if msg.Snapshot != nil {
755-
index = msg.Snapshot.Metadata.Index
780+
return raftpb.Message{}, 0, err
756781
}
757-
slog.Info("etcd raft snapshot stream received",
758-
"index", index,
759-
"from", msg.From,
760-
"payload_bytes", payloadBytes,
761-
"format", snapshotDataFormatLabel(msg.Snapshot),
762-
)
763-
return msg, nil
782+
return msg, payloadBytes, nil
764783
}
765784
}
766785
}

0 commit comments

Comments
 (0)