167 changes: 144 additions & 23 deletions internal/raftengine/etcd/engine.go
@@ -46,13 +46,44 @@ const (
// stepCh on followers filling up while their event loop was held
// up by adapter-side pebble seek storms (PRs #560, #562, #563,
// #565 removed most of that CPU); 1024 is a 4× safety margin.
- // Note that with the current defaultMaxSizePerMsg of 1 MiB, the
- // true worst-case bound can be much larger (up to roughly 1 GiB
+ // Note that with the current defaultMaxSizePerMsg of 4 MiB, the
+ // true worst-case bound can be much larger (up to roughly 4 GiB
Contributor

medium

The increase in defaultMaxInflightMsg (1024) and defaultMaxSizePerMsg (4 MiB) significantly raises the worst-case memory footprint. As the code comment notes, it can reach ~4 GiB per peer. In a cluster with multiple peers, the aggregate memory buffered in stepCh and the per-peer outbound replication lanes could cause an OOM under congestion if entries are large. This improves performance for small-entry workloads, but consider whether these defaults are too aggressive for general deployments, and whether a more conservative MaxSizePerMsg (e.g., 1-2 MiB) would be safer while still providing significant batching benefits.

References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
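
The worst-case figure quoted above is plain arithmetic: every in-flight slot is assumed to hold a max-sized MsgApp. A minimal sketch of that calculation follows; `worstCaseBufferedBytes` is a hypothetical helper written for illustration, not a function from this PR.

```go
package main

import "fmt"

// worstCaseBufferedBytes is a hypothetical helper (not part of the PR). It
// returns the upper bound on buffered bytes if every in-flight slot for every
// peer held a message of exactly maxSizePerMsg bytes.
func worstCaseBufferedBytes(maxInflight int, maxSizePerMsg uint64, peers int) uint64 {
	return uint64(maxInflight) * maxSizePerMsg * uint64(peers)
}

func main() {
	const mib = uint64(1) << 20
	// Compare the candidate MaxSizePerMsg values discussed in this thread
	// at MaxInflightMsg = 1024, for a single peer.
	for _, size := range []uint64{1 * mib, 2 * mib, 4 * mib} {
		perPeer := worstCaseBufferedBytes(1024, size, 1)
		fmt.Printf("MaxSizePerMsg=%d MiB -> %d GiB per peer\n", size/mib, perPeer>>30)
	}
}
```

The bound scales linearly in all three inputs, so halving either knob halves the worst case; typical steady-state usage is expected to be far below it, as the code comment says.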

// per peer if every slot held a max-sized message). In practice,
// typical MsgApp payloads are far smaller, so expected steady-state
// memory remains much lower than that worst-case bound.
//
// Operators can override both knobs at runtime without a rebuild
// via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and
// ELASTICKV_RAFT_MAX_SIZE_PER_MSG.
defaultMaxInflightMsg = 1024
- defaultMaxSizePerMsg = 1 << 20
// minInboundChannelCap is the floor applied when sizing the engine's
// inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg.
// Even if a (misconfigured) caller drops MaxInflightMsg below this,
// we keep at least this much buffering so that a single tick burst
// doesn't trip errStepQueueFull on the inbound side. 256 matches the
// pre-#529 compiled-in default that was known to be survivable.
minInboundChannelCap = 256
// defaultMaxSizePerMsg caps the byte size of a single MsgApp payload.
// Raised from 1 MiB → 4 MiB so each MsgApp amortises more entries
// under small-entry workloads (Redis-style KV, median entry ~500 B).
// Fewer MsgApps per committed byte means fewer dispatcher wake-ups
// on the leader and fewer recv syscalls on the follower; the
// follower's apply loop also contends less with the read path.
defaultMaxSizePerMsg = 4 << 20
// maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the
// Raft-level flow-control knobs without a rebuild. Parsed once at
// Open and passed through normalizeLimitConfig; invalid values fall
// back to the defaults with a warning. MaxSizePerMsg is expressed
// as an integer byte count for consistency with the other numeric
// knobs in this package.
maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS"
maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG"
// minMaxSizePerMsg is the lower bound accepted from the environment
// override. A payload cap below ~1 KiB makes MsgApp batching
// degenerate (one entry per message) which defeats the whole point
// of the knob; clamp to this floor rather than rejecting so that a
// fat-fingered operator doesn't take out the engine.
Contributor

medium

The comment mentions "clamping" to the floor, but the implementation in maxSizePerMsgFromEnv (line 2961) falls back to the defaultMaxSizePerMsg (4 MiB) if the value is below the floor (1 KiB). Clamping usually implies using the floor value itself. Given the large gap between the floor and the default, this behavior might be unexpected for an operator trying to tune for low memory.

Suggested change
- // of the knob; clamp to this floor rather than rejecting so that a
- // fat-fingered operator doesn't take out the engine.
+ // of the knob; fall back to the default rather than rejecting so that a
+ // fat-fingered operator doesn't take out the engine.
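
To make the distinction in this comment concrete, here is a minimal sketch contrasting the two behaviours for a too-small value. The constant values are taken from the diff; `clampToFloor` and `fallbackToDefault` are hypothetical names used only for illustration.

```go
package main

import "fmt"

const (
	minMaxSizePerMsg     uint64 = 1 << 10 // 1 KiB floor, as in the diff
	defaultMaxSizePerMsg uint64 = 4 << 20 // 4 MiB default, as in the diff
)

// clampToFloor raises a too-small value to the floor itself.
func clampToFloor(n uint64) uint64 {
	if n < minMaxSizePerMsg {
		return minMaxSizePerMsg
	}
	return n
}

// fallbackToDefault discards a too-small value and substitutes the default.
func fallbackToDefault(n uint64) uint64 {
	if n < minMaxSizePerMsg {
		return defaultMaxSizePerMsg
	}
	return n
}

func main() {
	const requested = 512 // below the 1 KiB floor
	fmt.Println("clamp:", clampToFloor(requested))
	fmt.Println("fallback:", fallbackToDefault(requested))
}
```

An operator who asks for 512 bytes gets 1024 under clamping but 4194304 under fallback, which is the gap the comment is pointing at.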

minMaxSizePerMsg uint64 = 1 << 10
// defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel.
// It carries low-frequency control traffic: heartbeats, votes, read-index,
// leader-transfer, and their corresponding response messages
@@ -142,13 +173,21 @@ type OpenConfig struct {
ElectionTick int
HeartbeatTick int
StateMachine StateMachine
// MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level
// flow control). Default: 4 MiB. Larger values amortise more entries per
Contributor

medium

The docstring for MaxSizePerMsg states a default of 4 MiB, but the constant defaultMaxSizePerMsg is defined as 2 MiB (line 88). Please update the documentation to match the implementation. Maintaining an accurate and strictly enforced maximum size limit is essential when pre-allocating buffers for deserialization.

Suggested change
- // flow control). Default: 4 MiB. Larger values amortise more entries per
+ // flow control). Default: 2 MiB. Larger values amortise more entries per
References
  1. Pre-allocating a buffer is acceptable if a strictly enforced maximum size limit is checked before allocation and the entire data must be materialized in memory.

// MsgApp under small-entry workloads; smaller values tighten worst-case
// memory. Operators can override at runtime via
// ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a
// rebuild; the env var takes precedence over the caller-supplied value.
MaxSizePerMsg uint64
// MaxInflightMsg controls how many MsgApp messages Raft may have in-flight
// per peer before waiting for an acknowledgement (Raft-level flow control).
// It also sets the per-peer dispatch channel capacity, so total buffered
// memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
- // Default: 256. Increase for deeper pipelining on high-bandwidth links;
- // lower in memory-constrained clusters.
+ // Default: 1024. Increase for deeper pipelining on high-bandwidth links;

P3 Badge Update documented MaxInflightMsg default to match code

The OpenConfig.MaxInflightMsg docstring says the default is 1024, but this commit sets defaultMaxInflightMsg to 512 and tests pin that value. This mismatch can mislead operators and library users who rely on GoDoc/comments for tuning decisions, especially when diagnosing queue pressure or memory footprint. Please update the comment to reflect the actual default.

Contributor

medium

The comment indicates a default value of 1024, but the actual compiled-in default defaultMaxInflightMsg has been set to 512 in this PR (line 60). This mismatch can mislead callers who rely on the documentation to understand the behavior when the field is left at its zero value.

Suggested change
- // Default: 1024. Increase for deeper pipelining on high-bandwidth links;
+ // Default: 512. Increase for deeper pipelining on high-bandwidth links;

Contributor

medium

The docstring for MaxInflightMsg states a default of 1024, but the constant defaultMaxInflightMsg is set to 512 (line 64). This should be updated to avoid confusing callers. Applying fixed bounds to collections that grow from external requests is necessary to prevent unbounded memory growth and OOM issues.

Suggested change
- // Default: 1024. Increase for deeper pipelining on high-bandwidth links;
+ // Default: 512. Increase for deeper pipelining on high-bandwidth links;
References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.


P3 Badge Correct stale defaults in OpenConfig docs

The exported OpenConfig field comments now advertise MaxSizePerMsg defaulting to 4 MiB and MaxInflightMsg defaulting to 1024, but normalizeLimitConfig and the package constants set the real defaults to 2 MiB and 512. This mismatch can mislead operators and integrators doing memory sizing or troubleshooting, because they will believe the runtime is using a different operating point than it actually is.

// lower in memory-constrained clusters. Operators can override at
// runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; the
// env var takes precedence over the caller-supplied value.
MaxInflightMsg int
}

@@ -419,24 +458,33 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) {
}()

engine := &Engine{
- nodeID: prepared.cfg.NodeID,
- localID: prepared.cfg.LocalID,
- localAddress: prepared.cfg.LocalAddress,
- dataDir: prepared.cfg.DataDir,
- fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName),
- tickInterval: prepared.cfg.TickInterval,
- electionTick: prepared.cfg.ElectionTick,
- storage: prepared.disk.Storage,
- rawNode: rawNode,
- persist: prepared.disk.Persist,
- fsm: prepared.cfg.StateMachine,
- peers: peerMap,
- transport: prepared.cfg.Transport,
- proposeCh: make(chan proposalRequest),
- readCh: make(chan readRequest),
- adminCh: make(chan adminRequest),
- stepCh: make(chan raftpb.Message, defaultMaxInflightMsg),
- dispatchReportCh: make(chan dispatchReport, defaultMaxInflightMsg),
+ nodeID: prepared.cfg.NodeID,
+ localID: prepared.cfg.LocalID,
+ localAddress: prepared.cfg.LocalAddress,
+ dataDir: prepared.cfg.DataDir,
+ fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName),
+ tickInterval: prepared.cfg.TickInterval,
+ electionTick: prepared.cfg.ElectionTick,
+ storage: prepared.disk.Storage,
+ rawNode: rawNode,
+ persist: prepared.disk.Persist,
+ fsm: prepared.cfg.StateMachine,
+ peers: peerMap,
+ transport: prepared.cfg.Transport,
+ proposeCh: make(chan proposalRequest),
+ readCh: make(chan readRequest),
+ adminCh: make(chan adminRequest),
+ // Size the inbound step / dispatch-report channels from the
+ // resolved MaxInflightMsg (post-normalizeLimitConfig, which has
+ // already applied the env override and compiled-in default) so
+ // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above
+ // the default actually get the extra buffering they asked for.
+ // Using defaultMaxInflightMsg here would silently cap the
+ // channel at 1024 even when the Raft layer has been told to
+ // keep 2048 in flight, re-triggering errStepQueueFull under
+ // the exact bursty conditions this knob is meant to absorb.
+ stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)),
+ dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
startedCh: make(chan struct{}),
@@ -2581,13 +2629,41 @@ func normalizeTimingConfig(cfg OpenConfig) OpenConfig {
return cfg
}

// inboundChannelCap returns the capacity to use when allocating the
// engine's inbound stepCh and dispatchReportCh. It mirrors the resolved
// MaxInflightMsg but clamps to minInboundChannelCap so that a caller
// passing a tiny value doesn't shrink the buffers below a survivable
// floor. maxInflight is expected to be the post-normalizeLimitConfig
// value (compiled default or env override applied).
func inboundChannelCap(maxInflight int) int {
if maxInflight < minInboundChannelCap {
return minInboundChannelCap
}
return maxInflight
}

func normalizeLimitConfig(cfg OpenConfig) OpenConfig {
if cfg.MaxInflightMsg <= 0 {
cfg.MaxInflightMsg = defaultMaxInflightMsg
Comment on lines 2704 to 2705

P2 Badge Clamp caller MaxInflightMsg before channel allocation

normalizeLimitConfig only defaults cfg.MaxInflightMsg when it is non-positive, but it never applies the new upper safety bound to caller-supplied values. Since Open now allocates stepCh and dispatchReportCh from prepared.cfg.MaxInflightMsg, a fat-fingered programmatic config (for example 100000000) can trigger huge channel allocations and crash startup even though env overrides are capped. Apply maxMaxInflightMsg (or equivalent) to direct config input before these allocations.
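
A minimal sketch of the bound this comment asks for, applied to caller-supplied values before any channel is allocated. The comment names `maxMaxInflightMsg` but does not give its value, so the 8192 used here is an assumption chosen only for illustration, and `normalizeMaxInflight` is a hypothetical helper.

```go
package main

import "fmt"

const (
	defaultMaxInflightMsg = 1024 // compiled-in default shown in the diff
	// ASSUMPTION: the real upper bound is not shown in this diff; 8192 is a
	// placeholder for illustration.
	maxMaxInflightMsg = 8192
)

// normalizeMaxInflight is a hypothetical helper: it defaults non-positive
// values and clamps oversized ones so that later make(chan ..., n) calls
// cannot be driven to huge allocations by a mistyped config.
func normalizeMaxInflight(n int) int {
	switch {
	case n <= 0:
		return defaultMaxInflightMsg
	case n > maxMaxInflightMsg:
		return maxMaxInflightMsg
	default:
		return n
	}
}

func main() {
	for _, in := range []int{0, 256, 2048, 100000000} {
		fmt.Printf("%d -> %d\n", in, normalizeMaxInflight(in))
	}
}
```

Clamping rather than rejecting keeps startup alive on a bad value, which matches the stated intent elsewhere in the diff that a mistyped knob should not take out the engine.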

}
if cfg.MaxSizePerMsg == 0 {
cfg.MaxSizePerMsg = defaultMaxSizePerMsg
}
// Env overrides win over caller-supplied values so that operators can
// retune replication flow-control without a rebuild. This mirrors the
// behaviour of ELASTICKV_RAFT_SNAPSHOT_COUNT and
// ELASTICKV_RAFT_MAX_WAL_FILES. Invalid values fall back to the
// compiled-in defaults with a warning.
if v, ok := maxInflightMsgFromEnv(); ok {
cfg.MaxInflightMsg = v
Comment on lines +2715 to +2716

P1 Badge Size inbound queues from overridden MaxInflightMsg

When ELASTICKV_RAFT_MAX_INFLIGHT_MSGS is set, this code allows values above 1024, but Open still allocates stepCh and dispatchReportCh with a fixed capacity of defaultMaxInflightMsg (1024). In clusters that tune inflight to values like 2048 (as the new tests demonstrate), inbound traffic can hit the smaller fixed queue and return errStepQueueFull, effectively dropping/deferring raft messages and negating the intended throughput gain under bursty replication load.
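
The failure mode described here can be sketched with a bounded channel and a non-blocking send. This assumes the engine enqueues inbound messages without blocking and reports errStepQueueFull when the buffer is full; the enqueue path is not shown in this diff, so `enqueue` and `countRejected` below are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// errStepQueueFull borrows the name from the diff; this definition is a
// stand-in for illustration.
var errStepQueueFull = errors.New("step queue full")

// enqueue attempts a non-blocking send and reports a full queue as an error.
func enqueue(ch chan<- int, msg int) error {
	select {
	case ch <- msg:
		return nil
	default:
		return errStepQueueFull
	}
}

// countRejected sends a burst into a channel of the given capacity with no
// consumer running and returns how many sends were rejected.
func countRejected(capacity, burst int) int {
	ch := make(chan int, capacity)
	rejected := 0
	for i := 0; i < burst; i++ {
		if err := enqueue(ch, i); err != nil {
			rejected++
		}
	}
	return rejected
}

func main() {
	// Channel fixed at the old default while Raft keeps 2048 in flight.
	fmt.Println("fixed 1024:", countRejected(1024, 2048))
	// Channel sized from the resolved MaxInflightMsg.
	fmt.Println("sized 2048:", countRejected(2048, 2048))
}
```

With a stalled consumer, the fixed-capacity queue rejects every message beyond its 1024 slots, while the queue sized from the override absorbs the whole burst.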

}
if v, ok := maxSizePerMsgFromEnv(); ok {
cfg.MaxSizePerMsg = v
}
slog.Info("etcd raft engine: message size limits",
"max_inflight_msgs", cfg.MaxInflightMsg,
"max_size_per_msg_bytes", cfg.MaxSizePerMsg,
)
Comment on lines +2739 to +2742
Contributor

medium

Logging the message size limits at Info level inside normalizeLimitConfig will produce redundant log entries for every shard opened (e.g., 100+ lines of identical output in a typical multi-shard deployment). Consider logging this once per process or only when the values deviate from the compiled-in defaults to reduce log noise.
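
One way to get the once-per-process behaviour suggested here is `sync.Once`. The sketch below is an illustration under assumptions: `limitsLogger` is a hypothetical type, and the emit function is injected so the example does not depend on the engine's actual logging setup.

```go
package main

import (
	"fmt"
	"sync"
)

// limitsLogger is a hypothetical wrapper that emits the limits line at most
// once, no matter how many engines call Log.
type limitsLogger struct {
	once sync.Once
	emit func(maxInflight int, maxSizePerMsg uint64)
}

// Log forwards the first call to emit and silently drops all later calls.
func (l *limitsLogger) Log(maxInflight int, maxSizePerMsg uint64) {
	l.once.Do(func() { l.emit(maxInflight, maxSizePerMsg) })
}

func main() {
	lines := 0
	logger := &limitsLogger{emit: func(maxInflight int, maxSizePerMsg uint64) {
		lines++
		fmt.Printf("message size limits: max_inflight_msgs=%d max_size_per_msg_bytes=%d\n",
			maxInflight, maxSizePerMsg)
	}}
	// Simulate 100 shards opening with identical limits.
	for shard := 0; shard < 100; shard++ {
		logger.Log(1024, 4<<20)
	}
	fmt.Println("lines emitted:", lines)
}
```

The trade-off is that later calls are dropped even if their values differ, so this only fits deployments where every shard resolves the same limits; logging only on deviation from the compiled-in defaults is the other option the comment mentions.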

Comment on lines +2739 to +2742
Contributor

medium

In sharded environments where multiple Raft engines may be active, it is helpful to include the local_id in this log message to identify which engine's limits are being reported.

Suggested change
- slog.Info("etcd raft engine: message size limits",
- "max_inflight_msgs", cfg.MaxInflightMsg,
- "max_size_per_msg_bytes", cfg.MaxSizePerMsg,
- )
+ slog.Info("etcd raft engine: message size limits",
+ "local_id", cfg.LocalID,
+ "max_inflight_msgs", cfg.MaxInflightMsg,
+ "max_size_per_msg_bytes", cfg.MaxSizePerMsg,
+ )

return cfg
}

@@ -2845,6 +2921,51 @@ func snapshotEveryFromEnv() uint64 {
return n
}

// maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns
// (value, true) when the env var is set to a valid positive integer.
// Returns (0, false) when the var is unset so the caller can keep the
// existing cfg.MaxInflightMsg (which normalizeLimitConfig has already
// defaulted to defaultMaxInflightMsg). Invalid values (non-numeric,
// negative, zero) are logged at warn level and return
// (defaultMaxInflightMsg, true) so the engine actually applies the
// compiled-in default the log message promises — otherwise a malformed
// env var would silently let an unrelated caller-supplied value win.
func maxInflightMsgFromEnv() (int, bool) {
v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar))
if v == "" {
return 0, false
}
n, err := strconv.Atoi(v)
if err != nil || n < 1 {
slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
Comment on lines +3021 to +3023

P2 Badge Cap MaxInflight env override to avoid startup OOM

maxInflightMsgFromEnv accepts any positive integer, but Open uses the resolved value to allocate stepCh, dispatchReportCh, and per-peer dispatch queues, so a fat-fingered value like ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000 can trigger massive channel allocations and crash the process before the node becomes healthy. Since this knob is now operator-facing at runtime, it needs a sane upper bound (or rejection) rather than only checking n < 1.
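
A sketch of the env parser with an upper bound added. It takes the raw string as a parameter instead of reading the environment so it can be exercised directly; `parseMaxInflight` is a hypothetical name, and the cap value of 8192 is an assumption because the diff does not show one.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const (
	defaultMaxInflightMsg = 1024 // compiled-in default shown in the diff
	// ASSUMPTION: placeholder cap for illustration; the real bound is not
	// shown in this diff.
	maxMaxInflightMsg = 8192
)

// parseMaxInflight mirrors the shape of maxInflightMsgFromEnv in the diff:
// (0, false) when unset, (default, true) when invalid, and additionally
// (cap, true) when the value exceeds the assumed upper bound.
func parseMaxInflight(raw string) (int, bool) {
	v := strings.TrimSpace(raw)
	if v == "" {
		return 0, false
	}
	n, err := strconv.Atoi(v)
	if err != nil || n < 1 {
		return defaultMaxInflightMsg, true
	}
	if n > maxMaxInflightMsg {
		return maxMaxInflightMsg, true
	}
	return n, true
}

func main() {
	for _, raw := range []string{"", " 2048 ", "abc", "100000000"} {
		n, ok := parseMaxInflight(raw)
		fmt.Printf("%q -> (%d, %t)\n", raw, n, ok)
	}
}
```

Whether an oversized value should clamp to the cap or fall back to the default is a policy choice the comment leaves open; this sketch clamps.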

"value", v, "default", defaultMaxInflightMsg)
return defaultMaxInflightMsg, true
}
Comment on lines +3021 to +3026
Contributor

medium

The warning message indicates that the engine will fall back to the default value (defaultMaxInflightMsg), but the function returns (0, false). This causes normalizeLimitConfig to retain the caller-supplied value from OpenConfig instead of the compiled-in default. If a caller provides a non-default value (e.g., 256) and the environment variable is malformed, the log message will be misleading as it won't actually use the default 1024. To align the implementation with the log message and the PR description, the function should return the default value and true when an invalid override is detected.

Suggested change
- n, err := strconv.Atoi(v)
- if err != nil || n < 1 {
- slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
- "value", v, "default", defaultMaxInflightMsg)
- return 0, false
- }
+ n, err := strconv.Atoi(v)
+ if err != nil || n < 1 {
+ slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
+ "value", v, "default", defaultMaxInflightMsg)
+ return defaultMaxInflightMsg, true
+ }
References
  1. Avoid state inconsistencies during normalization by ensuring that invalid inputs are handled explicitly and do not lead to misleading states or logs.
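
The difference between the two return conventions shows up only when the caller supplied a non-default value. The sketch below reproduces the precedence logic in simplified form; `parseOverride` and `resolve` are hypothetical helpers, and the boolean flag exists only to compare both behaviours side by side.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const defaultMaxInflightMsg = 1024 // compiled-in default shown in the diff

// parseOverride parses a raw override. On invalid input it returns either
// (default, true) or (0, false) depending on defaultOnInvalid, which selects
// between the two behaviours discussed in this comment.
func parseOverride(raw string, defaultOnInvalid bool) (int, bool) {
	v := strings.TrimSpace(raw)
	if v == "" {
		return 0, false
	}
	n, err := strconv.Atoi(v)
	if err != nil || n < 1 {
		if defaultOnInvalid {
			return defaultMaxInflightMsg, true
		}
		return 0, false
	}
	return n, true
}

// resolve applies the same order as normalizeLimitConfig in the diff:
// default the caller value first, then let a present override win.
func resolve(callerValue int, raw string, defaultOnInvalid bool) int {
	if callerValue <= 0 {
		callerValue = defaultMaxInflightMsg
	}
	if v, ok := parseOverride(raw, defaultOnInvalid); ok {
		return v
	}
	return callerValue
}

func main() {
	// Caller asked for 256, env var is malformed.
	fmt.Println("return (0, false):      ", resolve(256, "not-a-number", false))
	fmt.Println("return (default, true): ", resolve(256, "not-a-number", true))
}
```

With `(0, false)` the caller's 256 survives even though the warning says the default is in use; with `(default, true)` the resolved value is 1024, matching the log line.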

return n, true
}

// maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain
// integer byte count. Returns (value, true) when the env var is set to a
// valid integer >= minMaxSizePerMsg (1 KiB). Returns (0, false) when the
// var is unset so normalizeLimitConfig can keep its earlier default.
// Invalid or too-small values fall back to the compiled-in default with
// a warning and return (defaultMaxSizePerMsg, true) so the override
// actually applies the default the warning promises; a sub-KiB cap
// would make MsgApp batching degenerate.
func maxSizePerMsgFromEnv() (uint64, bool) {
v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar))
if v == "" {
return 0, false
}
n, err := strconv.ParseUint(v, 10, 64)
if err != nil || n < minMaxSizePerMsg {
slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
"value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
return uint64(defaultMaxSizePerMsg), true
Comment on lines +3052 to +3056

P2 Badge Enforce transport-safe upper bound for MaxSizePerMsg env

maxSizePerMsgFromEnv currently accepts any value >= 1 KiB, but the transport path in this repo is bounded by gRPC message-size limits (internal.GRPCMaxMessageBytes = 64 MiB via server/dial options), so setting ELASTICKV_RAFT_MAX_SIZE_PER_MSG above that can make Raft emit MsgApp payloads the transport cannot carry, causing repeated send failures/unreachable reports under large batches. The env parser should reject or clamp values above the transport budget.
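
A sketch of the parser with a transport-aware ceiling. The 64 MiB figure comes from this comment's description of internal.GRPCMaxMessageBytes; the local constant `transportMaxBytes` stands in for it, and `parseMaxSizePerMsg` is a hypothetical name. The input is passed as a string so the logic can be exercised without touching the environment.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const (
	minMaxSizePerMsg     uint64 = 1 << 10 // 1 KiB floor, as in the diff
	defaultMaxSizePerMsg uint64 = 4 << 20 // 4 MiB default, as in the diff
	// ASSUMPTION: stand-in for internal.GRPCMaxMessageBytes, which the
	// review comment states is 64 MiB.
	transportMaxBytes uint64 = 64 << 20
)

// parseMaxSizePerMsg keeps the diff's conventions for unset and invalid
// input and additionally clamps values above the transport budget.
func parseMaxSizePerMsg(raw string) (uint64, bool) {
	v := strings.TrimSpace(raw)
	if v == "" {
		return 0, false
	}
	n, err := strconv.ParseUint(v, 10, 64)
	if err != nil || n < minMaxSizePerMsg {
		return defaultMaxSizePerMsg, true
	}
	if n > transportMaxBytes {
		return transportMaxBytes, true
	}
	return n, true
}

func main() {
	for _, raw := range []string{"", "512", "2097152", "134217728"} {
		n, ok := parseMaxSizePerMsg(raw)
		fmt.Printf("%q -> (%d, %t)\n", raw, n, ok)
	}
}
```

In practice the ceiling likely needs some headroom below the gRPC limit, since an encoded message carries framing and metadata beyond the entry payload; how much is not something this diff specifies.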

}
Comment on lines +3052 to +3062
Contributor

medium

Similar to the MaxInflightMsgs helper, this function returns (0, false) on invalid input, which prevents normalizeLimitConfig from applying the compiled-in default if the caller provided a different value. This makes the "using default" warning message misleading. The function should return the default value and true so that the fallback behavior matches the logs.

Suggested change
- n, err := strconv.ParseUint(v, 10, 64)
- if err != nil || n < minMaxSizePerMsg {
- slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
- "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
- return 0, false
- }
+ n, err := strconv.ParseUint(v, 10, 64)
+ if err != nil || n < minMaxSizePerMsg {
+ slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
+ "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
+ return uint64(defaultMaxSizePerMsg), true
+ }
References
  1. Avoid state inconsistencies during normalization by ensuring that invalid inputs are handled explicitly and do not lead to misleading states or logs.

return n, true
}

// dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has
// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value
// is parsed with strconv.ParseBool, which accepts the standard tokens