-
Notifications
You must be signed in to change notification settings - Fork 2
perf(raft): raise MaxInflightMsgs=1024, MaxSizePerMsg=4MB defaults #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
98554e6
bb6ff01
0546484
ca6a415
a248131
4edd02d
b5c5298
d60cdcf
639ef08
8f72769
11cb09b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -40,19 +40,66 @@ const ( | |||||||||||||||||||||||||
| // queue. Total buffered memory is bounded by | ||||||||||||||||||||||||||
| // O(numPeers × MaxInflightMsg × avgMsgSize). | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Raised from 256 → 1024 to absorb short CPU bursts without forcing | ||||||||||||||||||||||||||
| // Raised from 256 → 512 to absorb short CPU bursts without forcing | ||||||||||||||||||||||||||
| // peers to reject with "etcd raft inbound step queue is full". | ||||||||||||||||||||||||||
| // Under production congestion we observed the 256-slot inbound | ||||||||||||||||||||||||||
| // stepCh on followers filling up while their event loop was held | ||||||||||||||||||||||||||
| // up by adapter-side pebble seek storms (PRs #560, #562, #563, | ||||||||||||||||||||||||||
| // #565 removed most of that CPU); 1024 is a 4× safety margin. | ||||||||||||||||||||||||||
| // Note that with the current defaultMaxSizePerMsg of 1 MiB, the | ||||||||||||||||||||||||||
| // true worst-case bound can be much larger (up to roughly 1 GiB | ||||||||||||||||||||||||||
| // per peer if every slot held a max-sized message). In practice, | ||||||||||||||||||||||||||
| // typical MsgApp payloads are far smaller, so expected steady-state | ||||||||||||||||||||||||||
| // memory remains much lower than that worst-case bound. | ||||||||||||||||||||||||||
| defaultMaxInflightMsg = 1024 | ||||||||||||||||||||||||||
| defaultMaxSizePerMsg = 1 << 20 | ||||||||||||||||||||||||||
| // #565 removed most of that CPU); 512 is a 2× safety margin. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // We intentionally do NOT raise this in lock-step with the 2 MiB | ||||||||||||||||||||||||||
| // defaultMaxSizePerMsg: the two knobs multiply, and 1024 × 2 MiB | ||||||||||||||||||||||||||
| // is a 2 GiB per-peer worst-case product that a bursty multi-peer | ||||||||||||||||||||||||||
| // deployment can plausibly realise under TCP backpressure loss. | ||||||||||||||||||||||||||
| // 512 × 2 MiB halves that to 1 GiB per peer (4 GiB on a 5-node | ||||||||||||||||||||||||||
| // leader with 4 followers), which fits comfortably inside the | ||||||||||||||||||||||||||
| // 4–16 GiB RAM envelope of typical elastickv nodes while still | ||||||||||||||||||||||||||
| // preserving the MsgApp-batching win that motivates raising the | ||||||||||||||||||||||||||
| // byte cap above etcd/raft's 1 MiB upstream default on small-entry | ||||||||||||||||||||||||||
| // workloads. Operators who need deeper pipelines (large clusters | ||||||||||||||||||||||||||
| // with plenty of RAM) can raise this via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators | ||||||||||||||||||||||||||
| // who need a smaller memory ceiling can lower MaxSizePerMsg via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. | ||||||||||||||||||||||||||
| defaultMaxInflightMsg = 512 | ||||||||||||||||||||||||||
| // minInboundChannelCap is the floor applied when sizing the engine's | ||||||||||||||||||||||||||
| // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. | ||||||||||||||||||||||||||
| // Even if a (misconfigured) caller drops MaxInflightMsg below this, | ||||||||||||||||||||||||||
| // we keep at least this much buffering so that a single tick burst | ||||||||||||||||||||||||||
| // doesn't trip errStepQueueFull on the inbound side. 256 matches the | ||||||||||||||||||||||||||
| // pre-#529 compiled-in default that was known to be survivable. | ||||||||||||||||||||||||||
| minInboundChannelCap = 256 | ||||||||||||||||||||||||||
| // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. | ||||||||||||||||||||||||||
| // Set to 2 MiB — double etcd/raft's 1 MiB upstream default — so each | ||||||||||||||||||||||||||
| // MsgApp amortises more entries under small-entry workloads | ||||||||||||||||||||||||||
| // (Redis-style KV, median entry ~500 B; 2 MiB / 500 B ≈ 4000 entries | ||||||||||||||||||||||||||
| // per MsgApp already saturates the per-RPC batching benefit). | ||||||||||||||||||||||||||
| // Fewer MsgApps per committed byte means fewer dispatcher wake-ups | ||||||||||||||||||||||||||
| // on the leader and fewer recv syscalls on the follower; the | ||||||||||||||||||||||||||
| // follower's apply loop also contends less with the read path. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Lowered from 4 MiB → 2 MiB in tandem with defaultMaxInflightMsg=512 | ||||||||||||||||||||||||||
| // to cap per-peer worst-case buffered Raft traffic at 1 GiB | ||||||||||||||||||||||||||
| // (512 × 2 MiB), i.e. 4 GiB on a 5-node leader with 4 followers. | ||||||||||||||||||||||||||
| // The previous 4 MiB cap produced a 2 GiB/peer, 8 GiB/leader | ||||||||||||||||||||||||||
| // worst case that was too tight for the 4–16 GiB RAM envelope | ||||||||||||||||||||||||||
| // typical elastickv nodes operate in; the batching win of 4 MiB | ||||||||||||||||||||||||||
| // over 2 MiB is marginal on small-entry workloads. | ||||||||||||||||||||||||||
| defaultMaxSizePerMsg = 2 << 20 | ||||||||||||||||||||||||||
| // maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the | ||||||||||||||||||||||||||
| // Raft-level flow-control knobs without a rebuild. Parsed once at | ||||||||||||||||||||||||||
| // Open and passed through normalizeLimitConfig; invalid values fall | ||||||||||||||||||||||||||
| // back to the defaults with a warning. MaxSizePerMsg is expressed | ||||||||||||||||||||||||||
| // as an integer byte count for consistency with the other numeric | ||||||||||||||||||||||||||
| // knobs in this package. | ||||||||||||||||||||||||||
| maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS" | ||||||||||||||||||||||||||
| maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG" | ||||||||||||||||||||||||||
| // minMaxSizePerMsg is the lower bound accepted from the environment | ||||||||||||||||||||||||||
| // override. A payload cap below ~1 KiB makes MsgApp batching | ||||||||||||||||||||||||||
| // degenerate (one entry per message) which defeats the whole point | ||||||||||||||||||||||||||
| // of the knob; fall back to the default rather than rejecting so that a | ||||||||||||||||||||||||||
| // fat-fingered operator doesn't take out the engine. | ||||||||||||||||||||||||||
| minMaxSizePerMsg uint64 = 1 << 10 | ||||||||||||||||||||||||||
| // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. | ||||||||||||||||||||||||||
| // It carries low-frequency control traffic: heartbeats, votes, read-index, | ||||||||||||||||||||||||||
| // leader-transfer, and their corresponding response messages | ||||||||||||||||||||||||||
|
|
@@ -142,13 +189,21 @@ type OpenConfig struct { | |||||||||||||||||||||||||
| ElectionTick int | ||||||||||||||||||||||||||
| HeartbeatTick int | ||||||||||||||||||||||||||
| StateMachine StateMachine | ||||||||||||||||||||||||||
| // MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level | ||||||||||||||||||||||||||
| // flow control). Default: 4 MiB. Larger values amortise more entries per | ||||||||||||||||||||||||||
| // MsgApp under small-entry workloads; smaller values tighten worst-case | ||||||||||||||||||||||||||
| // memory. Operators can override at runtime via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a | ||||||||||||||||||||||||||
| // rebuild; the env var takes precedence over the caller-supplied value. | ||||||||||||||||||||||||||
| MaxSizePerMsg uint64 | ||||||||||||||||||||||||||
| // MaxInflightMsg controls how many MsgApp messages Raft may have in-flight | ||||||||||||||||||||||||||
| // per peer before waiting for an acknowledgement (Raft-level flow control). | ||||||||||||||||||||||||||
| // It also sets the per-peer dispatch channel capacity, so total buffered | ||||||||||||||||||||||||||
| // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize). | ||||||||||||||||||||||||||
| // Default: 256. Increase for deeper pipelining on high-bandwidth links; | ||||||||||||||||||||||||||
| // lower in memory-constrained clusters. | ||||||||||||||||||||||||||
| // Default: 1024. Increase for deeper pipelining on high-bandwidth links; | ||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment indicates a default value of 1024, but the actual compiled-in default defaultMaxInflightMsg has been set to 512 in this PR (line 60). This mismatch can mislead callers who rely on the documentation to understand the behavior when the field is left at its zero value.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The docstring for
Suggested change
References
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The exported Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| // lower in memory-constrained clusters. Operators can override at | ||||||||||||||||||||||||||
| // runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; the | ||||||||||||||||||||||||||
| // env var takes precedence over the caller-supplied value. | ||||||||||||||||||||||||||
| MaxInflightMsg int | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -419,24 +474,33 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { | |||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| engine := &Engine{ | ||||||||||||||||||||||||||
| nodeID: prepared.cfg.NodeID, | ||||||||||||||||||||||||||
| localID: prepared.cfg.LocalID, | ||||||||||||||||||||||||||
| localAddress: prepared.cfg.LocalAddress, | ||||||||||||||||||||||||||
| dataDir: prepared.cfg.DataDir, | ||||||||||||||||||||||||||
| fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), | ||||||||||||||||||||||||||
| tickInterval: prepared.cfg.TickInterval, | ||||||||||||||||||||||||||
| electionTick: prepared.cfg.ElectionTick, | ||||||||||||||||||||||||||
| storage: prepared.disk.Storage, | ||||||||||||||||||||||||||
| rawNode: rawNode, | ||||||||||||||||||||||||||
| persist: prepared.disk.Persist, | ||||||||||||||||||||||||||
| fsm: prepared.cfg.StateMachine, | ||||||||||||||||||||||||||
| peers: peerMap, | ||||||||||||||||||||||||||
| transport: prepared.cfg.Transport, | ||||||||||||||||||||||||||
| proposeCh: make(chan proposalRequest), | ||||||||||||||||||||||||||
| readCh: make(chan readRequest), | ||||||||||||||||||||||||||
| adminCh: make(chan adminRequest), | ||||||||||||||||||||||||||
| stepCh: make(chan raftpb.Message, defaultMaxInflightMsg), | ||||||||||||||||||||||||||
| dispatchReportCh: make(chan dispatchReport, defaultMaxInflightMsg), | ||||||||||||||||||||||||||
| nodeID: prepared.cfg.NodeID, | ||||||||||||||||||||||||||
| localID: prepared.cfg.LocalID, | ||||||||||||||||||||||||||
| localAddress: prepared.cfg.LocalAddress, | ||||||||||||||||||||||||||
| dataDir: prepared.cfg.DataDir, | ||||||||||||||||||||||||||
| fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), | ||||||||||||||||||||||||||
| tickInterval: prepared.cfg.TickInterval, | ||||||||||||||||||||||||||
| electionTick: prepared.cfg.ElectionTick, | ||||||||||||||||||||||||||
| storage: prepared.disk.Storage, | ||||||||||||||||||||||||||
| rawNode: rawNode, | ||||||||||||||||||||||||||
| persist: prepared.disk.Persist, | ||||||||||||||||||||||||||
| fsm: prepared.cfg.StateMachine, | ||||||||||||||||||||||||||
| peers: peerMap, | ||||||||||||||||||||||||||
| transport: prepared.cfg.Transport, | ||||||||||||||||||||||||||
| proposeCh: make(chan proposalRequest), | ||||||||||||||||||||||||||
| readCh: make(chan readRequest), | ||||||||||||||||||||||||||
| adminCh: make(chan adminRequest), | ||||||||||||||||||||||||||
| // Size the inbound step / dispatch-report channels from the | ||||||||||||||||||||||||||
| // resolved MaxInflightMsg (post-normalizeLimitConfig, which has | ||||||||||||||||||||||||||
| // already applied the env override and compiled-in default) so | ||||||||||||||||||||||||||
| // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above | ||||||||||||||||||||||||||
| // the default actually get the extra buffering they asked for. | ||||||||||||||||||||||||||
| // Using defaultMaxInflightMsg here would silently cap the | ||||||||||||||||||||||||||
| // channel at 1024 even when the Raft layer has been told to | ||||||||||||||||||||||||||
| // keep 2048 in flight, re-triggering errStepQueueFull under | ||||||||||||||||||||||||||
| // the exact bursty conditions this knob is meant to absorb. | ||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||
| stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)), | ||||||||||||||||||||||||||
| dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)), | ||||||||||||||||||||||||||
| closeCh: make(chan struct{}), | ||||||||||||||||||||||||||
| doneCh: make(chan struct{}), | ||||||||||||||||||||||||||
| startedCh: make(chan struct{}), | ||||||||||||||||||||||||||
|
|
@@ -2581,13 +2645,41 @@ func normalizeTimingConfig(cfg OpenConfig) OpenConfig { | |||||||||||||||||||||||||
| return cfg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // inboundChannelCap returns the capacity to use when allocating the | ||||||||||||||||||||||||||
| // engine's inbound stepCh and dispatchReportCh. It mirrors the resolved | ||||||||||||||||||||||||||
| // MaxInflightMsg but clamps to minInboundChannelCap so that a caller | ||||||||||||||||||||||||||
| // passing a tiny value doesn't shrink the buffers below a survivable | ||||||||||||||||||||||||||
| // floor. maxInflight is expected to be the post-normalizeLimitConfig | ||||||||||||||||||||||||||
| // value (compiled default or env override applied). | ||||||||||||||||||||||||||
| func inboundChannelCap(maxInflight int) int { | ||||||||||||||||||||||||||
| if maxInflight < minInboundChannelCap { | ||||||||||||||||||||||||||
| return minInboundChannelCap | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return maxInflight | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func normalizeLimitConfig(cfg OpenConfig) OpenConfig { | ||||||||||||||||||||||||||
| if cfg.MaxInflightMsg <= 0 { | ||||||||||||||||||||||||||
| cfg.MaxInflightMsg = defaultMaxInflightMsg | ||||||||||||||||||||||||||
|
Comment on lines
2704
to
2705
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if cfg.MaxSizePerMsg == 0 { | ||||||||||||||||||||||||||
| cfg.MaxSizePerMsg = defaultMaxSizePerMsg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Env overrides win over caller-supplied values so that operators can | ||||||||||||||||||||||||||
| // retune replication flow-control without a rebuild. This mirrors the | ||||||||||||||||||||||||||
| // behaviour of ELASTICKV_RAFT_SNAPSHOT_COUNT and | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_WAL_FILES. Invalid values fall back to the | ||||||||||||||||||||||||||
| // compiled-in defaults with a warning. | ||||||||||||||||||||||||||
| if v, ok := maxInflightMsgFromEnv(); ok { | ||||||||||||||||||||||||||
| cfg.MaxInflightMsg = v | ||||||||||||||||||||||||||
|
Comment on lines
+2715
to
+2716
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if v, ok := maxSizePerMsgFromEnv(); ok { | ||||||||||||||||||||||||||
| cfg.MaxSizePerMsg = v | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| slog.Info("etcd raft engine: message size limits", | ||||||||||||||||||||||||||
| "max_inflight_msgs", cfg.MaxInflightMsg, | ||||||||||||||||||||||||||
| "max_size_per_msg_bytes", cfg.MaxSizePerMsg, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
Comment on lines
+2739
to
+2742
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging the message size limits at Info level inside normalizeLimitConfig will produce redundant log entries for every shard opened (e.g., 100+ lines of identical output in a typical multi-shard deployment). Consider logging this once per process or only when the values deviate from the compiled-in defaults to reduce log noise.
Comment on lines
+2739
to
+2742
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In sharded environments where multiple Raft engines may be active, it is helpful to include the
Suggested change
|
||||||||||||||||||||||||||
| return cfg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -2845,6 +2937,51 @@ func snapshotEveryFromEnv() uint64 { | |||||||||||||||||||||||||
| return n | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns | ||||||||||||||||||||||||||
| // (value, true) when the env var is set to a valid positive integer. | ||||||||||||||||||||||||||
| // Returns (0, false) when the var is unset so the caller can keep the | ||||||||||||||||||||||||||
| // existing cfg.MaxInflightMsg (which normalizeLimitConfig has already | ||||||||||||||||||||||||||
| // defaulted to defaultMaxInflightMsg). Invalid values (non-numeric, | ||||||||||||||||||||||||||
| // negative, zero) are logged at warn level and return | ||||||||||||||||||||||||||
| // (defaultMaxInflightMsg, true) so the engine actually applies the | ||||||||||||||||||||||||||
| // compiled-in default the log message promises — otherwise a malformed | ||||||||||||||||||||||||||
| // env var would silently let an unrelated caller-supplied value win. | ||||||||||||||||||||||||||
| func maxInflightMsgFromEnv() (int, bool) { | ||||||||||||||||||||||||||
| v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) | ||||||||||||||||||||||||||
| if v == "" { | ||||||||||||||||||||||||||
| return 0, false | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| n, err := strconv.Atoi(v) | ||||||||||||||||||||||||||
| if err != nil || n < 1 { | ||||||||||||||||||||||||||
| slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default", | ||||||||||||||||||||||||||
|
Comment on lines
+3021
to
+3023
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| "value", v, "default", defaultMaxInflightMsg) | ||||||||||||||||||||||||||
| return defaultMaxInflightMsg, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+3021
to
+3026
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The warning message indicates that the engine will fall back to the default value (defaultMaxInflightMsg), but the function returns (0, false). This causes normalizeLimitConfig to retain the caller-supplied value from OpenConfig instead of the compiled-in default. If a caller provides a non-default value (e.g., 256) and the environment variable is malformed, the log message will be misleading as it won't actually use the default 1024. To align the implementation with the log message and the PR description, the function should return the default value and true when an invalid override is detected.
Suggested change
References
|
||||||||||||||||||||||||||
| return n, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain | ||||||||||||||||||||||||||
| // integer byte count. Returns (value, true) when the env var is set to a | ||||||||||||||||||||||||||
| // valid integer >= minMaxSizePerMsg (1 KiB). Returns (0, false) when the | ||||||||||||||||||||||||||
| // var is unset so normalizeLimitConfig can keep its earlier default. | ||||||||||||||||||||||||||
| // Invalid or too-small values fall back to the compiled-in default with | ||||||||||||||||||||||||||
| // a warning and return (defaultMaxSizePerMsg, true) so the override | ||||||||||||||||||||||||||
| // actually applies the default the warning promises; a sub-KiB cap | ||||||||||||||||||||||||||
| // would make MsgApp batching degenerate. | ||||||||||||||||||||||||||
| func maxSizePerMsgFromEnv() (uint64, bool) { | ||||||||||||||||||||||||||
| v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) | ||||||||||||||||||||||||||
| if v == "" { | ||||||||||||||||||||||||||
| return 0, false | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| n, err := strconv.ParseUint(v, 10, 64) | ||||||||||||||||||||||||||
| if err != nil || n < minMaxSizePerMsg { | ||||||||||||||||||||||||||
| slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default", | ||||||||||||||||||||||||||
| "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) | ||||||||||||||||||||||||||
| return uint64(defaultMaxSizePerMsg), true | ||||||||||||||||||||||||||
|
Comment on lines
+3052
to
+3056
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+3052
to
+3062
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the MaxInflightMsgs helper, this function returns (0, false) on invalid input, which prevents normalizeLimitConfig from applying the compiled-in default if the caller provided a different value. This makes the using default warning message misleading. The function should return the default value and true to ensure the fallback behavior matches the logs.
Suggested change
References
|
||||||||||||||||||||||||||
| return n, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has | ||||||||||||||||||||||||||
| // been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value | ||||||||||||||||||||||||||
| // is parsed with strconv.ParseBool, which accepts the standard tokens | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring for
MaxSizePerMsgstates a default of 4 MiB, but the constantdefaultMaxSizePerMsgis defined as 2 MiB (line 88). Please update the documentation to match the implementation. Maintaining an accurate and strictly enforced maximum size limit is essential when pre-allocating buffers for deserialization.References