Skip to content

Commit 1f656f5

Browse files
committed
raftengine: harden async etcd snapshot dispatch
1 parent b9ef9fc commit 1f656f5

5 files changed

Lines changed: 395 additions & 40 deletions

File tree

internal/raftengine/etcd/engine.go

Lines changed: 201 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/binary"
77
"io"
8+
"log/slog"
89
"strconv"
910
"sync"
1011
"sync/atomic"
@@ -93,6 +94,9 @@ type Engine struct {
9394
stepCh chan raftpb.Message
9495
dispatchCh chan raftpb.Message
9596
dispatchStopCh chan struct{}
97+
snapshotReqCh chan snapshotRequest
98+
snapshotResCh chan snapshotResult
99+
snapshotStopCh chan struct{}
96100
closeCh chan struct{}
97101
doneCh chan struct{}
98102
startedCh chan struct{}
@@ -102,7 +106,9 @@ type Engine struct {
102106
startOnce sync.Once
103107
closeOnce sync.Once
104108
dispatchOnce sync.Once
109+
snapshotOnce sync.Once
105110
dispatchWG sync.WaitGroup
111+
snapshotWG sync.WaitGroup
106112

107113
mu sync.RWMutex
108114
status raftengine.Status
@@ -111,8 +117,14 @@ type Engine struct {
111117
closed bool
112118
applied uint64
113119

120+
snapshotMu sync.Mutex
121+
122+
dispatchDropCount atomic.Uint64
123+
dispatchErrorCount atomic.Uint64
124+
114125
pendingProposals map[uint64]proposalRequest
115126
pendingReads map[uint64]readRequest
127+
snapshotInFlight bool
116128
}
117129

118130
type proposalRequest struct {
@@ -137,6 +149,16 @@ type readResult struct {
137149
err error
138150
}
139151

152+
// snapshotRequest is the unit of work sent over snapshotReqCh to the snapshot
// worker: one FSM snapshot together with the applied index it was taken at.
type snapshotRequest struct {
153+
// index is the engine's applied index at the time the snapshot was taken.
index uint64
154+
// snapshot is the FSM snapshot that the worker serializes and persists.
snapshot Snapshot
155+
}
156+
157+
// snapshotResult is the snapshot worker's reply, delivered on snapshotResCh,
// for a single snapshotRequest.
type snapshotResult struct {
158+
// index echoes the index of the request that produced this result.
index uint64
159+
// err is whatever persistSnapshot returned for the request; nil on success.
err error
160+
}
161+
140162
// Open starts the etcd/raft backend.
141163
//
142164
// Single-node bootstrap waits for local leadership so callers can use the
@@ -206,6 +228,12 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) {
206228
engine.transport.SetHandler(engine.handleTransportMessage)
207229
engine.startDispatchWorkers()
208230
}
231+
if engine.persist != nil {
232+
engine.snapshotReqCh = make(chan snapshotRequest)
233+
engine.snapshotResCh = make(chan snapshotResult, 1)
234+
engine.snapshotStopCh = make(chan struct{})
235+
engine.startSnapshotWorker()
236+
}
209237
engine.refreshStatus()
210238

211239
go engine.run()
@@ -396,7 +424,12 @@ func (e *Engine) run() {
396424
return
397425
}
398426

399-
if !e.handleEvent(ticker.C) {
427+
ok, err := e.handleEvent(ticker.C)
428+
if err != nil {
429+
e.fail(err)
430+
return
431+
}
432+
if !ok {
400433
e.shutdown()
401434
return
402435
}
@@ -419,10 +452,10 @@ func (e *Engine) startup() error {
419452
return e.drainReady()
420453
}
421454

422-
func (e *Engine) handleEvent(tick <-chan time.Time) bool {
455+
func (e *Engine) handleEvent(tick <-chan time.Time) (bool, error) {
423456
select {
424457
case <-e.closeCh:
425-
return false
458+
return false, nil
426459
case <-tick:
427460
e.rawNode.Tick()
428461
case req := <-e.proposeCh:
@@ -431,8 +464,12 @@ func (e *Engine) handleEvent(tick <-chan time.Time) bool {
431464
e.handleRead(req)
432465
case msg := <-e.stepCh:
433466
e.handleStep(msg)
467+
case result := <-e.snapshotResCh:
468+
if err := e.handleSnapshotResult(result); err != nil {
469+
return false, err
470+
}
434471
}
435-
return true
472+
return true, nil
436473
}
437474

438475
func (e *Engine) handleProposal(req proposalRequest) {
@@ -520,13 +557,19 @@ func (e *Engine) sendMessages(messages []raftpb.Message) error {
520557
if e.transport == nil || e.dispatchCh == nil {
521558
continue
522559
}
523-
cloned, err := cloneDispatchMessage(msg)
524-
if err != nil {
525-
return err
526-
}
560+
cloned := cloneDispatchMessage(msg)
527561
select {
528562
case e.dispatchCh <- cloned:
529563
default:
564+
count := e.dispatchDropCount.Add(1)
565+
if shouldLogDispatchEvent(count) {
566+
slog.Warn("dropping etcd raft outbound message",
567+
"node_id", e.nodeID,
568+
"to", msg.To,
569+
"type", msg.Type.String(),
570+
"drop_count", count,
571+
)
572+
}
530573
}
531574
}
532575
return nil
@@ -539,6 +582,8 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error {
539582
if len(snapshot.Data) == 0 {
540583
return errors.WithStack(errSnapshotRequired)
541584
}
585+
e.snapshotMu.Lock()
586+
defer e.snapshotMu.Unlock()
542587
if err := e.fsm.Restore(bytes.NewReader(snapshot.Data)); err != nil {
543588
return errors.WithStack(err)
544589
}
@@ -566,7 +611,7 @@ func (e *Engine) applyReadyHardState(hardState raftpb.HardState) error {
566611
}
567612

568613
func (e *Engine) maybePersistLocalSnapshot() error {
569-
if e.applied == 0 || e.persist == nil {
614+
if e.applied == 0 || e.persist == nil || e.snapshotReqCh == nil || e.snapshotInFlight {
570615
return nil
571616
}
572617

@@ -577,11 +622,16 @@ func (e *Engine) maybePersistLocalSnapshot() error {
577622
if e.applied <= current.Metadata.Index || e.applied-current.Metadata.Index < defaultSnapshotEvery {
578623
return nil
579624
}
580-
snapshot, err := persistLocalSnapshot(e.storage, e.persist, e.fsm, e.applied)
625+
snapshot, err := e.fsm.Snapshot()
581626
if err != nil {
582-
return err
627+
return errors.WithStack(err)
628+
}
629+
e.snapshotInFlight = true
630+
e.snapshotReqCh <- snapshotRequest{
631+
index: e.applied,
632+
snapshot: snapshot,
583633
}
584-
return e.refreshLocalSnapshot(snapshot)
634+
return nil
585635
}
586636

587637
func (e *Engine) applyCommitted(entries []raftpb.Entry) error {
@@ -716,6 +766,7 @@ func (e *Engine) shutdown() {
716766
e.status.State = raftengine.StateShutdown
717767
e.mu.Unlock()
718768
e.stopDispatchWorkers()
769+
e.stopSnapshotWorker()
719770
_ = closePersist(e.persist)
720771
_ = e.transport.Close()
721772
e.failPending(errors.WithStack(errClosed))
@@ -730,6 +781,7 @@ func (e *Engine) fail(err error) {
730781
e.status.State = raftengine.StateShutdown
731782
e.mu.Unlock()
732783
e.stopDispatchWorkers()
784+
e.stopSnapshotWorker()
733785
_ = closePersist(e.persist)
734786
_ = e.transport.Close()
735787
e.failPending(e.currentErrorOrClosed())
@@ -887,13 +939,6 @@ func maxAppliedIndex(snapshot raftpb.Snapshot) uint64 {
887939
return snapshot.Metadata.Index
888940
}
889941

890-
func (e *Engine) refreshLocalSnapshot(snapshot raftpb.Snapshot) error {
891-
if etcdraft.IsEmptySnap(snapshot) {
892-
return nil
893-
}
894-
return nil
895-
}
896-
897942
func (e *Engine) enqueueStep(ctx context.Context, msg raftpb.Message) error {
898943
select {
899944
case <-ctx.Done():
@@ -934,6 +979,16 @@ func (e *Engine) runDispatchWorker() {
934979
return
935980
case msg := <-e.dispatchCh:
936981
if err := e.transport.Dispatch(context.Background(), msg); err != nil {
982+
count := e.dispatchErrorCount.Add(1)
983+
if shouldLogDispatchEvent(count) {
984+
slog.Warn("etcd raft outbound dispatch failed",
985+
"node_id", e.nodeID,
986+
"to", msg.To,
987+
"type", msg.Type.String(),
988+
"dispatch_error_count", count,
989+
"err", err,
990+
)
991+
}
937992
continue
938993
}
939994
}
@@ -949,6 +1004,43 @@ func (e *Engine) stopDispatchWorkers() {
9491004
})
9501005
}
9511006

1007+
// startSnapshotWorker launches the background goroutine that persists
// snapshots. It is a no-op when snapshotReqCh has not been created, which is
// the case when Open found no persistence backend configured.
func (e *Engine) startSnapshotWorker() {
1008+
if e.snapshotReqCh == nil {
1009+
return
1010+
}
1011+
// Register the goroutine before starting it so stopSnapshotWorker's Wait
// cannot return before the worker has been accounted for.
e.snapshotWG.Add(1)
1012+
go e.runSnapshotWorker()
1013+
}
1014+
1015+
// runSnapshotWorker is the body of the snapshot worker goroutine. It receives
// requests from snapshotReqCh, persists each one via persistSnapshot, and
// reports the outcome on snapshotResCh. It exits when snapshotStopCh is
// closed and marks snapshotWG done on the way out.
func (e *Engine) runSnapshotWorker() {
1016+
defer e.snapshotWG.Done()
1017+
for {
1018+
select {
1019+
case <-e.snapshotStopCh:
1020+
return
1021+
case req := <-e.snapshotReqCh:
1022+
// persistSnapshot runs to completion here; a stop signal arriving in
// the meantime is only observed afterwards, in the select below.
result := snapshotResult{
1023+
index: req.index,
1024+
err: e.persistSnapshot(req),
1025+
}
1026+
// Deliver the result unless the engine is stopping, in which case the
// result is discarded so the worker never blocks on a reader that is gone.
select {
1027+
case <-e.snapshotStopCh:
1028+
return
1029+
case e.snapshotResCh <- result:
1030+
}
1031+
}
1032+
}
1033+
}
1034+
1035+
// stopSnapshotWorker signals the snapshot worker to exit and waits for it to
// finish. The work is wrapped in snapshotOnce, so repeated calls (it is
// invoked from both shutdown and fail) close the stop channel only once.
func (e *Engine) stopSnapshotWorker() {
1036+
e.snapshotOnce.Do(func() {
1037+
// The stop channel is nil when no worker was ever started.
if e.snapshotStopCh != nil {
1038+
close(e.snapshotStopCh)
1039+
}
1040+
// Block until the worker goroutine has returned; returns immediately if
// none was started.
e.snapshotWG.Wait()
1041+
})
1042+
}
1043+
9521044
func (e *Engine) leaderInfo(leaderNodeID uint64) raftengine.LeaderInfo {
9531045
if leaderNodeID == 0 {
9541046
return raftengine.LeaderInfo{}
@@ -982,16 +1074,99 @@ func dispatchQueueSize(maxInflight int) int {
9821074
return size * defaultDispatchWorkers
9831075
}
9841076

985-
func cloneDispatchMessage(msg raftpb.Message) (raftpb.Message, error) {
986-
raw, err := msg.Marshal()
1077+
// shouldLogDispatchEvent reports whether the count-th dispatch event (a
// dropped or failed outbound message) should be written to the log. Logging is
// sampled: the first event is always logged, and after that only every 128th
// event, so a persistently unreachable peer does not flood the log.
//
// count is expected to be the 1-based running total returned by an atomic
// counter's Add(1). A count of zero means no event has occurred and is never
// logged.
func shouldLogDispatchEvent(count uint64) bool {
	// logEvery is the sampling interval applied after the first event.
	const logEvery = 128

	if count == 0 {
		return false
	}
	return count == 1 || count%logEvery == 0
}
1080+
1081+
// cloneDispatchMessage returns a copy of msg whose slice and pointer fields
// (Entries, Snapshot, Context, Responses) are backed by freshly allocated
// memory, so the copy does not alias the original's buffers. All other fields
// are copied by value.
func cloneDispatchMessage(msg raftpb.Message) raftpb.Message {
1082+
cloned := msg
1083+
cloned.Entries = cloneDispatchEntries(msg.Entries)
1084+
cloned.Snapshot = cloneDispatchSnapshot(msg.Snapshot)
1085+
// append onto a nil slice copies the bytes; an empty Context becomes nil.
cloned.Context = append([]byte(nil), msg.Context...)
1086+
cloned.Responses = cloneDispatchMessages(msg.Responses)
1087+
return cloned
1088+
}
1089+
1090+
// cloneDispatchMessages returns a new slice holding a cloneDispatchMessage
// copy of every element in messages. A nil or empty input yields nil.
func cloneDispatchMessages(messages []raftpb.Message) []raftpb.Message {
1091+
if len(messages) == 0 {
1092+
return nil
1093+
}
1094+
cloned := make([]raftpb.Message, len(messages))
1095+
for i, msg := range messages {
1096+
cloned[i] = cloneDispatchMessage(msg)
1097+
}
1098+
return cloned
1099+
}
1100+
1101+
// cloneDispatchEntries returns a new slice of entries in which each entry's
// Data is copied into its own buffer. A nil or empty input yields nil.
func cloneDispatchEntries(entries []raftpb.Entry) []raftpb.Entry {
1102+
if len(entries) == 0 {
1103+
return nil
1104+
}
1105+
cloned := make([]raftpb.Entry, len(entries))
1106+
for i, entry := range entries {
1107+
// Copy the entry by value first, then replace Data so the clone does not
// share the original payload's backing array.
cloned[i] = entry
1108+
cloned[i].Data = append([]byte(nil), entry.Data...)
1109+
}
1110+
return cloned
1111+
}
1112+
1113+
// cloneDispatchSnapshot returns a pointer to a copy of *snapshot whose Data
// and Metadata.ConfState slices are freshly allocated. A nil input yields nil.
func cloneDispatchSnapshot(snapshot *raftpb.Snapshot) *raftpb.Snapshot {
1114+
if snapshot == nil {
1115+
return nil
1116+
}
1117+
cloned := *snapshot
1118+
cloned.Data = append([]byte(nil), snapshot.Data...)
1119+
cloned.Metadata.ConfState = cloneDispatchConfState(snapshot.Metadata.ConfState)
1120+
return &cloned
1121+
}
1122+
1123+
// cloneDispatchConfState returns conf with its four ID slices (Voters,
// Learners, VotersOutgoing, LearnersNext) copied into new backing arrays.
// conf is received by value, so the reassignments below affect only the
// returned copy.
func cloneDispatchConfState(conf raftpb.ConfState) raftpb.ConfState {
1124+
conf.Voters = append([]uint64(nil), conf.Voters...)
1125+
conf.Learners = append([]uint64(nil), conf.Learners...)
1126+
conf.VotersOutgoing = append([]uint64(nil), conf.VotersOutgoing...)
1127+
conf.LearnersNext = append([]uint64(nil), conf.LearnersNext...)
1128+
return conf
1129+
}
1130+
1131+
// handleSnapshotResult processes a result received from the snapshot worker.
// It clears snapshotInFlight so a new snapshot may be scheduled, returns the
// worker's error if there was one, and otherwise calls
// maybePersistLocalSnapshot to check whether another snapshot is already due.
//
// A non-nil return is propagated by handleEvent to run, which fails the engine.
func (e *Engine) handleSnapshotResult(result snapshotResult) error {
1132+
e.snapshotInFlight = false
1133+
if result.err != nil {
1134+
return result.err
1135+
}
1136+
return e.maybePersistLocalSnapshot()
1137+
}
1138+
1139+
// persistSnapshot serializes the snapshot in req and writes it through
// persistLocalSnapshotPayload. It holds snapshotMu for the storage check and
// the write; applyReadySnapshot takes the same mutex around fsm.Restore.
//
// It returns nil without writing when storage already holds a snapshot at or
// beyond req.index. ErrCompacted, ErrUnavailable and ErrSnapOutOfDate from the
// write are also reported as nil; any other error is returned to the caller.
//
// NOTE(review): in this diff view the added lines of this function are
// interleaved with removed lines ("NNN-" gutters) of the previous
// cloneDispatchMessage implementation; those removed lines are not part of it.
func (e *Engine) persistSnapshot(req snapshotRequest) error {
1140+
payload, err := snapshotBytes(req.snapshot)
9871141
if err != nil {
988-
return raftpb.Message{}, errors.WithStack(err)
1142+
return err
9891143
}
990-
var cloned raftpb.Message
991-
if err := cloned.Unmarshal(raw); err != nil {
992-
return raftpb.Message{}, errors.WithStack(err)
1144+
1145+
// Serialization above happens outside the lock; only the storage check and
// the write are performed while holding snapshotMu.
e.snapshotMu.Lock()
1146+
defer e.snapshotMu.Unlock()
1147+
1148+
current, err := e.storage.Snapshot()
1149+
if err != nil {
1150+
return errors.WithStack(err)
1151+
}
1152+
// Storage may have advanced since the request was queued; never move the
// stored snapshot backwards.
if req.index <= current.Metadata.Index {
1153+
return nil
1154+
}
1155+
1156+
_, err = persistLocalSnapshotPayload(e.storage, e.persist, req.index, payload)
1157+
if err != nil {
1158+
// These three storage errors are swallowed rather than reported as a
// worker failure.
switch {
1159+
case errors.Is(err, etcdraft.ErrCompacted):
1160+
return nil
1161+
case errors.Is(err, etcdraft.ErrUnavailable):
1162+
return nil
1163+
case errors.Is(err, etcdraft.ErrSnapOutOfDate):
1164+
return nil
1165+
default:
1166+
return err
1167+
}
9931168
}
994-
return cloned, nil
1169+
return nil
9951170
}
9961171

9971172
func encodeReadContext(id uint64) []byte {

0 commit comments

Comments
 (0)