feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b) #723
Open

bootjp wants to merge 2 commits into main from feat/sqs-htfifo-leadership-refusal

+1,025 −49
```diff
@@ -339,6 +339,24 @@ type Engine struct {
 	leaderLossCbsMu sync.Mutex
 	leaderLossCbs   []leaderLossSlot
 
+	// leaderAcquiredCbsMu guards the slice of callbacks invoked when
+	// the node transitions INTO the leader role. Callbacks fire
+	// synchronously from refreshStatus on the previous!=Leader →
+	// status==Leader edge. The MUST-be-non-blocking contract is the
+	// same as leaderLossCbs — a slow callback would stall every
+	// other holder of the same engine. See
+	// RegisterLeaderAcquiredCallback for the full contract.
+	//
+	// The acquired-side mirror exists so per-shard policy hooks
+	// (SQS HT-FIFO leadership-refusal in §8 of the split-queue FIFO
+	// design) can audit "we just became leader, are we still
+	// allowed to be?" without polling. Pairing the slot with a
+	// sentinel pointer mirrors the leader-loss design and lets
+	// deregister identify THIS registration when the same fn is
+	// registered multiple times.
+	leaderAcquiredCbsMu sync.Mutex
+	leaderAcquiredCbs   []leaderAcquiredSlot
+
 	pendingProposals map[uint64]proposalRequest
 	pendingReads     map[uint64]readRequest
 	pendingConfigs   map[uint64]adminRequest
```
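The non-blocking contract in the field comment is the crux of both callback families. As a minimal sketch of a conforming callback, assuming an `*Engine` value `engine` is in scope and `import "sync/atomic"` (the `leaseValid` flag is a hypothetical holder-side detail, not part of this PR):

```go
// Hypothetical holder-side state: a lock-free lease flag.
var leaseValid atomic.Bool

// Conforming callback: a single atomic store, no locks, no I/O.
// It returns immediately, so it cannot stall refreshStatus or
// other holders of the same engine.
dereg := engine.RegisterLeaderLossCallback(func() {
	leaseValid.Store(false)
})
defer dereg()
```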
```diff
@@ -931,67 +949,169 @@ func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func()) {
 	if e == nil || fn == nil {
 		return func() {}
 	}
-	// Allocate a unique sentinel pointer so the deregister closure can
-	// identify THIS specific registration even if the same fn is
-	// registered multiple times.
+	return registerLeaderCallback(&e.leaderLossCbsMu, &e.leaderLossCbs, fn)
+}
+
+// leaderLossSlot pairs a registered callback with an id-only sentinel
+// pointer so deregister can distinguish identical fn values. Aliased
+// to leaderCallbackSlot so the leader-loss and leader-acquired slices
+// share the same in-memory shape — the register / fire helpers are
+// generic over this single slot type.
+type leaderLossSlot = leaderCallbackSlot
+
+// fireLeaderLossCallbacks invokes all registered callbacks
+// synchronously. The registered-callback contract requires each fn
+// to be non-blocking (a lock-free lease-invalidate flag flip), so
+// inline execution is safe and avoids spawning an unbounded number
+// of goroutines per leader-loss event when many shards / coordinators
+// are registered.
+//
+// A panicking callback is still contained (see
+// invokeLeaderLossCallback) so a bug in one holder cannot break
+// subsequent callbacks or crash the process.
+func (e *Engine) fireLeaderLossCallbacks() {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) {
+		e.invokeLeaderLossCallback(fn)
+	}
+}
+
+// leaderCallbackSlot is the shared in-memory shape for leader-loss
+// and leader-acquired callback registrations. The id pointer is a
+// per-registration sentinel so deregister can target THIS specific
+// entry even when the same fn is registered multiple times.
+type leaderCallbackSlot struct {
+	id *struct{ fn func() }
+	fn func()
+}
+
+// registerLeaderCallback installs fn into the (mu, cbs) callback
+// slice and returns a deregister closure. Shared by leader-loss
+// and leader-acquired registration so the slot-management /
+// dangling-reference / sync.Once-deregister logic lives in one
+// place. The two callback families differ only in which (mu, slice)
+// pair they target — the firing semantics, the sentinel-pointer
+// disambiguation, and the GC-safe slice truncation are identical.
+func registerLeaderCallback(mu *sync.Mutex, cbs *[]leaderCallbackSlot, fn func()) (deregister func()) {
+	// Allocate a unique sentinel pointer so the deregister closure
+	// can identify THIS specific registration even if the same fn
+	// is registered multiple times.
 	slot := &struct{ fn func() }{fn: fn}
-	e.leaderLossCbsMu.Lock()
-	e.leaderLossCbs = append(e.leaderLossCbs, leaderLossSlot{id: slot, fn: fn})
-	e.leaderLossCbsMu.Unlock()
+	mu.Lock()
+	*cbs = append(*cbs, leaderCallbackSlot{id: slot, fn: fn})
+	mu.Unlock()
 	var once sync.Once
 	return func() {
 		once.Do(func() {
-			e.leaderLossCbsMu.Lock()
-			defer e.leaderLossCbsMu.Unlock()
-			for i, c := range e.leaderLossCbs {
+			mu.Lock()
+			defer mu.Unlock()
+			for i, c := range *cbs {
 				if c.id != slot {
 					continue
 				}
-				// Remove without leaving a dangling reference at the
-				// tail of the underlying array. The removed slot's fn
-				// typically captures a *Coordinate; a plain
-				// `append(cbs[:i], cbs[i+1:]...)` would keep the old
-				// backing cell alive and prevent GC of the associated
-				// Coordinate until the engine itself is dropped.
-				last := len(e.leaderLossCbs) - 1
-				copy(e.leaderLossCbs[i:], e.leaderLossCbs[i+1:])
-				e.leaderLossCbs[last] = leaderLossSlot{}
-				e.leaderLossCbs = e.leaderLossCbs[:last]
+				// Remove without leaving a dangling reference at
+				// the tail of the underlying array. The removed
+				// slot's fn typically captures a *Coordinate; a
+				// plain `append(cbs[:i], cbs[i+1:]...)` would keep
+				// the old backing cell alive and prevent GC of the
+				// associated Coordinate until the engine itself is
+				// dropped.
+				last := len(*cbs) - 1
+				copy((*cbs)[i:], (*cbs)[i+1:])
+				(*cbs)[last] = leaderCallbackSlot{}
+				*cbs = (*cbs)[:last]
 				return
 			}
 		})
 	}
 }
```
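The sentinel-pointer design is easiest to see from the caller's side. A hedged usage sketch, assuming an `*Engine` value `engine` in scope: registering the same fn twice yields two independent deregister closures, each idempotent thanks to the `sync.Once`, and each removal zeroes the vacated tail slot so the captured Coordinate can be collected.

```go
flip := func() { /* non-blocking flag flip */ }

dereg1 := engine.RegisterLeaderLossCallback(flip)
dereg2 := engine.RegisterLeaderLossCallback(flip)

dereg1() // removes only the first registration; the second still fires
dereg1() // no-op: the sync.Once already consumed this closure
dereg2() // removes the remaining registration
```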
```diff
-// leaderLossSlot pairs a registered callback with an id-only sentinel
-// pointer so deregister can distinguish identical fn values.
-type leaderLossSlot struct {
-	id *struct{ fn func() }
-	fn func()
+// gatherLeaderCallbacks copies the live fn list out from under the
+// mutex so callers can fire them without holding the lock.
+// Mirrors the snapshot-then-fire pattern used by the per-callback
+// invoke helpers.
+//
+// Takes a *pointer* to the slice (not the slice itself) so the
+// header (pointer, length, capacity) is read INSIDE the locked
+// section. Passing the slice by value would dereference the
+// header at the call site — i.e. before mu.Lock() — racing with
+// any concurrent registerLeaderCallback. The pointer parameter
+// closes that race (codex P1 / gemini high on PR #723).
+func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() {
+	mu.Lock()
+	defer mu.Unlock()
+	out := make([]func(), len(*cbs))
+	for i, c := range *cbs {
+		out[i] = c.fn
+	}
+	return out
 }
```
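To make the slice-header race concrete, here is a self-contained sketch of the two signatures. The `slot` type and the `gatherByValue` / `gatherByPointer` helpers are hypothetical stand-ins, not the PR's code; with the racy call uncommented, `go run -race` flags the unlocked header read at the call site.

```go
package main

import "sync"

type slot struct{ fn func() }

// gatherByValue copies the slice header (pointer, len, cap) at the
// call site, BEFORE the lock is taken. That unsynchronized header
// read races with a concurrent append that grows the backing array.
func gatherByValue(mu *sync.Mutex, cbs []slot) []func() {
	mu.Lock()
	defer mu.Unlock()
	out := make([]func(), len(cbs))
	for i, c := range cbs {
		out[i] = c.fn
	}
	return out
}

// gatherByPointer copies only a *[]slot at the call site; the header
// itself is dereferenced after mu.Lock(), closing the race.
func gatherByPointer(mu *sync.Mutex, cbs *[]slot) []func() {
	mu.Lock()
	defer mu.Unlock()
	out := make([]func(), len(*cbs))
	for i, c := range *cbs {
		out[i] = c.fn
	}
	return out
}

func main() {
	var (
		mu  sync.Mutex
		cbs []slot
	)
	done := make(chan struct{})
	go func() { // concurrent registration, as in registerLeaderCallback
		for i := 0; i < 1000; i++ {
			mu.Lock()
			cbs = append(cbs, slot{fn: func() {}})
			mu.Unlock()
		}
		close(done)
	}()
	for i := 0; i < 1000; i++ {
		_ = gatherByPointer(&mu, &cbs) // race-free
		// _ = gatherByValue(&mu, cbs) // uncomment: `go run -race` reports the call-site read
	}
	<-done
}
```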
```diff
 
-// fireLeaderLossCallbacks invokes all registered callbacks
-// synchronously. The registered-callback contract requires each fn
-// to be non-blocking (a lock-free lease-invalidate flag flip), so
-// inline execution is safe and avoids spawning an unbounded number
-// of goroutines per leader-loss event when many shards / coordinators
-// are registered.
+// RegisterLeaderAcquiredCallback registers fn to fire every time
+// the local node's Raft state transitions INTO leader (initial
+// election win, re-election after partition heal, leadership
+// transfer target completion). Callbacks fire on the
+// previous!=Leader → status==Leader edge in refreshStatus, after
+// e.isLeader has been published, so a callback that reads
+// engine.State() observes StateLeader.
 //
-// A panicking callback is still contained (see
-// invokeLeaderLossCallback) so a bug in one holder cannot break
-// subsequent callbacks or crash the process.
-func (e *Engine) fireLeaderLossCallbacks() {
-	e.leaderLossCbsMu.Lock()
-	cbs := make([]func(), len(e.leaderLossCbs))
-	for i, c := range e.leaderLossCbs {
-		cbs[i] = c.fn
+// Use case: per-shard policy that needs to audit a freshly-acquired
+// leadership ("am I still allowed to be leader of this group?").
+// SQS HT-FIFO leadership-refusal (§8 of the split-queue FIFO
+// design) hangs off this hook to TransferLeadership when the
+// binary lacks the htfifo capability but a partitioned queue is
+// mapped to this Raft group.
+//
+// Callbacks run synchronously from refreshStatus and MUST be
+// non-blocking — same contract as RegisterLeaderLossCallback. A
+// callback wanting to do real work (e.g. enumerate the catalog,
+// call TransferLeadership) MUST offload to a goroutine.
+//
+// A panic inside a callback is contained and logged so a bug in
+// one holder cannot crash the engine or break other callbacks.
+//
+// The returned deregister function removes this specific
+// registration and is safe to call multiple times.
+func (e *Engine) RegisterLeaderAcquiredCallback(fn func()) (deregister func()) {
+	if e == nil || fn == nil {
+		return func() {}
 	}
-	e.leaderLossCbsMu.Unlock()
-	for _, fn := range cbs {
-		e.invokeLeaderLossCallback(fn)
+	return registerLeaderCallback(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs, fn)
 }
 
+// leaderAcquiredSlot is the leader-acquired companion to
+// leaderLossSlot — both alias the shared leaderCallbackSlot so a
+// single set of register / fire helpers serves both transitions.
+type leaderAcquiredSlot = leaderCallbackSlot
+
+// fireLeaderAcquiredCallbacks invokes all registered callbacks
+// synchronously. Same panic-containment + non-blocking contract
+// as fireLeaderLossCallbacks.
+func (e *Engine) fireLeaderAcquiredCallbacks() {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) {
+		e.invokeLeaderAcquiredCallback(fn)
+	}
+}
+
+func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
+	defer func() {
+		if r := recover(); r != nil {
+			// Mirror the leader-loss panic log shape so operator
+			// triage of either family produces the same fields.
+			// Without node identity and the stack, an SQS
+			// leadership-refusal hook panicking in production
+			// would leave only the recovered value to grep on
+			// (gemini medium / claude finding 1 on PR #723).
+			slog.Error("etcd raft engine: leader-acquired callback panicked",
+				slog.String("node_id", e.localID),
+				slog.Uint64("raft_node_id", e.nodeID),
+				slog.Any("panic", r),
+				slog.String("stack", string(debug.Stack())),
+			)
+		}
+	}()
+	fn()
+}
```
Comment on lines +1095 to +1113

Contributor

The panic recovery logic here is less informative than its counterpart in `invokeLeaderLossCallback`. Suggested:

```go
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panicked",
				slog.String("node_id", e.localID),
				slog.Uint64("raft_node_id", e.nodeID),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	fn()
}
```
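Beyond the log fields this suggestion tightens, both invoke helpers share the same containment property. A hedged test-style sketch of that property, not code from the PR: `newTestEngine` is a hypothetical helper, and since the fire method is unexported this would live in the engine's own package, with `testing` and `sync/atomic` imported.

```go
func TestLeaderAcquiredPanicContained(t *testing.T) {
	e := newTestEngine(t) // hypothetical constructor for a minimal Engine
	var ran atomic.Bool
	e.RegisterLeaderAcquiredCallback(func() { panic("boom") })
	e.RegisterLeaderAcquiredCallback(func() { ran.Store(true) })
	// Must not panic through: the recover in invokeLeaderAcquiredCallback
	// contains the first callback's panic and logs it.
	e.fireLeaderAcquiredCallbacks()
	if !ran.Load() {
		t.Fatal("second callback did not run after the first panicked")
	}
}
```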
```diff
 
 func (e *Engine) invokeLeaderLossCallback(fn func()) {
 	defer func() {
 		if r := recover(); r != nil {
```

```diff
@@ -2512,6 +2632,14 @@ func (e *Engine) refreshStatus() {
 	if status.State == raftengine.StateLeader {
 		e.leaderOnce.Do(func() { close(e.leaderReady) })
 	}
+	if previous != raftengine.StateLeader && status.State == raftengine.StateLeader {
+		// Edge: the node has just acquired leadership. Fire the
+		// leader-acquired callbacks so per-shard policy hooks
+		// (SQS HT-FIFO leadership-refusal §8) can audit the
+		// transition before any client request lands. Same
+		// non-blocking contract as fireLeaderLossCallbacks.
+		e.fireLeaderAcquiredCallbacks()
+	}
 	if previous == raftengine.StateLeader && status.State != raftengine.StateLeader {
 		e.failPending(errors.WithStack(errNotLeader))
 		// Drop the per-peer ack map so a future re-election cannot
```
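Putting the pieces together, a hedged sketch of the §8 leadership-refusal pattern this edge serves, assuming an `*Engine` value `engine` in scope; `shouldRefuseLeadership` and `transferLeadershipAway` are hypothetical stand-ins for the SQS-side policy, not APIs from this PR.

```go
dereg := engine.RegisterLeaderAcquiredCallback(func() {
	// The callback itself only schedules work: it runs synchronously
	// inside refreshStatus and MUST NOT block.
	go func() {
		// Real work happens off the Raft loop: audit the fresh
		// leadership and hand it off if this binary cannot serve it.
		if shouldRefuseLeadership() { // e.g. htfifo capability missing
			transferLeadershipAway() // e.g. wraps the engine's leadership transfer
		}
	}()
})
defer dereg()
```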