
feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b)#723

Open
bootjp wants to merge 2 commits into main from feat/sqs-htfifo-leadership-refusal

Conversation

@bootjp
Owner

@bootjp bootjp commented Apr 30, 2026

Summary

Phase 3.D PR 4-B-3b — closes the routing+leadership-refusal pair the §11 PR 4 contract requires before a binary is "marked htfifo-eligible". With this PR landed, every node's /sqs_health advertises htfifo and the §8 downgrade-protection safeguard is in place. PR 5 (next) lifts the PartitionCount > 1 dormancy gate and wires PollSQSHTFIFOCapability (#721) into the CreateQueue gate in the same commit.

What's added

raftengine: leader-acquired observer (mirror of leader-loss)

  • raftengine.Admin gains RegisterLeaderAcquiredCallback. Same contract as RegisterLeaderLossCallback (non-blocking, panic-contained, sentinel-pointer deregister) but fires on the previous!=Leader → status==Leader edge in refreshStatus.
  • The etcd backend's slot-management is now shared between leader-loss and leader-acquired via registerLeaderCallback + gatherLeaderCallbacks helpers — this satisfies the dupl lint check and keeps both paths consistent in one place.
  • 7 observer tests mirror the existing leader-loss tests: panic-containment, empty-list safety, deregister removal, idempotence, nil-fn safe, nil-receiver safe, identical-fn disambiguation.
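The sentinel-pointer contract those tests exercise can be sketched as follows. This is an illustrative stand-alone model, not the PR's code: the slot/Observer names and the Demo driver are assumptions; only the contract (nil-safe registration, idempotent deregister, per-callback panic containment, zero-clear on removal) follows the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// slot pairs a callback with a unique sentinel pointer so two
// registrations of the identical function value can still be
// deregistered independently.
type slot struct {
	id *struct{} // unique sentinel per registration
	fn func()
}

type Observer struct {
	mu  sync.Mutex
	cbs []slot
}

// Register is nil-safe and returns an idempotent deregister func.
func (o *Observer) Register(fn func()) func() {
	if o == nil || fn == nil {
		return func() {} // nil-safe no-op, matching the contract
	}
	id := &struct{}{}
	o.mu.Lock()
	o.cbs = append(o.cbs, slot{id: id, fn: fn})
	o.mu.Unlock()
	var once sync.Once
	return func() {
		once.Do(func() {
			o.mu.Lock()
			defer o.mu.Unlock()
			for i := range o.cbs {
				if o.cbs[i].id == id {
					o.cbs[i] = slot{} // zero-clear so fn is not pinned
					o.cbs = append(o.cbs[:i], o.cbs[i+1:]...)
					return
				}
			}
		})
	}
}

// Fire snapshots under the lock, then invokes each callback
// outside it with per-callback panic containment.
func (o *Observer) Fire() {
	o.mu.Lock()
	fns := make([]func(), 0, len(o.cbs))
	for _, s := range o.cbs {
		fns = append(fns, s.fn)
	}
	o.mu.Unlock()
	for _, fn := range fns {
		func() {
			defer func() { _ = recover() }()
			fn()
		}()
	}
}

// Demo: two registrations of the identical fn are disambiguated
// by their sentinels; returns the total invocation count.
func Demo() int {
	var o Observer
	n := 0
	inc := func() { n++ }
	d1 := o.Register(inc)
	o.Register(inc)
	o.Fire() // n = 2
	d1()
	d1()     // idempotent, no effect
	o.Fire() // n = 3: only the second registration remains
	return n
}

func main() { fmt.Println(Demo()) }
```

The sentinel pointer is what makes two registrations of the same function value independently removable: func values are not comparable in Go, but each registration's unique *struct{} is.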

main: SQS leadership-refusal hook

  • main_sqs_leadership_refusal.go:
    • installSQSLeadershipRefusal(ctx, admin, gid, partitionedGroups, advertisesHTFIFO, logger) func() — startup check + per-acquisition observer.
    • installSQSLeadershipRefusalAcrossGroups(...) — composite installer iterating every shard runtime.
    • partitionedGroupSet(partitionMap, logger) — flattens --sqsFifoPartitionMap into the {gid → bool} set the hook consumes.
    • sqsAdvertisesHTFIFO() — wraps adapter.AdvertisesHTFIFO().
  • run() installs the composite refusal across runtimes after the coordinator is built; deregister flows through cleanup.
  • TransferLeadership runs in a goroutine because the leader-acquired callback contract is non-blocking — a synchronous admin RPC inside the callback would stall refreshStatus.
  • 11 helper tests cover the contract: htfifo no-op, no-partitioned-queue no-op, startup-already-leader refuses, startup-follower waits, per-acquisition fires, deregister propagates, transfer-error logged, nil-admin safe, partitionedGroupSet flatten / empty-input / malformed-ref-skip.
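A minimal sketch of the hook's shape under those contracts, assuming a pared-down controller interface; the State type, fakeAdmin double, and the done channel (added purely so a caller can observe the asynchronous transfer) are illustrative, not the PR's signatures.

```go
package main

import (
	"fmt"
	"sync"
)

// State and controller are illustrative stand-ins for the
// raftengine types named in the description.
type State int

const (
	StateFollower State = iota
	StateLeader
)

type controller interface {
	State() State
	RegisterLeaderAcquiredCallback(func()) func()
	TransferLeadership() error
}

// installRefusal is a no-op when the binary advertises htfifo or
// the group hosts no partitioned queue. Otherwise it runs the
// startup check and registers the per-acquisition observer.
// TransferLeadership is offloaded to a goroutine so the
// (non-blocking) callback never stalls the engine's status loop.
func installRefusal(c controller, partitioned, advertisesHTFIFO bool, done chan<- error) func() {
	if c == nil || advertisesHTFIFO || !partitioned {
		return func() {} // early return: zero cost on production binaries
	}
	refuse := func() {
		go func() { done <- c.TransferLeadership() }()
	}
	if c.State() == StateLeader {
		refuse() // startup-already-leader case
	}
	return c.RegisterLeaderAcquiredCallback(refuse)
}

// fakeAdmin satisfies only the narrow surface the hook needs,
// in the spirit of the PR's small-interface approach.
type fakeAdmin struct {
	mu        sync.Mutex
	state     State
	cb        func()
	transfers int
}

func (f *fakeAdmin) State() State {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.state
}

func (f *fakeAdmin) RegisterLeaderAcquiredCallback(fn func()) func() {
	f.cb = fn
	return func() { f.cb = nil }
}

func (f *fakeAdmin) TransferLeadership() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.transfers++
	return nil
}

// Demo: a non-htfifo node already leading a partitioned group
// refuses immediately; returns the transfer count.
func Demo() int {
	f := &fakeAdmin{state: StateLeader}
	done := make(chan error, 1)
	deregister := installRefusal(f, true, false, done)
	<-done // wait for the async transfer to land
	deregister()
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.transfers
}

func main() { fmt.Println(Demo()) }
```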

adapter: AdvertisesHTFIFO + flag flip

What's still gated

PR 5 lifts the PartitionCount > 1 dormancy gate AND wires PollSQSHTFIFOCapability (#721) into the CreateQueue gate in the same commit. Until PR 5 lands, no partitioned queue can be created in production — the leadership-refusal hook is dormant in the happy-path runtime (every binary past this PR advertises htfifo, and the per-group early return keeps the hook out of the hot path).

Test plan

  • go test -race ./internal/raftengine/etcd/ — 7 new + existing tests pass.
  • go test -race ./kv/ — existing 30+ tests pass (verified the tests that previously failed due to the misplaced interface method now pass).
  • go test -race ./adapter/ + go test -race . — all pass.
  • golangci-lint run ./kv/... ./adapter/... ./internal/raftengine/... . — clean.

Self-review (per CLAUDE.md)

  1. Data loss — control-plane only; no FSM/Pebble/retention path. The hook calls TransferLeadership which is itself an admin action with the same data-loss profile as a graceful manual transfer. No issue.
  2. Concurrency / distributed failures — leader-acquired callback contract mirrors leader-loss (non-blocking, panic-contained, sentinel-pointer deregister). refuse() offloads TransferLeadership to a goroutine so refreshStatus stays non-blocking. Multiple goroutines firing for the same group serialize on raft's admin channel; worst case is one redundant transfer attempt, which is idempotent on the raft side. No issue.
  3. Performance — leader-acquired callbacks fire only on the transition edge (rare event); no per-request hot path cost. The early return on advertisesHTFIFO=true means production-binary hosts pay zero overhead. No issue.
  4. Data consistency — the hook protects against the §8 downgrade scenario: a node rolled back to a pre-htfifo binary that still gets elected leader of a partitioned-queue shard would otherwise read/write under the legacy keyspace and silently corrupt the queue. The hook steps it down via TransferLeadership before any client request lands. No issue.
  5. Test coverage — 7 raftengine observer tests + 11 main-side helper tests (18 new). Existing kv lease tests confirm the misplaced-interface bug from first draft was caught and fixed.

Summary by CodeRabbit

  • New Features

    • SQS health endpoint now advertises HT-FIFO capability status for client detection.
    • Raft administration introduces leader-acquired callbacks, enabling systems to respond immediately to leadership acquisition events.
    • SQS FIFO queue groups now automatically refuse and transfer leadership when HT-FIFO capability is unavailable.
  • Tests

    • Added comprehensive test coverage for leader-acquired callback registration, deregistration, panic isolation, and SQS leadership refusal logic.

Closes the Phase 3.D PR 4-B-2 → PR 5 chain by adding the §8
leadership-refusal safeguard and flipping htfifoCapabilityAdvertised
to true. This is the final piece of the routing+leadership-refusal
pair the §11 PR 4 contract requires before a binary is "marked
htfifo-eligible".

What changes

raftengine: leader-acquired observer (mirror of leader-loss)

  - raftengine.Admin gains RegisterLeaderAcquiredCallback. Same
    contract as RegisterLeaderLossCallback (non-blocking, panic-
    contained, sentinel-pointer deregister) but fires on the
    previous!=Leader → status==Leader edge instead of leaving
    the leader role.
  - etcd backend: new leaderAcquiredCbs slice + mutex; fires from
    refreshStatus on the leader-acquired edge AFTER e.isLeader is
    published, so a callback that calls engine.State() observes
    StateLeader.
  - register / fire helpers extracted (registerLeaderCallback,
    gatherLeaderCallbacks) so the leader-loss and leader-acquired
    paths share one slot-management implementation. The dupl
    lint warning that fired on the first draft is the check that
    keeps them consolidated.
  - Test coverage: leader_acquired_callback_test.go mirrors
    leader_loss_callback_test.go — panic containment, empty-list
    safety, deregister removal, deregister idempotence, nil-fn
    safety, nil-receiver safety, identical-fn disambiguation.
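The edge-firing logic described above can be modeled like this (types and names are assumed); the important detail is that the new state is published before the callbacks run, so a callback that reads state observes Leader.

```go
package main

import "fmt"

// Illustrative stand-ins for the engine's state machine.
type State int

const (
	StateFollower State = iota
	StateLeader
)

type engine struct {
	state      State
	onAcquired func()
	onLost     func()
}

// refreshStatus fires callbacks only on transition edges, never
// on steady-state readings, and publishes the new state first so
// a callback reading state sees the post-transition value.
func (e *engine) refreshStatus(current State) {
	previous := e.state
	e.state = current // publish before firing
	if previous != StateLeader && current == StateLeader && e.onAcquired != nil {
		e.onAcquired()
	}
	if previous == StateLeader && current != StateLeader && e.onLost != nil {
		e.onLost()
	}
}

// Demo: five status refreshes yield two acquired edges and one
// lost edge; the steady-state Leader reading fires nothing.
func Demo() (acquired, lost int) {
	e := &engine{}
	e.onAcquired = func() { acquired++ }
	e.onLost = func() { lost++ }
	for _, s := range []State{StateFollower, StateLeader, StateLeader, StateFollower, StateLeader} {
		e.refreshStatus(s)
	}
	return
}

func main() { fmt.Println(Demo()) }
```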

main: SQS leadership-refusal hook

  - main_sqs_leadership_refusal.go: installSQSLeadershipRefusal +
    installSQSLeadershipRefusalAcrossGroups + partitionedGroupSet.
  - On install, if the engine is currently leader of a group
    hosting a partitioned queue and the binary lacks htfifo, the
    hook calls TransferLeadership immediately. The
    leader-acquired observer then keeps catching future
    transitions for the same group.
  - TransferLeadership runs in a goroutine because the
    leader-acquired callback contract is non-blocking — a
    synchronous admin RPC inside the callback would stall
    refreshStatus.
  - sqsLeadershipController is the small interface the helper
    accepts (subset of raftengine.Admin) so test doubles don't
    have to satisfy the full Admin surface.
  - run() wires installSQSLeadershipRefusalAcrossGroups after the
    coordinator is built; the composite deregister flows through
    cleanup.

adapter: AdvertisesHTFIFO + flag flip

  - adapter.AdvertisesHTFIFO() reports the htfifo capability flag
    so main.go can read it without touching the package-private
    constant.
  - htfifoCapabilityAdvertised = false → true. Both the routing
    wiring (PR 4-B-2 #715) and the leadership-refusal hook (this
    PR) are now in the binary, so the design's "marked
    htfifo-eligible" bar is met.
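As a sketch, the accessor-over-const pattern looks like this; the adapter's package boundary is collapsed into one file here for illustration. Because the flag is a const rather than a var, the flip is a single-line, compile-time decision that cannot be mutated at runtime.

```go
package main

import "fmt"

// Package-private capability flag; flipping this is the
// reviewable one-line change the bullet above describes.
const htfifoCapabilityAdvertised = true

// AdvertisesHTFIFO lets main.go read the capability without the
// package-private constant itself being exported.
func AdvertisesHTFIFO() bool { return htfifoCapabilityAdvertised }

func main() { fmt.Println(AdvertisesHTFIFO()) }
```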

@bootjp
Owner Author

bootjp commented Apr 30, 2026

@claude review

Phase 3.D PR 4-B-3b — leadership-refusal hook (raftengine leader-acquired observer + main-side helper) and htfifoCapabilityAdvertised flag flip. Closes the §11 PR 4 routing+leadership-refusal pair so the binary is now "marked htfifo-eligible". PR 5 lifts the dormancy gate next.

@claude

claude Bot commented Apr 30, 2026

Claude finished @bootjp's task in 3m 33s.


Review: Phase 3.D PR 4-B-3b — Leadership-Refusal Hook + Flag Flip

  • Gather context
  • Read changed files
  • Review internal/raftengine/engine.go — leader-acquired interface
  • Review internal/raftengine/etcd/engine.go — implementation + dedup helpers
  • Review internal/raftengine/etcd/leader_acquired_callback_test.go — observer tests
  • Review main_sqs_leadership_refusal.go — hook logic
  • Review main_sqs_leadership_refusal_test.go — hook tests
  • Review adapter/sqs.go — flag flip + AdvertisesHTFIFO
  • Review main.go — wiring
  • Post final review

Finding 1 — Bug: invokeLeaderAcquiredCallback missing structured log fields and stack trace

internal/raftengine/etcd/engine.go:1088-1096

func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
    defer func() {
        if r := recover(); r != nil {
            slog.Error("etcd raft engine: leader-acquired callback panic",
                "recover", r)   // ← missing node_id, raft_node_id, stack
        }
    }()
    fn()
}

Its counterpart invokeLeaderLossCallback (lines 1098–1122) logs four structured fields: node_id, raft_node_id, panic, and debug.Stack(). The acquired variant logs only the recovered value. In production — where the leadership-refusal callback itself panics — an operator has no node identity or stack to diagnose the problem. This should mirror the leader-loss pattern exactly.



Finding 2 — Minor: TOCTOU window in installSQSLeadershipRefusal startup check

main_sqs_leadership_refusal.go:97-103

if admin.State() == raftengine.StateLeader {
    refuse()
}
return admin.RegisterLeaderAcquiredCallback(refuse)

There is a narrow window between the admin.State() read and RegisterLeaderAcquiredCallback completing. If the node wins a Raft election in that window, refreshStatus calls fireLeaderAcquiredCallbacks before refuse is registered — the hook misses that acquisition. The node stays as leader of a partitioned-queue group indefinitely until a re-election occurs (e.g. network partition, transfer triggered by something else). Probability is extremely low (microsecond window), but this is a safety-enforcement hook so the gap is worth a comment.

The simplest mitigation is a second State() check after registration and an immediate refuse() call if the node turned leader in the interim. Alternatively, a comment explicitly acknowledging the invariant and why it is acceptable would be sufficient.
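A sketch of that mitigation, with all types assumed for illustration; racyAdmin simulates the worst case by winning the election inside the registration window.

```go
package main

import "fmt"

// Illustrative stand-ins; only the check/register/re-check
// ordering is the point of this sketch.
type State int

const (
	StateFollower State = iota
	StateLeader
)

type admin interface {
	State() State
	RegisterLeaderAcquiredCallback(func()) func()
}

func installWithRecheck(a admin, refuse func()) func() {
	if a.State() == StateLeader {
		refuse() // already leader at install time
	}
	deregister := a.RegisterLeaderAcquiredCallback(refuse)
	// Close the TOCTOU window: an election won between the first
	// State() read and registration fired no callback for us, so
	// check once more. refuse() must be idempotent for the
	// possible double call across the boundary to be safe.
	if a.State() == StateLeader {
		refuse()
	}
	return deregister
}

// racyAdmin flips to Leader during registration, after any
// callback firing for that election has already happened.
type racyAdmin struct{ state State }

func (r *racyAdmin) State() State { return r.state }

func (r *racyAdmin) RegisterLeaderAcquiredCallback(func()) func() {
	r.state = StateLeader // the election lands inside the window
	return func() {}
}

// Demo returns how many times refuse fired; the post-registration
// re-check catches the in-window election exactly once.
func Demo() int {
	n := 0
	installWithRecheck(&racyAdmin{state: StateFollower}, func() { n++ })
	return n
}

func main() { fmt.Println(Demo()) }
```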


Finding 3 — Observation: installSQSLeadershipRefusalAcrossGroups nil/skip branches not covered

main_sqs_leadership_refusal.go:173-190

The nil-engine check (rt == nil || rt.engine == nil) and the interface-assertion-miss log path ("engine does not implement leader-acquired observer") are meaningful defensive branches with no direct test. The installSQSLeadershipRefusal path IS tested exhaustively; it's only the iterator's guard clauses that lack coverage. Not blocking, but worth a table-driven test against a nil-runtime or a stub engine that fails the type assertion.


Architecture / design passes (per CLAUDE.md)

Data loss — control-plane only; TransferLeadership has the same data-loss profile as a graceful manual transfer. No concern.

Concurrency / distributed failures — The shared registerLeaderCallback/gatherLeaderCallbacks helper correctly snapshots the callback list under a lock before firing, so a registration or deregistration concurrent with a fire event cannot cause a torn-read of in-flight callback state. The once.Do wrapper on deregister makes repeated calls safe. The goroutine in refuse() correctly offloads TransferLeadership so refreshStatus is never stalled. Cleanup ordering is correct (LIFO, deregister added last → runs first, before engine close).

Performance — callback fires only on the previous!=Leader → status==Leader edge (rare). The advertisesHTFIFO=true early return is in the hot call sites, so post-PR-4-B-3b production binaries pay exactly zero overhead.

Data consistency — the §8 downgrade-protection invariant is correctly closed: a rolled-back binary lacking the htfifo data plane can never hold leadership of a partitioned-queue shard, because either (a) it is already leader at install time → refuse() fires immediately, or (b) it wins a future election → RegisterLeaderAcquiredCallback fires refuse(). Finding 2 identifies a narrow TOCTOU window at the boundary between cases (a) and (b).

Test coverage — 7 raftengine observer tests mirror every leader-loss contract; 11 main-side tests cover all significant branches. The awaitTransferCalls polling loop (1 s deadline, 5 ms tick) is generous and race-free. The only gaps are Finding 1 (untestable at unit level, but the log difference is a maintenance concern) and Finding 3.


Positive notes

  • The registerLeaderCallback/gatherLeaderCallbacks abstraction is a clean solution to the dupl lint constraint — sharing slot management between leader-loss and leader-acquired keeps the two paths symmetric with one code path to audit.
  • Sentinel-pointer deregister correctly disambiguates identical fn values and the leaderCallbackSlot{} zero-clear on removal prevents GC pinning of the removed coordinator.
  • htfifoCapabilityAdvertised = true as a const with the justification comment in adapter/sqs.go is the right call; a var would invite accidental mutation.
  • Shutdown LIFO ordering for leadershipRefusalDeregister (runs before rt.Close()) is correct.

Summary: One clear fix needed (Finding 1 — missing log fields in panic recovery). Finding 2 is a minor robustness gap worth at least a comment. Everything else is solid.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 30, 2026

Warning

Rate limit exceeded

@bootjp has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 49 minutes and 2 seconds before requesting another review.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e2c5347b-5683-47fd-8496-2cacc43b8abd

📥 Commits

Reviewing files that changed from the base of the PR and between fe99a43 and 634fbcf.

📒 Files selected for processing (3)
  • internal/raftengine/etcd/engine.go
  • main_sqs_leadership_refusal.go
  • main_sqs_leadership_refusal_test.go
📝 Walkthrough

Walkthrough

The PR enables HT-FIFO capability advertisement in SQS and adds a leader-acquired callback mechanism to the Raft engine. A new SQS leadership-refusal module uses this callback to prevent non-HT-FIFO nodes from leading partitioned FIFO queue groups.

Changes

  • SQS Capability Advertisement (adapter/sqs.go): Enables htfifo capability advertisement by flipping htfifoCapabilityAdvertised to true and exports the AdvertisesHTFIFO() accessor function for external consumption.
  • Raft Engine Leader-Acquired Callback API (internal/raftengine/engine.go, internal/raftengine/etcd/engine.go): Adds a public RegisterLeaderAcquiredCallback method to the Admin interface and implements callback registration, storage, and synchronous firing within refreshStatus on the leader-acquire state edge. Refactors callback logic into shared helpers (leaderCallbackSlot, registerLeaderCallback, gatherLeaderCallbacks) with panic containment via invokeLeaderAcquiredCallback.
  • Raft Engine Callback Tests (internal/raftengine/etcd/leader_acquired_callback_test.go): Tests leader-acquired callback behavior including panic containment, deregistration idempotency, nil-safe handling, and callback disambiguation when identical functions are registered multiple times.
  • SQS Leadership Refusal Integration (main.go, main_sqs_leadership_refusal.go, main_sqs_leadership_refusal_test.go): Installs the per-group leadership-refusal hook for non-HT-FIFO nodes hosting partitioned FIFO queues. The controller either steps down immediately if already leader or registers an acquired-callback to refuse on a future transition. Tests validate immediate/deferred refusal, callback firing, and safe nil-controller handling.

Sequence Diagram

sequenceDiagram
    participant Node as Raft Node
    participant Eng as Raft Engine
    participant SQS as Leadership<br/>Refusal
    participant Coord as Coordinator

    Coord->>Node: Start node (not HT-FIFO)
    Node->>Eng: RegisterLeaderAcquiredCallback(refusalFn)
    activate Eng
    Eng-->>SQS: (deregister)
    deactivate Eng
    
    alt Starts as Leader
        Eng->>Eng: Leader state detected
        Eng->>SQS: Invoke refusal callback
        activate SQS
        SQS->>Coord: TransferLeadership()
        Coord-->>SQS: Leadership transferred
        deactivate SQS
    else Starts as Follower
        Node->>Node: Follower state
        Note over Node: Waiting for<br/>leader acquisition...
    end
    
    alt Later Acquires Leadership
        Eng->>Eng: State transition to Leader
        Eng->>SQS: Fire registered callback
        activate SQS
        SQS->>Coord: TransferLeadership()
        Coord-->>SQS: Leadership transferred
        deactivate SQS
    end

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐰 A hop, a bound—callbacks now acquired!
The leader learns when roles are desired,
SQS queues refuse with grace,
HT-FIFO finds its place,
One hop closer to the finish line, inspired! 🎉

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check: ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title 'feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b)' directly and comprehensively summarizes the main changes: adding a leadership-refusal hook and flipping the htfifo capability flag.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 96.30%, which is sufficient; the required threshold is 80.00%.
  • Linked Issues Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.




@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fe99a436e5


Comment thread internal/raftengine/etcd/engine.go Outdated
// mutex so callers can fire them without holding the lock.
// Mirrors the snapshot-then-fire pattern used by the per-callback
// invoke helpers.
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {

P1: Lock callback slice before taking snapshot

gatherLeaderCallbacks takes cbs by value, so both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks read e.leader*Cbs outside the mutex before mu.Lock() runs. That regresses the previous lock-then-copy behavior and introduces a real data race when a callback is registered or deregistered concurrently with refreshStatus firing callbacks (e.g., startup/shutdown churn), which can yield undefined behavior or race-detector failures. Please move the slice read under the lock (e.g., pass a pointer to the slice and dereference only after locking).


Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request enables the SQS HT-FIFO capability and implements a leadership-refusal safeguard by introducing a leader-acquisition callback mechanism in the Raft engine. This allows nodes to automatically transfer leadership if they are ineligible to host partitioned queues. Review feedback highlighted critical data races in the shared callback firing logic and suggested enhancing panic recovery logs with node identity and stack traces for better production debugging.

Comment thread internal/raftengine/etcd/engine.go Outdated
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
Contributor


high

This line contains a data race. The slice header of e.leaderLossCbs is read here to be passed by value to gatherLeaderCallbacks, but this read is not protected by e.leaderLossCbsMu. If another goroutine modifies the slice (e.g., via RegisterLeaderLossCallback) concurrently, it can lead to undefined behavior or a crash. The slice should be passed by pointer so that gatherLeaderCallbacks can read the header under the lock.

Suggested change:

-	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) {
References
  1. To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.

Comment thread internal/raftengine/etcd/engine.go Outdated
Comment on lines 1032 to 1040
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
	mu.Lock()
	out := make([]func(), len(cbs))
	for i, c := range cbs {
		out[i] = c.fn
	}
	mu.Unlock()
	return out
}
Contributor


high

To prevent data races on the slice header, this helper should accept a pointer to the slice. This allows the function to access the slice's pointer, length, and capacity safely while holding the mutex.

Suggested change:

-	func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
-		mu.Lock()
-		out := make([]func(), len(cbs))
-		for i, c := range cbs {
-			out[i] = c.fn
-		}
-		mu.Unlock()
-		return out
-	}
+	func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() {
+		mu.Lock()
+		defer mu.Unlock()
+		out := make([]func(), len(*cbs))
+		for i, c := range *cbs {
+			out[i] = c.fn
+		}
+		return out
+	}
References
  1. To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.

Comment thread internal/raftengine/etcd/engine.go Outdated
// synchronously. Same panic-containment + non-blocking contract
// as fireLeaderLossCallbacks.
func (e *Engine) fireLeaderAcquiredCallbacks() {
for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This line also has a data race. e.leaderAcquiredCbs is read without holding e.leaderAcquiredCbsMu before being passed to gatherLeaderCallbacks. It should be passed by pointer instead.

Suggested change:

-	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) {
References
  1. To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.

Comment on lines +1088 to +1096
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panic",
				"recover", r)
		}
	}()
	fn()
}
Contributor


medium

The panic recovery logic here is less informative than its counterpart in invokeLeaderLossCallback. It should include the node identity and a stack trace to facilitate debugging of policy hook failures in production.

func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panicked",
				slog.String("node_id", e.localID),
				slog.Uint64("raft_node_id", e.nodeID),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	fn()
}
References
  1. Include node identity in logs and metadata to ensure correct context and prevent incorrect metadata reuse.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/raftengine/etcd/engine.go`:
- Around line 1028-1039: gatherLeaderCallbacks currently takes the callback
slice by value and locks after reading it, which races with concurrent
register/deregister; change gatherLeaderCallbacks to lock before reading the
slice (accept the slice as a pointer or otherwise take a locked reference) so
the snapshot of []leaderCallbackSlot is performed while mu is held, and update
callers fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks to pass
&e.leaderLossCbs and &e.leaderAcquiredCbs (using the same mu) so the function
reads a consistent slice of leaderCallbackSlot entries (and then returns the
copied []func() for use outside the lock).

In `@main_sqs_leadership_refusal.go`:
- Around line 97-103: Register the leader-acquired callback before doing the
immediate startup leader check: call
admin.RegisterLeaderAcquiredCallback(refuse) first to subscribe to future
acquisitions, then check admin.State() == raftengine.StateLeader and call
refuse() if true; this ensures the refuse() callback isn't missed if leadership
is gained between the check and registration. Reference:
admin.RegisterLeaderAcquiredCallback, admin.State(), refuse().

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1c809151-22b1-4912-9ca5-c83d5b143f5f

📥 Commits

Reviewing files that changed from the base of the PR and between e9f33eb and fe99a43.

📒 Files selected for processing (7)
  • adapter/sqs.go
  • internal/raftengine/engine.go
  • internal/raftengine/etcd/engine.go
  • internal/raftengine/etcd/leader_acquired_callback_test.go
  • main.go
  • main_sqs_leadership_refusal.go
  • main_sqs_leadership_refusal_test.go

PR #723 round 1 review caught four items. This commit lands all
four (one is a real concurrency bug I introduced when refactoring
the leader-callback duplication).

1) P1 (Codex) / HIGH (Gemini) — data race in gatherLeaderCallbacks

  My round 1 refactor extracted gatherLeaderCallbacks(mu, cbs
  []leaderCallbackSlot). Passing the slice by value means the
  slice header (pointer, length, capacity) is dereferenced at
  the call site — i.e. BEFORE mu.Lock() runs inside the helper.
  Concurrent registerLeaderCallback running on another goroutine
  could mutate the header while the caller is reading it,
  triggering -race detector failures or undefined behaviour.

  Fix: gatherLeaderCallbacks now takes *[]leaderCallbackSlot
  (pointer to slice). The slice is dereferenced INSIDE the locked
  section, closing the race. Both fireLeaderLossCallbacks and
  fireLeaderAcquiredCallbacks updated to pass &e.leader*Cbs.

  This regresses to the pre-refactor "lock-then-copy" semantics
  the original RegisterLeaderLossCallback had. The lesson:
  passing a slice by value to a "thread-safe" helper does not
  give you mutex safety, because the function-argument
  evaluation happens before the function body locks anything.

2) MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback log fields

  Mirror invokeLeaderLossCallback's slog.Error fields:
    slog.String("node_id", e.localID)
    slog.Uint64("raft_node_id", e.nodeID)
    slog.Any("panic", r)
    slog.String("stack", string(debug.Stack()))

  Without these, an SQS leadership-refusal hook panicking in
  production would leave operators with only the recovered value
  to grep on. Same fields the leader-loss path has, so cross-
  family triage is consistent.

3) Claude finding 2 — TOCTOU window in startup leader check

  installSQSLeadershipRefusal previously did:
    if admin.State() == StateLeader { refuse() }
    return admin.RegisterLeaderAcquiredCallback(refuse)

  Window: the engine could win an election between State() and
  RegisterLeaderAcquiredCallback returning. refreshStatus would
  fire fireLeaderAcquiredCallbacks before refuse is in the
  slice, and the hook would miss that acquisition.

  Fix: post-registration State() re-check. If the install-time
  read returned follower but the post-registration read returns
  leader, refuse() fires for the in-flight election. refuse()
  is idempotent (TransferLeadership becomes a no-op once a
  transfer is already in flight), so a double-invocation across
  the boundary is safe.

4) Claude finding 3 — installSQSLeadershipRefusalAcrossGroups branches

  Four new tests:
    - TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters:
      multi-group install + composite deregister fan-out.
    - TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp:
      empty partition map → no per-group install runs.
    - TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes:
      nil-runtime / nil-engine guard in the iterator.
    - TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver:
      type-assertion-miss log path (engine that doesn't satisfy
      sqsLeadershipController).

  Plus TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered which
  pins fix #3 directly: a fake controller whose State() flips
  during RegisterLeaderAcquiredCallback exercises the
  post-registration re-check.

5) Audit per the lessons-learned discipline

  The semantic change here is the gatherLeaderCallbacks
  signature. grep -rn confirmed only fireLeaderLossCallbacks
  and fireLeaderAcquiredCallbacks call it — both are local to
  this file, both updated together. No external callers to
  audit.
@bootjp
Owner Author

bootjp commented Apr 30, 2026

@claude review

Round 2 — all four round 1 findings addressed:

P1 (Codex) / HIGH (Gemini) — real data race in gatherLeaderCallbacks. My round 1 refactor passed the slice by value (cbs []leaderCallbackSlot), so the slice header was dereferenced at the call site BEFORE mu.Lock() ran inside the helper. Concurrent registerLeaderCallback could mutate the header during that window. Fix: signature changed to *[]leaderCallbackSlot; both call sites updated. The dereference now happens inside the locked section.

MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback panic log now matches invokeLeaderLossCallback (node_id / raft_node_id / panic / debug.Stack).

Claude finding 2 (TOCTOU) — installSQSLeadershipRefusal now re-checks State() after RegisterLeaderAcquiredCallback returns. If the engine wins an election in the registration window, refuse() fires for that in-flight acquisition. refuse() is idempotent so a double-fire across the boundary is safe.

Claude finding 3 (branch coverage) — 4 new installSQSLeadershipRefusalAcrossGroups tests cover composite deregister fan-out, empty partition map, nil-runtime / nil-engine guard, and engine-doesn't-implement-observer. Plus TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered directly exercises fix #2.

Lessons-learned audit: semantic change is gatherLeaderCallbacks signature. grep -rn confirmed only the two fire*Callbacks methods call it; both updated together. No external callers.

go test -race ./kv/... ./internal/raftengine/etcd/ . and golangci-lint clean.

@claude

claude Bot commented Apr 30, 2026

Claude finished @bootjp's task in 3m 7s


Review: Round 2 — Phase 3.D PR 4-B-3b


All four Round 1 findings are correctly addressed. No new issues introduced.

Fix #1 — gatherLeaderCallbacks slice pointer race ✅

internal/raftengine/etcd/engine.go:1039

Signature is now gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot). The slice header (pointer, length, capacity) is now dereferenced as *cbs inside mu.Lock(). Both callers pass &e.leaderLossCbs and &e.leaderAcquiredCbs. The comment at lines 1028–1038 explains the race and traces it to the Codex P1 / Gemini HIGH finding — useful for future auditors. Clean fix.

Fix #2 — invokeLeaderAcquiredCallback panic log parity ✅

internal/raftengine/etcd/engine.go:1095–1113

The recovery block now logs slog.String("node_id", e.localID), slog.Uint64("raft_node_id", e.nodeID), slog.Any("panic", r), and slog.String("stack", string(debug.Stack())). Leader-acquired and leader-loss panic handlers are now structurally identical. The comment at lines 1098–1103 correctly attributes this to the Gemini/Claude finding and explains why the fields matter for production triage.

Fix #3 — TOCTOU order flip in installSQSLeadershipRefusal

main_sqs_leadership_refusal.go:97–119

The new sequence:

  1. wasLeaderBefore = admin.State() — pre-registration snapshot
  2. If already leader → refuse() immediately
  3. deregister = admin.RegisterLeaderAcquiredCallback(refuse) — subscribe
  4. if !wasLeaderBefore && admin.State() == raftengine.StateLeader — post-registration check

The !wasLeaderBefore guard on step 4 is correct: it prevents a double-fire in the wasLeaderBefore=true path (where we already called refuse() in step 2), while still catching the race window where the node wins an election between steps 1 and 3.

One residual theoretical race not covered: if wasLeaderBefore=true and the TransferLeadership goroutine is in-flight when the node loses leadership and then wins it again during the brief window in which RegisterLeaderAcquiredCallback executes — the !wasLeaderBefore guard skips the second check. In that case, the next election will be caught by the registered callback. This is acceptable: the window is sub-microsecond, refuse() is idempotent, and the callback guarantees eventual step-down.

TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered exercises this directly via stateAfterRegister: raftengine.StateLeader, which flips state to leader exactly when RegisterLeaderAcquiredCallback is called. The test's awaitTransferCallsAtLeastOne is correct given refuse() is asynchronous.

Fix #4 — Branch coverage for installSQSLeadershipRefusalAcrossGroups

main_sqs_leadership_refusal_test.go:290–392

Four new tests cover the previously-untested guard paths:

  • ComposesDeregisters — confirms N per-group install calls and composite deregister fan-out across two runtimes
  • NoPartitionedQueueIsNoOp — empty partition map returns early with no registrations
  • SkipsNilRuntimes — nil runtime entry and nil engine entry are skipped; valid entry still gets registered
  • SkipsEngineWithoutObserver — engineWithoutAcquiredObserver (embeds raftengine.Engine but lacks RegisterLeaderAcquiredCallback) triggers the type-assertion-miss path without panicking

The engineWithoutAcquiredObserver type-assertion approach is the right design for the test double.

Additional observations (non-blocking)

Cleanup LIFO ordering confirmed: CleanupStack.Run() is LIFO (internal/cleanup.go:3). leadershipRefusalDeregister is added at line 336, which is after the rt.Close() group at line 309. LIFO means deregister runs first → callbacks are cleanly removed before the engine closes.

partitionedGroupSet empty-groups edge case: if partitionMap has entries where all queues have empty groups arrays, the function returns a non-nil but empty map[uint64]bool{}. The len(partGroups) == 0 check in installSQSLeadershipRefusalAcrossGroups correctly handles this (returns early). No issue.

adapter/sqs.go flag flip: htfifoCapabilityAdvertised = true as a const is the right choice — a var would be mutable at runtime. AdvertisesHTFIFO() is a clean accessor.


Verdict: All Round 1 blocking issues resolved. The implementation is correct, the tests are comprehensive, and the code is ready to merge.


@chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 634fbcf954


Comment on lines +91 to +94
if err := admin.TransferLeadership(ctx); err != nil {
	logger.Warn("sqs: TransferLeadership failed",
		"group", gid, "err", err)
}

P1 Badge Fail closed when leadership transfer is rejected

When advertisesHTFIFO is false and this hook fires, a failed TransferLeadership (for example, no eligible transferee in a single-voter group or peers unavailable) is only logged and then ignored. In that case the node remains leader and continues serving despite the hook deciding it is unsafe to lead, which defeats the downgrade-protection guarantee and can still allow incorrect behavior on partitioned queues.


Comment on lines +144 to +146
for queue, routing := range partitionMap {
	for _, groupRef := range routing.groups {
		id, err := strconv.ParseUint(groupRef, 10, 64)

P2 Badge Exclude single-partition routes from refusal group set

partitionedGroupSet currently marks groups from every --sqsFifoPartitionMap entry, but PartitionCount=1 is valid input and is not a partitioned queue in the HT-FIFO sense. This makes the refusal hook run for groups that only host single-partition routes, causing unnecessary leadership transfers (and possible avoidable unavailability on constrained groups) without any data-safety benefit.

