-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathengine.go
More file actions
3695 lines (3417 loc) · 117 KB
/
engine.go
File metadata and controls
3695 lines (3417 loc) · 117 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package etcd
import (
"bytes"
"context"
"encoding/binary"
"io"
"log/slog"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
etcdstorage "go.etcd.io/etcd/server/v3/storage"
etcdraft "go.etcd.io/raft/v3"
raftpb "go.etcd.io/raft/v3/raftpb"
"google.golang.org/grpc/status"
)
const (
defaultTickInterval = 10 * time.Millisecond
defaultHeartbeatTick = 10 // 100ms at 10ms interval
defaultElectionTick = 100 // 1s at 10ms interval (10x heartbeat, etcd/raft recommended ratio)
// leaseSafetyMargin is subtracted from electionTimeout when computing the
// duration of a leader-local read lease. It absorbs goroutine scheduling
// delay between heartbeat ack and lease refresh, GC pauses on the leader,
// and bounded wall-clock skew between the leader and a partition's new
// leader candidate. See docs/lease_read_design.md for the safety argument.
leaseSafetyMargin = 300 * time.Millisecond
// defaultMaxInflightMsg controls how many in-flight MsgApp messages Raft
// allows per peer before waiting for an ACK. It also sizes the inbound
// stepCh, dispatchReportCh, and the per-peer outbound "normal" dispatch
// queue. Total buffered memory is bounded by
// O(numPeers × MaxInflightMsg × avgMsgSize).
//
// Raised from 256 → 512 to absorb short CPU bursts without forcing
// peers to reject with "etcd raft inbound step queue is full".
// Under production congestion we observed the 256-slot inbound
// stepCh on followers filling up while their event loop was held
// up by adapter-side pebble seek storms (PRs #560, #562, #563,
// #565 removed most of that CPU); 512 is a 2× safety margin.
//
// We intentionally do NOT raise this in lock-step with the 2 MiB
// defaultMaxSizePerMsg: the two knobs multiply, and 1024 × 2 MiB
// is a 2 GiB per-peer worst-case product that a bursty multi-peer
// deployment can plausibly realise under TCP backpressure loss.
// 512 × 2 MiB halves that to 1 GiB per peer (4 GiB on a 5-node
// leader with 4 followers), which fits comfortably inside the
// 4–16 GiB RAM envelope of typical elastickv nodes while still
// preserving the MsgApp-batching win that motivates raising the
// byte cap above etcd/raft's 1 MiB upstream default on small-entry
// workloads. Operators who need deeper pipelines (large clusters
// with plenty of RAM) can raise this via
// ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators
// who need a smaller memory ceiling can lower MaxSizePerMsg via
// ELASTICKV_RAFT_MAX_SIZE_PER_MSG.
defaultMaxInflightMsg = 512
// minInboundChannelCap is the floor applied when sizing the engine's
// inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg.
// Even if a (misconfigured) caller drops MaxInflightMsg below this,
// we keep at least this much buffering so that a single tick burst
// doesn't trip errStepQueueFull on the inbound side. 256 matches the
// pre-#529 compiled-in default that was known to be survivable.
minInboundChannelCap = 256
// defaultMaxSizePerMsg caps the byte size of a single MsgApp payload.
// Set to 2 MiB — double etcd/raft's 1 MiB upstream default — so each
// MsgApp amortises more entries under small-entry workloads
// (Redis-style KV, median entry ~500 B; 2 MiB / 500 B ≈ 4000 entries
// per MsgApp already saturates the per-RPC batching benefit).
// Fewer MsgApps per committed byte means fewer dispatcher wake-ups
// on the leader and fewer recv syscalls on the follower; the
// follower's apply loop also contends less with the read path.
//
// Lowered from 4 MiB → 2 MiB in tandem with defaultMaxInflightMsg=512
// to cap per-peer worst-case buffered Raft traffic at 1 GiB
// (512 × 2 MiB), i.e. 4 GiB on a 5-node leader with 4 followers.
// The previous 4 MiB cap produced a 2 GiB/peer, 8 GiB/leader
// worst case that was too tight for the 4–16 GiB RAM envelope
// typical elastickv nodes operate in; the batching win of 4 MiB
// over 2 MiB is marginal on small-entry workloads.
defaultMaxSizePerMsg = 2 << 20
// maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the
// Raft-level flow-control knobs without a rebuild. Parsed once at
// Open and passed through normalizeLimitConfig; invalid values fall
// back to the defaults with a warning. MaxSizePerMsg is expressed
// as an integer byte count for consistency with the other numeric
// knobs in this package.
maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS"
maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG"
// minMaxSizePerMsg is the lower bound accepted from the environment
// override. A payload cap below ~1 KiB makes MsgApp batching
// degenerate (one entry per message) which defeats the whole point
// of the knob; fall back to the default rather than rejecting so that a
// fat-fingered operator doesn't take out the engine.
minMaxSizePerMsg uint64 = 1 << 10
// maxMaxInflightMsg caps the environment override for
// MaxInflightMsg. Open() uses the resolved value to allocate
// stepCh, dispatchReportCh, and every per-peer dispatch queue, so
// a fat-fingered ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=1e8 triggers
// multi-GB channel allocations and crashes the process before the
// node even becomes healthy. 8192 is ~16× the compiled-in default
// (512) — plenty of headroom for pipelining experiments, and well
// below the point where channel allocation alone would OOM a
// 32 GiB runner. Values above this clamp back to the default with
// a warning (same policy as sub-1 values) so a misconfigured
// operator never hard-breaks startup.
maxMaxInflightMsg = 8192
// maxMaxSizePerMsg caps the environment override for
// MaxSizePerMsg at the Raft transport's per-message budget. The
// server- and dial-side gRPC options (see internal.GRPCMaxMessageBytes)
// reject frames larger than 64 MiB, so a MaxSizePerMsg above that
// makes Raft emit MsgApp payloads the transport physically cannot
// carry, producing repeated send failures / unreachable reports
// under large batches. Keeping this equal to GRPCMaxMessageBytes
// makes the transport budget the single source of truth — raise
// BOTH together, never this one alone. Values above this clamp
// back to the default with a warning.
maxMaxSizePerMsg uint64 = 64 << 20
// defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel.
// It carries low-frequency control traffic: heartbeats, votes, read-index,
// leader-transfer, and their corresponding response messages
// (MsgHeartbeatResp, MsgReadIndexResp, MsgVoteResp, MsgPreVoteResp).
// MsgAppResp is intentionally kept in the normal channel: followers — the
// only senders of MsgAppResp — do not send MsgApp, so there is no
// head-of-line blocking risk there.
//
// Raised from 64 → 512 after the leader logged heartbeat drops
// totalling 1.6M+ (dispatchDropCount) while the transport drained
// slower than heartbeat tick issuance. Heartbeats are tiny
// (< ~100 B), so 512 × numPeers is ≪ 1 MB total memory; the
// upside is that a ~5 s transient pause (election-timeout scale)
// no longer drops heartbeats and forces the peers' lease to expire.
defaultHeartbeatBufPerPeer = 512
// defaultSnapshotLaneBufPerPeer sizes the per-peer MsgSnap lane when the
// 4-lane dispatcher mode is enabled (see ELASTICKV_RAFT_DISPATCHER_LANES).
// MsgSnap is rare and bulky; 4 is enough to absorb a retry or two without
// holding up MsgApp replication behind a multi-MiB payload.
defaultSnapshotLaneBufPerPeer = 4
// defaultOtherLaneBufPerPeer sizes the per-peer fallback lane for message
// types not classified as heartbeat/replication/snapshot (e.g. surprise
// locally-addressed control types). Small buffer: traffic volume is tiny.
defaultOtherLaneBufPerPeer = 16
// dispatcherLanesEnvVar toggles the 4-lane dispatcher (heartbeat /
// replication / snapshot / other). When unset or "0", the legacy
// 2-lane layout (heartbeat + normal) is used. Opt-in by design: the
// raft hot path is high blast radius and a regression here can cause
// cluster-wide elections.
dispatcherLanesEnvVar = "ELASTICKV_RAFT_DISPATCHER_LANES"
// defaultSnapshotEvery is the fallback trigger threshold: take an FSM
// snapshot once the applied index has advanced this many entries past
// the last snapshot's index. etcd/raft itself uses 10_000 as a default,
// but with fat proposal payloads (e.g. Lua scripts) this can produce a
// multi-GiB WAL between snapshots. Operators can lower via
// ELASTICKV_RAFT_SNAPSHOT_COUNT without a rebuild.
defaultSnapshotEvery = 10_000
snapshotEveryEnvVar = "ELASTICKV_RAFT_SNAPSHOT_COUNT"
defaultSnapshotQueueSize = 1
defaultAdminPollInterval = 10 * time.Millisecond
defaultMaxPendingConfigs = 64
unknownLastContact = time.Duration(-1)
proposalEnvelopeVersion = byte(0x01)
readContextVersion = byte(0x02)
confChangeContextVersion = byte(0x03)
envelopeHeaderSize = 9
confChangeFixedSize = 21
)
var (
errNilEngine = errors.New("raft engine is not configured")
errClosed = errors.New("etcd raft engine is closed")
errNotLeader = errors.Mark(errors.New("etcd raft engine is not leader"), raftengine.ErrNotLeader)
errNodeIDRequired = errors.New("etcd raft node id is required")
errDataDirRequired = errors.New("etcd raft data dir is required")
errStateMachineUnset = errors.New("etcd raft state machine is not configured")
errSnapshotRequired = errors.New("etcd raft snapshot payload is required")
errStepQueueFull = errors.New("etcd raft inbound step queue is full")
errClusterMismatch = errors.New("etcd raft persisted cluster does not match configured peers")
errConfigIndexMismatch = errors.New("etcd raft configuration index does not match")
errConfChangeContextTooLarge = errors.New("etcd raft conf change context is too large")
errLeadershipTransferTarget = errors.New("etcd raft leadership transfer target is required")
errLeadershipTransferNotReady = errors.New("etcd raft leadership transfer target is not available")
errLeadershipTransferAborted = errors.New("etcd raft leadership transfer aborted")
errLeadershipTransferRejected = errors.New("etcd raft leadership transfer was rejected by raft (target is not a voter)")
errLeadershipTransferNotLeader = errors.Mark(errors.New("etcd raft leadership transfer requires the local node to be leader"), raftengine.ErrNotLeader)
errLeadershipTransferInProgress = errors.Mark(errors.New("etcd raft leadership transfer is in progress"), raftengine.ErrLeadershipTransferInProgress)
errTooManyPendingConfigs = errors.New("etcd raft engine has too many pending config changes")
)
// Snapshot is an alias for the shared raftengine.Snapshot interface.
type Snapshot = raftengine.Snapshot
// StateMachine is an alias for the shared raftengine.StateMachine interface.
type StateMachine = raftengine.StateMachine
type OpenConfig struct {
NodeID uint64
LocalID string
LocalAddress string
DataDir string
Peers []Peer
Bootstrap bool
Transport *GRPCTransport
TickInterval time.Duration
ElectionTick int
HeartbeatTick int
StateMachine StateMachine
// MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level
// flow control). Default: 2 MiB (see defaultMaxSizePerMsg). Larger values
// amortise more entries per MsgApp under small-entry workloads; smaller
// values tighten worst-case memory. Operators can override at runtime via
// ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a
// rebuild; the env var takes precedence over the caller-supplied value.
MaxSizePerMsg uint64
// MaxInflightMsg controls how many MsgApp messages Raft may have in-flight
// per peer before waiting for an acknowledgement (Raft-level flow control).
// It also sets the per-peer dispatch channel capacity, so total buffered
// memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
// Default: 512 (see defaultMaxInflightMsg). Increase for deeper pipelining
// on high-bandwidth links; lower in memory-constrained clusters. Operators
// can override at runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a
// rebuild; the env var takes precedence over the caller-supplied value.
MaxInflightMsg int
}
type Engine struct {
nodeID uint64
localID string
localAddress string
dataDir string
fsmSnapDir string
tickInterval time.Duration
electionTick int
storage *etcdraft.MemoryStorage
rawNode *etcdraft.RawNode
persist etcdstorage.Storage
fsm StateMachine
peers map[uint64]Peer
transport *GRPCTransport
nextRequestID atomic.Uint64
proposeCh chan proposalRequest
readCh chan readRequest
adminCh chan adminRequest
stepCh chan raftpb.Message
dispatchReportCh chan dispatchReport
peerDispatchers map[uint64]*peerQueues
perPeerQueueSize int
// dispatcherLanesEnabled toggles the 4-lane dispatcher layout. Captured
// once at Open from ELASTICKV_RAFT_DISPATCHER_LANES so the run-time code
// path is branch-free per message and does not need to re-read env vars.
dispatcherLanesEnabled bool
// snapshotEvery is the FSM-snapshot trigger threshold captured once at
// Open from ELASTICKV_RAFT_SNAPSHOT_COUNT. maybePersistLocalSnapshot
// runs on every Ready-drain pass, so re-parsing the env var (and
// potentially emitting a warning on malformed input) on every call
// would flood logs and burn CPU on the raft hot path. Cache it once.
snapshotEvery uint64
// maxWALFiles is the WAL retention cap captured once at Open from
// ELASTICKV_RAFT_MAX_WAL_FILES. Purges are relatively rare (only after
// snapshot release) but caching avoids a second warning-per-invalid
// call site and keeps the knob consistent across the engine lifetime.
maxWALFiles int
dispatchStopCh chan struct{}
dispatchCtx context.Context
dispatchCancel context.CancelFunc
snapshotReqCh chan snapshotRequest
snapshotResCh chan snapshotResult
snapshotStopCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
startedCh chan struct{}
leaderReady chan struct{}
leaderOnce sync.Once
startOnce sync.Once
closeOnce sync.Once
dispatchOnce sync.Once
snapshotOnce sync.Once
dispatchWG sync.WaitGroup
snapshotWG sync.WaitGroup
mu sync.RWMutex
pending sync.Mutex
status raftengine.Status
config raftengine.Configuration
runErr error
closed bool
applied uint64
// appliedIndex mirrors the current applied-entry index for
// lock-free readers on the lease-read fast path. Writers inside
// the Raft run loop update both `applied` (protected by the run
// loop's single-writer invariant) and `appliedIndex.Store(...)`.
// AppliedIndex() reads via atomic.Load so it does not contend
// with refreshStatus's write lock.
appliedIndex atomic.Uint64
// configIndex tracks the highest configuration index durably published to
// local raft snapshot state and peer metadata.
configIndex atomic.Uint64
lastLeaderContactAt time.Time
lastLeaderContactFrom uint64
// Restore swaps the underlying store state and must not race with the short
// critical section that publishes a newly persisted local snapshot.
snapshotMu sync.Mutex
dispatchDropCount atomic.Uint64
dispatchErrorCount atomic.Uint64
// dispatchErrorByCode subdivides dispatchErrorCount by the grpc
// status code returned from the transport (e.g. "Unavailable",
// "DeadlineExceeded"). Used to tell whether dispatch failures are
// network / backpressure / follower apply stalls. Surfaced to
// Prometheus via DispatchErrorCountsByCode() and the
// DispatchCollector poll loop.
dispatchErrorByCodeMu sync.Mutex
dispatchErrorByCode map[string]uint64
// stepQueueFullCount tracks the number of inbound raft messages
// (from remote peers and local handlers) that were dropped because
// stepCh was full. Surfaced to Prometheus as
// elastickv_raft_step_queue_full_total so operators can correlate
// seek-storm goroutine spikes with raft backpressure.
stepQueueFullCount atomic.Uint64
// ackTracker records per-peer last-response times on the leader and
// publishes the majority-ack instant via quorumAckUnixNano. It is
// read lock-free from LastQuorumAck() on the hot lease-read path
// and updated inside the single event-loop goroutine from
// handleStep when a follower response arrives.
ackTracker quorumAckTracker
// singleNodeLeaderAckUnixNano short-circuits LastQuorumAck on the
// single-node leader path: self IS the quorum, so there are no
// follower responses to observe. refreshStatus keeps this value
// current (set to time.Now().UnixNano() each tick while leader and
// cluster size is 1; cleared otherwise) so the lease-read hot path
// never has to acquire e.mu to check peer count or leader state.
singleNodeLeaderAckUnixNano atomic.Int64
// isLeader mirrors status.State == StateLeader for lock-free reads
// on the hot path. refreshStatus writes it on every tick;
// recordQuorumAck reads it before admitting a follower response
// into ackTracker (so late MsgAppResp / MsgHeartbeatResp arriving
// after a step-down cannot repopulate the tracker), and
// LastQuorumAck reads it to honor the LeaseProvider contract
// ("zero time when the local node is not the leader").
isLeader atomic.Bool
// leaderLossCbsMu guards the slice of callbacks invoked when the node
// transitions out of the leader role (graceful transfer, partition
// step-down, shutdown). Callbacks fire synchronously from the
// leader-loss handling path and MUST be non-blocking; a slow
// callback would hold up refreshStatus / shutdown / fail. See
// RegisterLeaderLossCallback for the full contract. Each entry
// carries a sentinel pointer so that the deregister closure
// returned by RegisterLeaderLossCallback can identify THIS
// specific registration even if the same fn is registered
// multiple times.
leaderLossCbsMu sync.Mutex
leaderLossCbs []leaderLossSlot
pendingProposals map[uint64]proposalRequest
pendingReads map[uint64]readRequest
pendingConfigs map[uint64]adminRequest
snapshotInFlight bool
dispatchFn func(context.Context, dispatchRequest) error
}
type adminAction byte
const (
adminActionAddVoter adminAction = iota + 1
adminActionRemoveServer
adminActionTransferLeadership
)
type adminRequest struct {
ctx context.Context
id uint64
action adminAction
peer Peer
prevIndex uint64
done chan adminResult
}
type adminResult struct {
index uint64
peer Peer
err error
}
type proposalRequest struct {
ctx context.Context
id uint64
payload []byte
done chan proposalResult
}
type proposalResult struct {
result *raftengine.ProposalResult
err error
}
type readRequest struct {
ctx context.Context
id uint64
done chan readResult
target uint64
waitApplied bool
}
type readResult struct {
index uint64
err error
}
type snapshotRequest struct {
index uint64
snapshot Snapshot
}
type snapshotResult struct {
index uint64
err error
}
type dispatchRequest struct {
msg raftpb.Message
}
// peerQueues holds separate dispatch channels per peer so that heartbeats
// are never blocked behind large log-entry RPCs.
//
// Legacy 2-lane layout (default): heartbeat + normal.
//
// 4-lane layout (opt-in via ELASTICKV_RAFT_DISPATCHER_LANES=1): heartbeat +
// replication (MsgApp/MsgAppResp) + snapshot (MsgSnap) + other. Each lane
// gets its own goroutine so a bulky MsgSnap transfer cannot stall MsgApp
// replication and vice versa. Per-peer ordering within a given message type
// is preserved because a single peer's MsgApp stream all share one lane and
// one worker.
type peerQueues struct {
normal chan dispatchRequest
heartbeat chan dispatchRequest
replication chan dispatchRequest // 4-lane mode only; nil otherwise
snapshot chan dispatchRequest // 4-lane mode only; nil otherwise
other chan dispatchRequest // 4-lane mode only; nil otherwise
ctx context.Context
cancel context.CancelFunc
}
type preparedOpenState struct {
cfg OpenConfig
peers []Peer
disk *diskState
}
// Open starts the etcd/raft backend.
//
// Single-node bootstrap waits for local leadership so callers can use the
// engine immediately. Multi-node startup returns after the local node is
// running; leadership is established asynchronously through raft transport.
func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) {
prepared, err := prepareOpenState(cfg)
if err != nil {
return nil, err
}
rawNode, err := newRawNode(prepared.cfg, prepared.disk.Storage)
if err != nil {
_ = closePersist(prepared.disk.Persist)
return nil, err
}
peerMap := make(map[uint64]Peer, len(prepared.peers))
for _, peer := range prepared.peers {
peerMap[peer.NodeID] = peer
}
var dispatchCtx context.Context
var dispatchCancel context.CancelFunc
if prepared.cfg.Transport != nil {
dispatchCtx, dispatchCancel = context.WithCancel(context.Background())
}
opened := false
defer func() {
if opened || dispatchCancel == nil {
return
}
dispatchCancel()
}()
engine := &Engine{
nodeID: prepared.cfg.NodeID,
localID: prepared.cfg.LocalID,
localAddress: prepared.cfg.LocalAddress,
dataDir: prepared.cfg.DataDir,
fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName),
tickInterval: prepared.cfg.TickInterval,
electionTick: prepared.cfg.ElectionTick,
storage: prepared.disk.Storage,
rawNode: rawNode,
persist: prepared.disk.Persist,
fsm: prepared.cfg.StateMachine,
peers: peerMap,
transport: prepared.cfg.Transport,
proposeCh: make(chan proposalRequest),
readCh: make(chan readRequest),
adminCh: make(chan adminRequest),
// Size the inbound step / dispatch-report channels from the
// resolved MaxInflightMsg (post-normalizeLimitConfig, which has
// already applied the env override and compiled-in default) so
// that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above
// the default actually get the extra buffering they asked for.
// Using defaultMaxInflightMsg here would silently cap the
// channel at 512 (the current compiled-in default) even when
// the Raft layer has been told to keep 2048 in flight,
// re-triggering errStepQueueFull under the exact bursty
// conditions this knob is meant to absorb.
stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)),
dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
startedCh: make(chan struct{}),
leaderReady: make(chan struct{}),
config: configurationFromConfState(peerMap, prepared.disk.LocalSnap.Metadata.ConfState),
applied: maxAppliedIndex(prepared.disk.LocalSnap),
dispatchCtx: dispatchCtx,
dispatchCancel: dispatchCancel,
pendingProposals: map[uint64]proposalRequest{},
pendingReads: map[uint64]readRequest{},
pendingConfigs: map[uint64]adminRequest{},
// Parse env-tunable retention/snapshot knobs once at Open. Both
// are consulted on hot paths (maybePersistLocalSnapshot runs per
// Ready-drain; purge runs after each snapshot persist) and the
// underlying env parsers emit warnings on malformed input, so
// re-reading them would flood logs under a misconfiguration.
snapshotEvery: snapshotEveryFromEnv(),
maxWALFiles: maxWALFilesFromEnv(),
}
engine.configIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap))
engine.appliedIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap))
engine.initTransport(prepared.cfg)
engine.initSnapshotWorker()
engine.refreshStatus()
// Surface a misconfiguration where the tick settings produce a
// non-positive lease window: lease reads would never hit the fast
// path. Don't fail Open -- the engine is still functional via the
// slow LinearizableRead path -- but make the degradation visible.
if lease := engine.LeaseDuration(); lease <= 0 {
slog.Warn("etcd raft engine: lease read disabled (non-positive LeaseDuration)",
slog.Duration("tick_interval", engine.tickInterval),
slog.Int("election_tick", engine.electionTick),
slog.Duration("lease_safety_margin", leaseSafetyMargin),
slog.Duration("computed_lease", lease),
)
}
go engine.run()
openedEngine, err := waitForOpen(ctx, engine, len(prepared.peers) == 1)
if err != nil {
return nil, err
}
opened = true
return openedEngine, nil
}
func prepareOpenState(cfg OpenConfig) (preparedOpenState, error) {
cfg, persistedPeers, persistedPeersOK, err := normalizeOpenConfig(cfg)
if err != nil {
return preparedOpenState{}, err
}
if err := validateConfig(cfg); err != nil {
return preparedOpenState{}, err
}
localPeer, peers, err := normalizePeers(cfg.NodeID, cfg.LocalID, cfg.LocalAddress, cfg.Peers, persistedPeersOK, cfg.Bootstrap)
if err != nil {
return preparedOpenState{}, err
}
cfg.NodeID = localPeer.NodeID
cfg.LocalID = localPeer.ID
cfg.LocalAddress = localPeer.Address
disk, err := openDiskState(cfg, peers)
if err != nil {
return preparedOpenState{}, err
}
if err := validateOpenPeers(disk.LocalSnap, peers, persistedPeers, persistedPeersOK); err != nil {
_ = closePersist(disk.Persist)
return preparedOpenState{}, err
}
if err := savePersistedPeers(cfg.DataDir, maxUint64(maxAppliedIndex(disk.LocalSnap), persistedPeers.Index), peers); err != nil {
_ = closePersist(disk.Persist)
return preparedOpenState{}, err
}
return preparedOpenState{
cfg: cfg,
peers: peers,
disk: disk,
}, nil
}
func (e *Engine) initTransport(cfg OpenConfig) {
if e.transport == nil {
return
}
// Transport listeners may already be accepting RPCs when Open is called.
// Gate inbound delivery on startedCh so messages are not enqueued until the
// local run loop has completed startup.
e.peerDispatchers = make(map[uint64]*peerQueues, len(e.peers))
// Size the per-peer dispatch buffer to match the Raft inflight limit so that
// the channel never drops messages that Raft's flow-control would permit.
e.perPeerQueueSize = cfg.MaxInflightMsg
e.dispatcherLanesEnabled = dispatcherLanesEnabledFromEnv()
e.dispatchStopCh = make(chan struct{})
e.transport.SetSpoolDir(cfg.DataDir)
e.transport.SetFSMSnapDir(e.fsmSnapDir)
e.transport.SetFSMPayloadReader(e.readFSMPayloadLocked)
e.transport.SetFSMPayloadOpener(e.openFSMPayloadLocked)
e.transport.SetHandler(e.handleTransportMessage)
e.startDispatchWorkers()
}
func (e *Engine) initSnapshotWorker() {
if e.persist == nil {
return
}
// Local snapshot persistence is only wired for disk-backed engines. When
// persist is nil the prototype intentionally leaves snapshot workers nil
// and maybePersistLocalSnapshot becomes a no-op.
e.snapshotReqCh = make(chan snapshotRequest, defaultSnapshotQueueSize)
e.snapshotResCh = make(chan snapshotResult, 1)
e.snapshotStopCh = make(chan struct{})
e.startSnapshotWorker()
}
func newRawNode(cfg OpenConfig, storage *etcdraft.MemoryStorage) (*etcdraft.RawNode, error) {
rawNode, err := etcdraft.NewRawNode(&etcdraft.Config{
ID: cfg.NodeID,
ElectionTick: cfg.ElectionTick,
HeartbeatTick: cfg.HeartbeatTick,
Storage: storage,
MaxSizePerMsg: cfg.MaxSizePerMsg,
MaxCommittedSizePerReady: cfg.MaxSizePerMsg,
MaxInflightMsgs: cfg.MaxInflightMsg,
CheckQuorum: true,
PreVote: true,
ReadOnlyOption: etcdraft.ReadOnlySafe,
DisableProposalForwarding: true,
})
if err != nil {
return nil, errors.WithStack(err)
}
return rawNode, nil
}
func waitForOpen(ctx context.Context, engine *Engine, waitForLeader bool) (*Engine, error) {
select {
case <-ctx.Done():
_ = engine.Close()
return nil, errors.WithStack(ctx.Err())
case <-engine.openReady(waitForLeader):
return engine, nil
case <-engine.doneCh:
if err := engine.currentError(); err != nil {
return nil, err
}
return nil, errors.WithStack(errClosed)
}
}
func (e *Engine) Close() error {
if e == nil {
return nil
}
e.closeOnce.Do(func() {
close(e.closeCh)
})
<-e.doneCh
if err := e.currentError(); err != nil && !errors.Is(err, errClosed) {
return err
}
return nil
}
func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.ProposalResult, error) {
if err := contextErr(ctx); err != nil {
return nil, err
}
if e == nil {
return nil, errors.WithStack(errNilEngine)
}
req := proposalRequest{
ctx: ctx,
id: e.nextID(),
payload: append([]byte(nil), data...),
done: make(chan proposalResult, 1),
}
select {
case <-ctx.Done():
return nil, errors.WithStack(ctx.Err())
case <-e.doneCh:
return nil, e.currentErrorOrClosed()
case e.proposeCh <- req:
}
select {
case <-ctx.Done():
e.cancelPendingProposal(req.id)
return nil, errors.WithStack(ctx.Err())
case res := <-req.done:
if res.err != nil {
return nil, res.err
}
return res.result, nil
}
}
func (e *Engine) State() raftengine.State {
if e == nil {
return raftengine.StateUnknown
}
e.mu.RLock()
defer e.mu.RUnlock()
return e.status.State
}
func (e *Engine) Leader() raftengine.LeaderInfo {
if e == nil {
return raftengine.LeaderInfo{}
}
e.mu.RLock()
defer e.mu.RUnlock()
return e.status.Leader
}
func (e *Engine) VerifyLeader(ctx context.Context) error {
_, err := e.submitRead(ctx, false)
return err
}
func (e *Engine) CheckServing(ctx context.Context) error {
if err := contextErr(ctx); err != nil {
return err
}
if e == nil {
return errors.WithStack(errNilEngine)
}
if e.State() != raftengine.StateLeader {
return errors.WithStack(errNotLeader)
}
return nil
}
func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error) {
return e.submitRead(ctx, true)
}
// LeaseDuration returns the time during which a lease holder can serve
// reads from local state without re-confirming leadership via ReadIndex.
// It is bounded by electionTimeout - leaseSafetyMargin so that the lease
// expires before a successor leader could realistically be elected and
// accept new writes elsewhere.
func (e *Engine) LeaseDuration() time.Duration {
if e == nil {
return 0
}
tick := e.tickInterval
if tick <= 0 {
tick = defaultTickInterval
}
election := e.electionTick
if election <= 0 {
election = defaultElectionTick
}
d := time.Duration(election)*tick - leaseSafetyMargin
if d < 0 {
return 0
}
return d
}
// AppliedIndex returns the highest log index applied to the local FSM.
// Suitable for callers that need a non-blocking read fence equivalent
// to what LinearizableRead would have returned, paired with an
// external quorum confirmation (e.g. a valid lease).
//
// Lock-free: reads the mirrored atomic.Uint64 written by the run
// loop's apply path (and by Restore's snapshot installation), so the
// lease-read fast path does not contend with refreshStatus's write
// lock under high read concurrency.
func (e *Engine) AppliedIndex() uint64 {
if e == nil {
return 0
}
return e.appliedIndex.Load()
}
// LastQuorumAck returns the wall-clock instant by which a majority of
// followers most recently responded to the leader, or the zero time
// when no such observation exists (follower / candidate / startup).
//
// Lock-free: reads atomic.Int64 values published by recordQuorumAck
// (multi-node cluster) or refreshStatus (single-node cluster keeps
// singleNodeLeaderAckUnixNano alive with time.Now() while leader, so
// the hot lease-read path performs zero lock work). See
// raftengine.LeaseProvider for the lease-read correctness contract.
func (e *Engine) LastQuorumAck() time.Time {
if e == nil {
return time.Time{}
}
// Honor the LeaseProvider contract that non-leaders always return
// the zero time. Without this guard a late MsgAppResp that sneaks
// past recordQuorumAck (or a tracker entry that survived a brief
// step-down/step-up window) could leak stale liveness into the
// caller's fast-path validation.
if !e.isLeader.Load() {
return time.Time{}
}
if ns := e.singleNodeLeaderAckUnixNano.Load(); ns != 0 {
return time.Unix(0, ns)
}
return e.ackTracker.load()
}
// DispatchDropCount returns the total number of outbound raft messages
// dropped before hitting the transport because the per-peer normal or
// heartbeat channel was full. Monotonic across the life of the engine.
// Surfaced to Prometheus via the monitoring package so the hot-path
// dashboard can graph stepCh saturation alongside LinearizableRead
// rate (see monitoring/grafana/dashboards/elastickv-redis-hotpath.json).
func (e *Engine) DispatchDropCount() uint64 {
if e == nil {
return 0
}
return e.dispatchDropCount.Load()
}
// DispatchErrorCount returns the total number of outbound raft
// dispatches that reached the transport but failed (network errors,
// remote shutdown, etc.). Monotonic across the life of the engine.
func (e *Engine) DispatchErrorCount() uint64 {
if e == nil {
return 0
}
return e.dispatchErrorCount.Load()
}
// DispatchErrorCountsByCode returns a snapshot of dispatch-error
// counts keyed by grpc status code ("Unavailable",
// "DeadlineExceeded", "ResourceExhausted", ...). Sum of values
// equals DispatchErrorCount(). A separate breakdown is needed to
// tell whether failures are peer-down (Unavailable), leader under
// load (DeadlineExceeded), or flow-control (ResourceExhausted).
// Returns an empty map when e is nil. Safe for concurrent callers;
// the returned map is a copy.
func (e *Engine) DispatchErrorCountsByCode() map[string]uint64 {
if e == nil {
return map[string]uint64{}
}
e.dispatchErrorByCodeMu.Lock()
defer e.dispatchErrorByCodeMu.Unlock()
if len(e.dispatchErrorByCode) == 0 {
return map[string]uint64{}
}
out := make(map[string]uint64, len(e.dispatchErrorByCode))
for k, v := range e.dispatchErrorByCode {
out[k] = v
}
return out
}
// recordDispatchErrorCode atomically increments both the aggregate
// dispatchErrorCount and the per-code bucket. code should be the
// grpc status code string from the error; callers that cannot
// extract one pass "Unknown".
func (e *Engine) recordDispatchErrorCode(code string) uint64 {
count := e.dispatchErrorCount.Add(1)
e.dispatchErrorByCodeMu.Lock()
if e.dispatchErrorByCode == nil {
e.dispatchErrorByCode = map[string]uint64{}
}
e.dispatchErrorByCode[code]++
e.dispatchErrorByCodeMu.Unlock()
return count
}
// StepQueueFullCount returns the total number of inbound raft messages
// that could not be enqueued into stepCh because the channel was at
// capacity. This is the "etcd raft inbound step queue is full" signal
// from the task description: a spike indicates the local raft loop
// is starved, usually by something blocking the apply path such as
// the pre-#560 rawKeyTypeAt seek storm.
func (e *Engine) StepQueueFullCount() uint64 {
if e == nil {
return 0
}
return e.stepQueueFullCount.Load()
}
// RegisterLeaderLossCallback registers fn to fire every time the local
// node's Raft state transitions out of leader (CheckQuorum step-down,
// graceful transfer completion, partition-induced demotion) and also
// on shutdown() while the node was still leader. Callbacks are NOT
// fired at the moment a transfer starts (LeadTransferee != 0); they
// only fire once the transfer completes and state flips to follower.
// Lease-read callers use this to invalidate cached lease state so the
// next read takes the slow path.
//
// Callbacks run synchronously from refreshStatus / shutdown / fail
// and MUST be non-blocking (each should be a fast, lock-free
// invalidation). A panic inside a callback is contained and logged
// so a bug in one holder cannot crash the engine or break other
// callbacks. LeaseRead also guards its fast path on
// engine.State() == StateLeader so the small window between the
// transition and this callback completing cannot serve stale reads.
//
// The returned deregister function removes this specific registration
// and is safe to call multiple times. Long-lived callers (coordinators
// whose lifetime matches the engine's) may ignore it; shorter-lived
// callers MUST invoke it to avoid accumulating dead callbacks in the
// engine's slice.
func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func()) {
if e == nil || fn == nil {
return func() {}
}
// Allocate a unique sentinel pointer so the deregister closure can
// identify THIS specific registration even if the same fn is
// registered multiple times.
slot := &struct{ fn func() }{fn: fn}
e.leaderLossCbsMu.Lock()
e.leaderLossCbs = append(e.leaderLossCbs, leaderLossSlot{id: slot, fn: fn})
e.leaderLossCbsMu.Unlock()
var once sync.Once
return func() {
once.Do(func() {
e.leaderLossCbsMu.Lock()
defer e.leaderLossCbsMu.Unlock()
for i, c := range e.leaderLossCbs {
if c.id != slot {
continue
}
// Remove without leaving a dangling reference at the
// tail of the underlying array. The removed slot's fn
// typically captures a *Coordinate; a plain
// `append(cbs[:i], cbs[i+1:]...)` would keep the old
// backing cell alive and prevent GC of the associated
// Coordinate until the engine itself is dropped.
last := len(e.leaderLossCbs) - 1
copy(e.leaderLossCbs[i:], e.leaderLossCbs[i+1:])
e.leaderLossCbs[last] = leaderLossSlot{}
e.leaderLossCbs = e.leaderLossCbs[:last]
return
}
})
}
}
// leaderLossSlot pairs a registered callback with an id-only sentinel
// pointer so deregister can distinguish identical fn values.
type leaderLossSlot struct {
id *struct{ fn func() }
fn func()
}
// fireLeaderLossCallbacks invokes all registered callbacks
// synchronously. The registered-callback contract requires each fn
// to be non-blocking (a lock-free lease-invalidate flag flip), so
// inline execution is safe and avoids spawning an unbounded number
// of goroutines per leader-loss event when many shards / coordinators
// are registered.
//
// A panicking callback is still contained (see
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
e.leaderLossCbsMu.Lock()
cbs := make([]func(), len(e.leaderLossCbs))
for i, c := range e.leaderLossCbs {
cbs[i] = c.fn
}
e.leaderLossCbsMu.Unlock()
for _, fn := range cbs {
e.invokeLeaderLossCallback(fn)
}
}
func (e *Engine) invokeLeaderLossCallback(fn func()) {
defer func() {
if r := recover(); r != nil {