feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b) #723
Closes the Phase 3.D PR 4-B-2 → PR 5 chain by adding the §8
leadership-refusal safeguard and flipping htfifoCapabilityAdvertised
to true. This is the final piece of the routing+leadership-refusal
pair the §11 PR 4 contract requires before a binary is "marked
htfifo-eligible".
What changes
raftengine: leader-acquired observer (mirror of leader-loss)
- raftengine.Admin gains RegisterLeaderAcquiredCallback. Same
contract as RegisterLeaderLossCallback (non-blocking, panic-
contained, sentinel-pointer deregister) but fires on the
previous!=Leader → status==Leader edge instead of leaving
the leader role.
- etcd backend: new leaderAcquiredCbs slice + mutex; fires from
refreshStatus on the leader-acquired edge AFTER e.isLeader is
published, so a callback that calls engine.State() observes
StateLeader.
- register / fire helpers extracted (registerLeaderCallback,
  gatherLeaderCallbacks) so the leader-loss and leader-acquired
  paths share one slot-management implementation. The dupl lint
  warning that fired on the first draft is the check that keeps
  this consolidated.
- Test coverage: leader_acquired_callback_test.go mirrors
leader_loss_callback_test.go — panic containment, empty-list
safety, deregister removal, deregister idempotence, nil-fn
safety, nil-receiver safety, identical-fn disambiguation.
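The edge-detection contract above can be sketched as a minimal stand-in — `edgeDetector`, `observe`, and the `State` constants below are hypothetical names for illustration, not the real raftengine API:

```go
package main

import "fmt"

// State is a simplified stand-in for the engine's node state.
type State int

const (
	StateFollower State = iota
	StateLeader
)

// edgeDetector illustrates firing only on the
// previous!=Leader -> status==Leader transition edge.
type edgeDetector struct {
	prev      State
	onAcquire []func()
}

func (d *edgeDetector) observe(cur State) {
	if d.prev != StateLeader && cur == StateLeader {
		for _, fn := range d.onAcquire {
			fn() // real code is panic-contained and non-blocking
		}
	}
	d.prev = cur
}

func main() {
	fired := 0
	d := &edgeDetector{onAcquire: []func(){func() { fired++ }}}
	d.observe(StateFollower)
	d.observe(StateLeader) // edge: fires
	d.observe(StateLeader) // steady state: does not fire again
	fmt.Println(fired)     // 1
}
```

Because the callback fires only on the transition edge, repeated `refreshStatus` ticks while the node stays leader add no per-tick work.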
main: SQS leadership-refusal hook
- main_sqs_leadership_refusal.go: installSQSLeadershipRefusal +
installSQSLeadershipRefusalAcrossGroups + partitionedGroupSet.
- On install, if the engine is currently leader of a group
hosting a partitioned queue and the binary lacks htfifo, the
hook calls TransferLeadership immediately. The
leader-acquired observer then keeps catching future
transitions for the same group.
- TransferLeadership runs in a goroutine because the
leader-acquired callback contract is non-blocking — a
synchronous admin RPC inside the callback would stall
refreshStatus.
- sqsLeadershipController is the small interface the helper
accepts (subset of raftengine.Admin) so test doubles don't
have to satisfy the full Admin surface.
- run() wires installSQSLeadershipRefusalAcrossGroups after the
coordinator is built; the composite deregister flows through
cleanup.
adapter: AdvertisesHTFIFO + flag flip
- adapter.AdvertisesHTFIFO() reports the htfifo capability flag
so main.go can read it without touching the package-private
constant.
- htfifoCapabilityAdvertised = false → true. Both the routing
wiring (PR 4-B-2 #715) and the leadership-refusal hook (this
PR) are now in the binary, so the design's "marked
htfifo-eligible" bar is met.
What's still gated
PR 5 lifts the PartitionCount > 1 dormancy gate AND wires
PollSQSHTFIFOCapability (PR 4-B-3a #721) into the CreateQueue
gate in the same commit. Until PR 5 lands, no partitioned queue
can land in production — the leadership-refusal hook is dormant
in the happy-path runtime (every binary past this PR advertises
htfifo, and the per-group early return keeps the hook out of the
hot path).
Self-review (per CLAUDE.md)
1. Data loss — control-plane only; no FSM/Pebble/retention path.
The hook calls TransferLeadership which is itself an admin
action with the same data-loss profile as a graceful manual
transfer. No issue.
2. Concurrency / distributed failures — leader-acquired callback
contract mirrors leader-loss (non-blocking, panic-contained,
sentinel-pointer deregister). refuse() offloads the actual
TransferLeadership to a goroutine so refreshStatus stays
non-blocking. Multiple goroutines calling refuse() for the
same group queue serially in raft's admin channel; the worst
case is one redundant transfer attempt, which is idempotent
on the raft side. No issue.
3. Performance — leader-acquired callbacks fire only on the
transition edge (rare event); no per-request hot path cost.
The early return on advertisesHTFIFO=true means
production-binary hosts pay zero overhead. No issue.
4. Data consistency — the hook protects against the §8
downgrade scenario: a node rolled back to a pre-htfifo
binary that still gets elected leader of a partitioned-queue
shard would otherwise read/write under the legacy keyspace
and silently corrupt the queue. The hook steps it down via
TransferLeadership before any client request lands. No issue.
5. Test coverage — 7 raftengine observer tests (mirror of
leader-loss panic / empty / deregister / idempotence / nil
guards / sentinel-pointer disambiguation) + 11 main-side
helper tests (htfifo no-op, no-partitioned-queue no-op,
startup-already-leader refuses, startup-follower waits,
per-acquisition fires, deregister propagates, transfer error
logged, nil-admin safe, partitionedGroupSet flatten / empty /
malformed).
@claude review Phase 3.D PR 4-B-3b — leadership-refusal hook (raftengine leader-acquired observer + main-side helper) and
Claude finished @bootjp's task in 3m 33s — Review: Phase 3.D PR 4-B-3b — Leadership-Refusal Hook + Flag Flip
Finding 1 — Bug:
📝 Walkthrough

The PR enables HT-FIFO capability advertisement in SQS and adds a leader-acquired callback mechanism to the Raft engine. A new SQS leadership-refusal module uses this callback to prevent non-HT-FIFO nodes from leading partitioned FIFO queue groups.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Node as Raft Node
    participant Eng as Raft Engine
    participant SQS as Leadership<br/>Refusal
    participant Coord as Coordinator
    Coord->>Node: Start node (not HT-FIFO)
    Node->>Eng: RegisterLeaderAcquiredCallback(refusalFn)
    activate Eng
    Eng-->>SQS: (deregister)
    deactivate Eng
    alt Starts as Leader
        Eng->>Eng: Leader state detected
        Eng->>SQS: Invoke refusal callback
        activate SQS
        SQS->>Coord: TransferLeadership()
        Coord-->>SQS: Leadership transferred
        deactivate SQS
    else Starts as Follower
        Node->>Node: Follower state
        Note over Node: Waiting for<br/>leader acquisition...
    end
    alt Later Acquires Leadership
        Eng->>Eng: State transition to Leader
        Eng->>SQS: Fire registered callback
        activate SQS
        SQS->>Coord: TransferLeadership()
        Coord-->>SQS: Leadership transferred
        deactivate SQS
    end
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fe99a436e5
```go
// mutex so callers can fire them without holding the lock.
// Mirrors the snapshot-then-fire pattern used by the per-callback
// invoke helpers.
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
```
Lock callback slice before taking snapshot
gatherLeaderCallbacks takes cbs by value, so both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks read e.leader*Cbs outside the mutex before mu.Lock() runs. That regresses the previous lock-then-copy behavior and introduces a real data race when a callback is registered/deregistered concurrently with refreshStatus firing callbacks (e.g., startup/shutdown churn), which can yield undefined behavior or race-detector failures. Please move the slice read under the lock (e.g., pass a pointer to the slice and dereference only after locking).
Code Review
This pull request enables the SQS HT-FIFO capability and implements a leadership-refusal safeguard by introducing a leader-acquisition callback mechanism in the Raft engine. This allows nodes to automatically transfer leadership if they are ineligible to host partitioned queues. Review feedback highlighted critical data races in the shared callback firing logic and suggested enhancing panic recovery logs with node identity and stack traces for better production debugging.
```go
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
```
This line contains a data race. The slice header of e.leaderLossCbs is read here to be passed by value to gatherLeaderCallbacks, but this read is not protected by e.leaderLossCbsMu. If another goroutine modifies the slice (e.g., via RegisterLeaderLossCallback) concurrently, it can lead to undefined behavior or a crash. The slice should be passed by pointer so that gatherLeaderCallbacks can read the header under the lock.
```diff
-	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) {
```
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
```go
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
	mu.Lock()
	out := make([]func(), len(cbs))
	for i, c := range cbs {
		out[i] = c.fn
	}
	mu.Unlock()
	return out
}
```
To prevent data races on the slice header, this helper should accept a pointer to the slice. This allows the function to access the slice's pointer, length, and capacity safely while holding the mutex.
```diff
-func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
-	mu.Lock()
-	out := make([]func(), len(cbs))
-	for i, c := range cbs {
-		out[i] = c.fn
-	}
-	mu.Unlock()
-	return out
-}
+func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() {
+	mu.Lock()
+	defer mu.Unlock()
+	out := make([]func(), len(*cbs))
+	for i, c := range *cbs {
+		out[i] = c.fn
+	}
+	return out
+}
```
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
```go
// synchronously. Same panic-containment + non-blocking contract
// as fireLeaderLossCallbacks.
func (e *Engine) fireLeaderAcquiredCallbacks() {
	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
```
This line also has a data race. e.leaderAcquiredCbs is read without holding e.leaderAcquiredCbsMu before being passed to gatherLeaderCallbacks. It should be passed by pointer instead.
```diff
-	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) {
```
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
```go
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panic",
				"recover", r)
		}
	}()
	fn()
}
```
The panic recovery logic here is less informative than its counterpart in invokeLeaderLossCallback. It should include the node identity and a stack trace to facilitate debugging of policy hook failures in production.
```go
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panicked",
				slog.String("node_id", e.localID),
				slog.Uint64("raft_node_id", e.nodeID),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	fn()
}
```

References
- Include node identity in logs and metadata to ensure correct context and prevent incorrect metadata reuse.
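The containment pattern both invoke helpers share can be reduced to a runnable stand-in — `invokeContained` is a hypothetical name, and it prints instead of using slog to stay dependency-free:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// invokeContained recovers a panicking callback (capturing a stack
// for triage) so subsequent callbacks still run and the process
// does not crash.
func invokeContained(fn func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
			fmt.Printf("callback panic: %v\n%s", r, debug.Stack())
		}
	}()
	fn()
	return false
}

func main() {
	invokeContained(func() { panic("boom") })
	fmt.Println("still alive") // the panic was contained
}
```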
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/raftengine/etcd/engine.go`:
- Around line 1028-1039: gatherLeaderCallbacks currently takes the callback
slice by value and locks after reading it, which races with concurrent
register/deregister; change gatherLeaderCallbacks to lock before reading the
slice (accept the slice as a pointer or otherwise take a locked reference) so
the snapshot of []leaderCallbackSlot is performed while mu is held, and update
callers fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks to pass
&e.leaderLossCbs and &e.leaderAcquiredCbs (using the same mu) so the function
reads a consistent slice of leaderCallbackSlot entries (and then returns the
copied []func() for use outside the lock).
In `@main_sqs_leadership_refusal.go`:
- Around line 97-103: Register the leader-acquired callback before doing the
immediate startup leader check: call
admin.RegisterLeaderAcquiredCallback(refuse) first to subscribe to future
acquisitions, then check admin.State() == raftengine.StateLeader and call
refuse() if true; this ensures the refuse() callback isn't missed if leadership
is gained between the check and registration. Reference:
admin.RegisterLeaderAcquiredCallback, admin.State(), refuse().
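The register-first ordering that prompt describes can be sketched as follows — `controller` and `install` are hypothetical simplifications, not the project's real types:

```go
package main

import "fmt"

// controller is an illustrative stand-in for the admin surface.
type controller struct {
	leader bool
	cbs    []func()
}

func (c *controller) RegisterLeaderAcquiredCallback(fn func()) func() {
	c.cbs = append(c.cbs, fn)
	return func() {} // deregister; a no-op in this sketch
}

func (c *controller) IsLeader() bool { return c.leader }

// install subscribes FIRST, then checks current state, so an
// election that completes mid-install cannot be missed. A double
// fire is tolerable because refuse is idempotent.
func install(c *controller, refuse func()) func() {
	deregister := c.RegisterLeaderAcquiredCallback(refuse)
	if c.IsLeader() {
		refuse()
	}
	return deregister
}

func main() {
	c := &controller{leader: true}
	_ = install(c, func() { fmt.Println("refused") })
}
```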
📒 Files selected for processing (7)
adapter/sqs.go, internal/raftengine/engine.go, internal/raftengine/etcd/engine.go, internal/raftengine/etcd/leader_acquired_callback_test.go, main.go, main_sqs_leadership_refusal.go, main_sqs_leadership_refusal_test.go
PR #723 round 1 review caught four items. This commit lands all four (one is a real concurrency bug I introduced when refactoring the leader-callback duplication).

1) P1 (Codex) / HIGH (Gemini) — data race in gatherLeaderCallbacks

My round 1 refactor extracted gatherLeaderCallbacks(mu, cbs []leaderCallbackSlot). Passing the slice by value means the slice header (pointer, length, capacity) is read at the call site — i.e. BEFORE mu.Lock() runs inside the helper. A concurrent registerLeaderCallback on another goroutine could mutate the header while the caller is reading it, triggering -race detector failures or undefined behaviour.

Fix: gatherLeaderCallbacks now takes *[]leaderCallbackSlot (pointer to slice). The slice is dereferenced INSIDE the locked section, closing the race. Both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks were updated to pass &e.leader*Cbs. This restores the pre-refactor "lock-then-copy" semantics the original RegisterLeaderLossCallback had. The lesson: passing a slice by value to a "thread-safe" helper does not give you mutex safety, because function-argument evaluation happens before the function body locks anything.

2) MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback log fields

Mirror invokeLeaderLossCallback's slog.Error fields: slog.String("node_id", e.localID), slog.Uint64("raft_node_id", e.nodeID), slog.Any("panic", r), slog.String("stack", string(debug.Stack())). Without these, an SQS leadership-refusal hook panicking in production would leave operators with only the recovered value to grep on. Same fields as the leader-loss path, so cross-family triage is consistent.

3) Claude finding 2 — TOCTOU window in startup leader check

installSQSLeadershipRefusal previously did:

```go
if admin.State() == StateLeader { refuse() }
return admin.RegisterLeaderAcquiredCallback(refuse)
```

Window: the engine could win an election between State() and RegisterLeaderAcquiredCallback returning. refreshStatus would fire fireLeaderAcquiredCallbacks before refuse is in the slice, and the hook would miss that acquisition.

Fix: post-registration State() re-check. If the install-time read returned follower but the post-registration read returns leader, refuse() fires for the in-flight election. refuse() is idempotent (TransferLeadership becomes a no-op once a transfer is already in flight), so a double invocation across the boundary is safe.

4) Claude finding 3 — installSQSLeadershipRefusalAcrossGroups branches

Four new tests:
- TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters: multi-group install + composite deregister fan-out.
- TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp: empty partition map → no per-group install runs.
- TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes: nil-runtime / nil-engine guard in the iterator.
- TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver: type-assertion-miss log path (engine that doesn't satisfy sqsLeadershipController).

Plus TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered, which pins fix #3 directly: a fake controller whose State() flips during RegisterLeaderAcquiredCallback exercises the post-registration re-check.

5) Audit per the lessons-learned discipline

The semantic change here is the gatherLeaderCallbacks signature. grep -rn confirmed only fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks call it — both are local to this file, both updated together. No external callers to audit.
@claude review Round 2 — all four round 1 findings addressed: P1 (Codex) / HIGH (Gemini) — real data race in gatherLeaderCallbacks; MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback log fields; Claude finding 2 (TOCTOU) — post-registration re-check; Claude finding 3 (branch coverage) — 4 new tests. Lessons-learned audit: semantic change is gatherLeaderCallbacks' signature.
Claude finished @bootjp's task in 3m 7s — Review: Round 2 — Phase 3.D PR 4-B-3b
All four Round 1 findings are correctly addressed. No new issues introduced. Fix #1 —
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 634fbcf954
```go
if err := admin.TransferLeadership(ctx); err != nil {
	logger.Warn("sqs: TransferLeadership failed",
		"group", gid, "err", err)
}
```
Fail closed when leadership transfer is rejected
When advertisesHTFIFO is false and this hook fires, a failed TransferLeadership (for example, no eligible transferee in a single-voter group or peers unavailable) is only logged and then ignored. In that case the node remains leader and continues serving despite the hook deciding it is unsafe to lead, which defeats the downgrade-protection guarantee and can still allow incorrect behavior on partitioned queues.
```go
for queue, routing := range partitionMap {
	for _, groupRef := range routing.groups {
		id, err := strconv.ParseUint(groupRef, 10, 64)
```
Exclude single-partition routes from refusal group set
partitionedGroupSet currently marks groups from every --sqsFifoPartitionMap entry, but PartitionCount=1 is valid input and is not a partitioned queue in the HT-FIFO sense. This makes the refusal hook run for groups that only host single-partition routes, causing unnecessary leadership transfers (and possible avoidable unavailability on constrained groups) without any data-safety benefit.
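A hedged sketch of what excluding single-partition routes could look like — `partitionedGroupSetSketch` and the `routing` struct below are illustrative stand-ins, not the project's actual types:

```go
package main

import (
	"fmt"
	"strconv"
)

type routing struct {
	partitionCount int
	groups         []string
}

// partitionedGroupSetSketch flattens a partition map into the set of
// raft group IDs hosting genuinely partitioned queues, skipping
// PartitionCount <= 1 entries (per the suggestion above) as well as
// malformed group refs.
func partitionedGroupSetSketch(m map[string]routing) map[uint64]bool {
	out := map[uint64]bool{}
	for _, r := range m {
		if r.partitionCount <= 1 {
			continue // single-partition routes are not HT-FIFO partitioned
		}
		for _, ref := range r.groups {
			id, err := strconv.ParseUint(ref, 10, 64)
			if err != nil {
				continue // real code would log and skip malformed refs
			}
			out[id] = true
		}
	}
	return out
}

func main() {
	m := map[string]routing{
		"orders.fifo": {partitionCount: 4, groups: []string{"2", "3"}},
	}
	fmt.Println(len(partitionedGroupSetSketch(m))) // 2
}
```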
Summary
Phase 3.D PR 4-B-3b — closes the routing+leadership-refusal pair the §11 PR 4 contract requires before a binary is "marked htfifo-eligible". With this PR landed, every node's `/sqs_health` advertises `htfifo` and the §8 downgrade-protection safeguard is in place. PR 5 (next) lifts the `PartitionCount > 1` dormancy gate and wires `PollSQSHTFIFOCapability` (#721) into the CreateQueue gate in the same commit.

What's added
raftengine: leader-acquired observer (mirror of leader-loss)
- `raftengine.Admin` gains `RegisterLeaderAcquiredCallback`. Same contract as `RegisterLeaderLossCallback` (non-blocking, panic-contained, sentinel-pointer deregister) but fires on the `previous!=Leader → status==Leader` edge in `refreshStatus`.
- `registerLeaderCallback` + `gatherLeaderCallbacks` helpers — this satisfies the `dupl` lint check and keeps both paths consistent in one place.

main: SQS leadership-refusal hook
- `main_sqs_leadership_refusal.go`: `installSQSLeadershipRefusal(ctx, admin, gid, partitionedGroups, advertisesHTFIFO, logger) func()` — startup check + per-acquisition observer.
- `installSQSLeadershipRefusalAcrossGroups(...)` — composite installer iterating every shard runtime.
- `partitionedGroupSet(partitionMap, logger)` — flattens `--sqsFifoPartitionMap` into the `{gid → bool}` set the hook consumes.
- `sqsAdvertisesHTFIFO()` — wraps `adapter.AdvertisesHTFIFO()`.
- `run()` installs the composite refusal across runtimes after the coordinator is built; deregister flows through `cleanup`.
- `TransferLeadership` runs in a goroutine because the leader-acquired callback contract is non-blocking — a synchronous admin RPC inside the callback would stall `refreshStatus`.
- `partitionedGroupSet` flatten / empty-input / malformed-ref-skip tests.
AdvertisesHTFIFO+ flag flipadapter.AdvertisesHTFIFO()exposes the package-private flag.htfifoCapabilityAdvertised = false → true. Both the routing wiring (PR 4-B-2 feat(sqs): partition resolver for HT-FIFO routing (Phase 3.D PR 4-B-2) #715) and the leadership-refusal hook (this PR) are now in the binary, so the design's "marked htfifo-eligible" bar is met.What's still gated
PR 5 lifts the `PartitionCount > 1` dormancy gate AND wires `PollSQSHTFIFOCapability` (#721) into the CreateQueue gate in the same commit. Until PR 5 lands, no partitioned queue can land in production — the leadership-refusal hook is dormant in the happy-path runtime (every binary past this PR advertises `htfifo`, and the per-group early return keeps the hook out of the hot path).

Test plan
- `go test -race ./internal/raftengine/etcd/` — 7 new + existing tests pass.
- `go test -race ./kv/` — existing 30+ tests pass (verified the tests that previously failed due to the misplaced interface method now pass).
- `go test -race ./adapter/` + `go test -race .` — all pass.
- `golangci-lint ./kv/... ./adapter/... ./internal/raftengine/... .` — clean.

Self-review (per CLAUDE.md)
- `refuse()` offloads `TransferLeadership` to a goroutine so `refreshStatus` stays non-blocking. Multiple goroutines firing for the same group serialize on raft's admin channel; worst case is one redundant transfer attempt, which is idempotent on the raft side. No issue.
- The early return on `advertisesHTFIFO=true` means production-binary hosts pay zero overhead. No issue.
New Features
Tests