56 changes: 48 additions & 8 deletions adapter/sqs.go
@@ -60,14 +60,40 @@ const (
const sqsCapabilityHTFIFO = "htfifo"

// htfifoCapabilityAdvertised gates whether this binary lists
// "htfifo" on /sqs_health. The flag is set to true only when the
// binary contains BOTH the routing-layer wiring AND the
// leadership-refusal safeguard from §8 — the design's "marked
// htfifo-eligible" bar (§11 PR 4). Lower-numbered PRs in the rollout
// keep this false so a partial deploy never advertises a capability
// it cannot safely back up. Phase 3.D PR 4-B flips this to true in
// the same commit that wires routing + leadership-refusal together.
const htfifoCapabilityAdvertised = false
// "htfifo" on /sqs_health. The §11 PR 4 contract requires BOTH
// the routing-layer wiring AND the leadership-refusal safeguard
// from §8 to be in place before this flag is true:
//
// - Routing wiring: kv.PartitionResolver +
// adapter.SQSPartitionResolver, merged via #715 (Phase 3.D
// PR 4-B-2). Partition-resolver-first dispatch in ShardRouter
// routes (queue, partition) keys to the operator-chosen Raft
// group; coordinator helpers (groupForKey,
// routeAndGroupForKey, groupMutations) consult the resolver
// before falling through to the byte-range engine; OCC read
// keys fail closed for recognised-but-unresolved partitioned
// keys.
// - Capability poller: PollSQSHTFIFOCapability, merged via
// #721 (Phase 3.D PR 4-B-3a). PR 5 will use this for the
// CreateQueue capability gate.
// - Leadership-refusal hook:
// raftengine.RegisterLeaderAcquiredCallback +
// main_sqs_leadership_refusal.go (Phase 3.D PR 4-B-3b, this
// PR). On startup AND on every leader-acquired transition,
// the hook refuses leadership of any Raft group hosting a
// partitioned queue when the binary lacks htfifo.
//
// Both pieces are now in the binary, so the flag flips to true.
// PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the
// CreateQueue capability poll in the same commit, at which point
// a partitioned queue can land in production and every node in
// the cluster must report htfifo for the gate to allow it.
//
// Stays a const (not a var) because the flag is build-time. A
// future runtime override (env var, --no-htfifo flag for
// graceful degradation) would reroute through
// adapter.AdvertisesHTFIFO() without changing the call sites.
const htfifoCapabilityAdvertised = true

// sqsAdvertisedCapabilities returns the capability list emitted on
// /sqs_health (JSON mode). Stable iteration order is significant —
@@ -83,6 +109,20 @@ func sqsAdvertisedCapabilities() []string {
return caps
}

// AdvertisesHTFIFO reports whether this binary's /sqs_health
// endpoint lists the htfifo capability. Mirror of the package-
// internal htfifoCapabilityAdvertised constant, exposed for the
// SQS leadership-refusal hook in main.go, which uses this signal
// to decide whether to refuse leadership of any Raft group hosting
// a partitioned FIFO queue.
//
// Stays a function (not an exported constant) so a future runtime
// override (env var, --no-htfifo flag for graceful degradation)
// can be threaded through here without changing the call site.
func AdvertisesHTFIFO() bool {
return htfifoCapabilityAdvertised
}

const (
sqsHealthMaxRequestBodyBytes = 1024
sqsMaxRequestBodyBytes = 1 << 20
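The gating comment on htfifoCapabilityAdvertised describes the PR 5 cluster gate in prose: once a partitioned queue can be created, every node must report htfifo for the gate to allow it. A minimal sketch of that all-nodes check, assuming a per-node capability list as input; PR 5 and the real capability poller are not in this diff, and the function and parameter names below are illustrative.

// Sketch of the all-nodes gate described in the htfifoCapabilityAdvertised
// comment. Not the real PR 5 code: allNodesAdvertiseHTFIFO and capsByNode
// are made up; only the "htfifo" capability string comes from this file.
func allNodesAdvertiseHTFIFO(capsByNode map[string][]string) bool {
	for _, caps := range capsByNode {
		found := false
		for _, c := range caps {
			if c == "htfifo" { // sqsCapabilityHTFIFO
				found = true
				break
			}
		}
		if !found {
			return false // a single lagging binary fails the gate closed
		}
	}
	return true
}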
23 changes: 23 additions & 0 deletions internal/raftengine/engine.go
@@ -195,6 +195,29 @@ type Admin interface {
RemoveServer(ctx context.Context, id string, prevIndex uint64) (uint64, error)
TransferLeadership(ctx context.Context) error
TransferLeadershipToServer(ctx context.Context, id string, address string) error
// RegisterLeaderAcquiredCallback registers fn to fire every
// time the local node's Raft state transitions INTO leader
// (initial election, re-election, transfer target completion).
// Callbacks fire on the previous!=Leader → status==Leader edge
// AFTER the engine has published isLeader, so a callback that
// calls engine.State() observes StateLeader.
//
// Use case: per-shard policy hooks that need to audit a
// freshly-acquired leadership ("am I still allowed to be
// leader of this group?"). The SQS HT-FIFO leadership-refusal
// hook (§8 of the split-queue FIFO design) hangs off this to
// TransferLeadership when the binary lacks the htfifo
// capability but a partitioned queue is mapped to this Raft
// group.
//
// Same non-blocking + panic-contained contract as
// LeaseProvider.RegisterLeaderLossCallback. A callback that
// needs to do real work (enumerate the catalog, call
// TransferLeadership) MUST offload to a goroutine.
//
// The returned function deregisters this specific registration
// and is safe to call multiple times.
RegisterLeaderAcquiredCallback(fn func()) (deregister func())
}

type Engine interface {
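A minimal caller-side sketch of the contract documented on the new Admin method above: the callback only hands off to a goroutine, and the returned deregister is kept for shutdown. Only RegisterLeaderAcquiredCallback comes from the interface; the module path and surrounding names are assumptions.

package example

import (
	"example.com/yourmodule/internal/raftengine" // hypothetical module path
)

// wireLeaderAcquiredAudit is a sketch only, not part of this PR. audit is any
// slow per-shard policy check; it runs off the refreshStatus path, as the
// contract requires, while the registered callback stays non-blocking.
func wireLeaderAcquiredAudit(admin raftengine.Admin, audit func()) (deregister func()) {
	return admin.RegisterLeaderAcquiredCallback(func() {
		go audit()
	})
}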
210 changes: 169 additions & 41 deletions internal/raftengine/etcd/engine.go
@@ -339,6 +339,24 @@ type Engine struct {
leaderLossCbsMu sync.Mutex
leaderLossCbs []leaderLossSlot

// leaderAcquiredCbsMu guards the slice of callbacks invoked when
// the node transitions INTO the leader role. Callbacks fire
// synchronously from refreshStatus on the previous!=Leader →
// status==Leader edge. The MUST-be-non-blocking contract is the
// same as leaderLossCbs — a slow callback would stall every
// other holder of the same engine. See
// RegisterLeaderAcquiredCallback for the full contract.
//
// The acquired-side mirror exists so per-shard policy hooks
// (SQS HT-FIFO leadership-refusal in §8 of the split-queue FIFO
// design) can audit "we just became leader, are we still
// allowed to be?" without polling. Pairing the slot with a
// sentinel pointer mirrors the leader-loss design and lets
// deregister identify THIS registration when the same fn is
// registered multiple times.
leaderAcquiredCbsMu sync.Mutex
leaderAcquiredCbs []leaderAcquiredSlot

pendingProposals map[uint64]proposalRequest
pendingReads map[uint64]readRequest
pendingConfigs map[uint64]adminRequest
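The struct comment above leans on a per-registration sentinel pointer to disambiguate duplicate registrations. A short sketch of what that buys a caller, written as if it lived in this package; it is not part of the PR and uses only the registration API shown in this file.

// Sketch only. Registering the same fn twice yields two independent
// deregister handles; dropping one leaves the other registration live,
// and repeat calls to a handle are no-ops via its sync.Once.
func demonstrateSentinelDeregistration(e *Engine) {
	fn := func() { /* non-blocking work only, per the callback contract */ }
	dereg1 := e.RegisterLeaderLossCallback(fn)
	dereg2 := e.RegisterLeaderLossCallback(fn)

	dereg1()   // removes only the first slot, matched by its sentinel pointer
	dereg1()   // idempotent
	_ = dereg2 // fn still fires once per leader loss via this registration
}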
@@ -931,67 +949,169 @@ func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func()) {
if e == nil || fn == nil {
return func() {}
}
// Allocate a unique sentinel pointer so the deregister closure can
// identify THIS specific registration even if the same fn is
// registered multiple times.
return registerLeaderCallback(&e.leaderLossCbsMu, &e.leaderLossCbs, fn)
}

// leaderLossSlot pairs a registered callback with an id-only sentinel
// pointer so deregister can distinguish identical fn values. Aliased
// to leaderCallbackSlot so the leader-loss and leader-acquired slices
// share the same in-memory shape — the register / fire helpers are
// generic over this single slot type.
type leaderLossSlot = leaderCallbackSlot

// fireLeaderLossCallbacks invokes all registered callbacks
// synchronously. The registered-callback contract requires each fn
// to be non-blocking (a lock-free lease-invalidate flag flip), so
// inline execution is safe and avoids spawning an unbounded number
// of goroutines per leader-loss event when many shards / coordinators
// are registered.
//
// A panicking callback is still contained (see
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) {
e.invokeLeaderLossCallback(fn)
}
}

// leaderCallbackSlot is the shared in-memory shape for leader-loss
// and leader-acquired callback registrations. The id pointer is a
// per-registration sentinel so deregister can target THIS specific
// entry even when the same fn is registered multiple times.
type leaderCallbackSlot struct {
id *struct{ fn func() }
fn func()
}

// registerLeaderCallback installs fn into the (mu, cbs) callback
// slice and returns a deregister closure. Shared by leader-loss
// and leader-acquired registration so the slot-management /
// dangling-reference / sync.Once-deregister logic lives in one
// place. The two callback families differ only in which (mu, slice)
// pair they target — the firing semantics, the sentinel-pointer
// disambiguation, and the GC-safe slice truncation are identical.
func registerLeaderCallback(mu *sync.Mutex, cbs *[]leaderCallbackSlot, fn func()) (deregister func()) {
// Allocate a unique sentinel pointer so the deregister closure
// can identify THIS specific registration even if the same fn
// is registered multiple times.
slot := &struct{ fn func() }{fn: fn}
e.leaderLossCbsMu.Lock()
e.leaderLossCbs = append(e.leaderLossCbs, leaderLossSlot{id: slot, fn: fn})
e.leaderLossCbsMu.Unlock()
mu.Lock()
*cbs = append(*cbs, leaderCallbackSlot{id: slot, fn: fn})
mu.Unlock()
var once sync.Once
return func() {
once.Do(func() {
e.leaderLossCbsMu.Lock()
defer e.leaderLossCbsMu.Unlock()
for i, c := range e.leaderLossCbs {
mu.Lock()
defer mu.Unlock()
for i, c := range *cbs {
if c.id != slot {
continue
}
// Remove without leaving a dangling reference at the
// tail of the underlying array. The removed slot's fn
// typically captures a *Coordinate; a plain
// `append(cbs[:i], cbs[i+1:]...)` would keep the old
// backing cell alive and prevent GC of the associated
// Coordinate until the engine itself is dropped.
last := len(e.leaderLossCbs) - 1
copy(e.leaderLossCbs[i:], e.leaderLossCbs[i+1:])
e.leaderLossCbs[last] = leaderLossSlot{}
e.leaderLossCbs = e.leaderLossCbs[:last]
// Remove without leaving a dangling reference at
// the tail of the underlying array. The removed
// slot's fn typically captures a *Coordinate; a
// plain `append(cbs[:i], cbs[i+1:]...)` would keep
// the old backing cell alive and prevent GC of the
// associated Coordinate until the engine itself is
// dropped.
last := len(*cbs) - 1
copy((*cbs)[i:], (*cbs)[i+1:])
(*cbs)[last] = leaderCallbackSlot{}
*cbs = (*cbs)[:last]
return
}
})
}
}

// leaderLossSlot pairs a registered callback with an id-only sentinel
// pointer so deregister can distinguish identical fn values.
type leaderLossSlot struct {
id *struct{ fn func() }
fn func()
// gatherLeaderCallbacks copies the live fn list out from under the
// mutex so callers can fire them without holding the lock.
// Mirrors the snapshot-then-fire pattern used by the per-callback
// invoke helpers.
//
// Takes a *pointer* to the slice (not the slice itself) so the
// header (pointer, length, capacity) is read INSIDE the locked
// section. Passing the slice by value would dereference the
// header at the call site — i.e. before mu.Lock() — racing with
// any concurrent registerLeaderCallback. The pointer parameter
// closes that race (codex P1 / gemini high on PR #723).
func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() {
mu.Lock()
defer mu.Unlock()
out := make([]func(), len(*cbs))
for i, c := range *cbs {
out[i] = c.fn
}
return out
}

// fireLeaderLossCallbacks invokes all registered callbacks
// synchronously. The registered-callback contract requires each fn
// to be non-blocking (a lock-free lease-invalidate flag flip), so
// inline execution is safe and avoids spawning an unbounded number
// of goroutines per leader-loss event when many shards / coordinators
// are registered.
// RegisterLeaderAcquiredCallback registers fn to fire every time
// the local node's Raft state transitions INTO leader (initial
// election win, re-election after partition heal, leadership
// transfer target completion). Callbacks fire on the
// previous!=Leader → status==Leader edge in refreshStatus, after
// e.isLeader has been published, so a callback that reads
// engine.State() observes StateLeader.
//
// A panicking callback is still contained (see
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
e.leaderLossCbsMu.Lock()
cbs := make([]func(), len(e.leaderLossCbs))
for i, c := range e.leaderLossCbs {
cbs[i] = c.fn
// Use case: per-shard policy that needs to audit a freshly-acquired
// leadership ("am I still allowed to be leader of this group?").
// SQS HT-FIFO leadership-refusal (§8 of the split-queue FIFO
// design) hangs off this hook to TransferLeadership when the
// binary lacks the htfifo capability but a partitioned queue is
// mapped to this Raft group.
//
// Callbacks run synchronously from refreshStatus and MUST be
// non-blocking — same contract as RegisterLeaderLossCallback. A
// callback wanting to do real work (e.g. enumerate the catalog,
// call TransferLeadership) MUST offload to a goroutine.
//
// A panic inside a callback is contained and logged so a bug in
// one holder cannot crash the engine or break other callbacks.
//
// The returned deregister function removes this specific
// registration and is safe to call multiple times.
func (e *Engine) RegisterLeaderAcquiredCallback(fn func()) (deregister func()) {
if e == nil || fn == nil {
return func() {}
}
e.leaderLossCbsMu.Unlock()
for _, fn := range cbs {
e.invokeLeaderLossCallback(fn)
return registerLeaderCallback(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs, fn)
}

// leaderAcquiredSlot is the leader-acquired companion to
// leaderLossSlot — both alias the shared leaderCallbackSlot so a
// single set of register / fire helpers serves both transitions.
type leaderAcquiredSlot = leaderCallbackSlot

// fireLeaderAcquiredCallbacks invokes all registered callbacks
// synchronously. Same panic-containment + non-blocking contract
// as fireLeaderLossCallbacks.
func (e *Engine) fireLeaderAcquiredCallbacks() {
for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) {
e.invokeLeaderAcquiredCallback(fn)
}
}

func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
defer func() {
if r := recover(); r != nil {
// Mirror the leader-loss panic log shape so operator
// triage of either family produces the same fields.
// Without node identity and the stack, an SQS
// leadership-refusal hook panicking in production
// would leave only the recovered value to grep on
// (gemini medium / claude finding 1 on PR #723).
slog.Error("etcd raft engine: leader-acquired callback panicked",
slog.String("node_id", e.localID),
slog.Uint64("raft_node_id", e.nodeID),
slog.Any("panic", r),
slog.String("stack", string(debug.Stack())),
)
}
}()
fn()
}
Comment on lines +1095 to +1113
Contributor

medium

The panic recovery logic here is less informative than its counterpart in invokeLeaderLossCallback. It should include the node identity and a stack trace to facilitate debugging of policy hook failures in production.

func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panicked",
				slog.String("node_id", e.localID),
				slog.Uint64("raft_node_id", e.nodeID),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	fn()
}
References
  1. Include node identity in logs and metadata to ensure correct context and prevent incorrect metadata reuse.


func (e *Engine) invokeLeaderLossCallback(fn func()) {
defer func() {
if r := recover(); r != nil {
@@ -2512,6 +2632,14 @@ func (e *Engine) refreshStatus() {
if status.State == raftengine.StateLeader {
e.leaderOnce.Do(func() { close(e.leaderReady) })
}
if previous != raftengine.StateLeader && status.State == raftengine.StateLeader {
// Edge: the node has just acquired leadership. Fire the
// leader-acquired callbacks so per-shard policy hooks
// (SQS HT-FIFO leadership-refusal §8) can audit the
// transition before any client request lands. Same
// non-blocking contract as fireLeaderLossCallbacks.
e.fireLeaderAcquiredCallbacks()
}
if previous == raftengine.StateLeader && status.State != raftengine.StateLeader {
e.failPending(errors.WithStack(errNotLeader))
// Drop the per-peer ack map so a future re-election cannot
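Taken together, adapter.AdvertisesHTFIFO, Admin.RegisterLeaderAcquiredCallback, and the leader-acquired edge fired from refreshStatus give the §8 refusal flow sketched below. This is a hedged illustration, not the main_sqs_leadership_refusal.go added by this PR: the module paths and groupHostsPartitionedQueue are assumptions, while the three API calls are the ones added or exposed in this diff.

package example

import (
	"context"
	"log/slog"

	"example.com/yourmodule/adapter"             // hypothetical module path
	"example.com/yourmodule/internal/raftengine" // hypothetical module path
)

// registerHTFIFOLeadershipRefusal is a sketch of the §8 hook this PR enables.
// On every leader-acquired edge, a binary that does not advertise htfifo
// hands leadership back if the group hosts a partitioned queue. Only
// AdvertisesHTFIFO, RegisterLeaderAcquiredCallback and TransferLeadership
// are from this diff; groupHostsPartitionedQueue is illustrative.
func registerHTFIFOLeadershipRefusal(admin raftengine.Admin, groupHostsPartitionedQueue func() bool) (deregister func()) {
	return admin.RegisterLeaderAcquiredCallback(func() {
		// The callback must not block refreshStatus, so the catalog check
		// and the transfer both run in a goroutine.
		go func() {
			if adapter.AdvertisesHTFIFO() || !groupHostsPartitionedQueue() {
				return
			}
			if err := admin.TransferLeadership(context.Background()); err != nil {
				slog.Warn("htfifo leadership refusal: transfer failed",
					slog.Any("error", err))
			}
		}()
	})
}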