From ea43e673639a9125bea0e406cbb6553f736e6693 Mon Sep 17 00:00:00 2001 From: mboyd1 <5233780+mboyd1@users.noreply.github.com> Date: Thu, 21 May 2026 20:11:38 -0500 Subject: [PATCH 1/3] propagation: notify dispatcher of every terminalized tx so Kafka offsets release MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The arcade-propagation consumer group never committed its offset (shows "-" in kafka-consumer-groups --describe): it re-read its whole ~2.4M-msg topic on every restart, re-processing already-mined transactions and buffering the re-read in sarama fetch memory. Root cause is an offset-accounting leak in applyTerminalStatuses. The dispatcher's offsetTracker pins the Kafka commit watermark at LowestUnfinished() — the lowest admitted offset not yet Done()'d. Every admitted tx Adds its offset; handleTerminal Dones it when the tx terminalizes. But applyTerminalStatuses built the accepted/rejected txid lists — which feed BOTH the SSE bulk publish AND the dispatcher notification — only from rows whose store status actually transitioned (prev != nil && prev.Status != st.Status). A tx replayed from Kafka after a rebalance is already at its terminal status, so BatchUpdateStatusReturning reports a lattice no-op (prev.Status == st.Status) or a reaped row (prev == nil). Those txs were dropped from the notification list, so notifyTerminalToDispatcher never fired, handleTerminal never ran, the offset was never Done'd, and LowestUnfinished() pinned forever — even in a healthy long-lived claim. One stranded low offset freezes the commit watermark for good. Fix: decouple the two concerns. Dispatcher notification now covers every tx in terminalStatuses unconditionally — the offset must be released regardless of whether the store write moved the row. The bulk publish keeps its prev-row filter so a lattice no-op or reaped row doesn't emit a phantom SSE/webhook event. Also harden the claim-teardown paths so they can't strand an offset by leaking a goroutine: notifyTerminalToDispatcher and requeueToDispatcher now select on ctx, so a processBatch / requeue goroutine that outlives the dispatcher on a rebalance abandons its send instead of blocking forever on a dispatcher that has already exited. A requeue dropped on teardown is logged (it stays uncommitted and the next claim replays it — at-least-once is preserved). Tests: dispatcher_test.go adds pure-function offset-accounting coverage (admit/terminal/requeue/cascade Add/Done bookkeeping) and runDispatcher integration tests that drive a fakeClaim and assert claim.MarkMessage — the real commit-watermark signal — fires for lattice-no-op and reaped-row terminals. Both integration tests fail against the pre-fix code. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/propagation/dispatcher.go | 41 ++- services/propagation/dispatcher_test.go | 357 ++++++++++++++++++++++++ services/propagation/propagator.go | 110 ++++++-- services/propagation/propagator_test.go | 10 + 4 files changed, 484 insertions(+), 34 deletions(-) create mode 100644 services/propagation/dispatcher_test.go diff --git a/services/propagation/dispatcher.go b/services/propagation/dispatcher.go index 6f41e16..0a90cd7 100644 --- a/services/propagation/dispatcher.go +++ b/services/propagation/dispatcher.go @@ -582,10 +582,29 @@ func (p *Propagator) admitToDispatcher(msg propagationMsg, offset int64) admitRe // notifyTerminalToDispatcher is the post-broadcast helper. Tells the // dispatcher the txid reached terminal status, returns the cascaded // descendants (caller writes REJECTED rows for them). -func (p *Propagator) notifyTerminalToDispatcher(txid string, status models.Status) terminalResult { +// +// Both the send and the reply receive are guarded by ctx. processBatch +// goroutines outlive the dispatcher on a claim teardown (rebalance / +// shutdown): the dispatcher loop returns as soon as its ctx is canceled, +// so an unguarded reply receive here would block this goroutine forever +// on a dispatcher that will never answer. On ctx cancellation the notify +// is abandoned and a zero terminalResult is returned — the tx's Kafka +// offset is simply left un-Done and uncommitted, and the next claim +// replays it (at-least-once). reply is buffered so a late dispatcher +// answer after we've stopped waiting never blocks the dispatcher. +func (p *Propagator) notifyTerminalToDispatcher(ctx context.Context, txid string, status models.Status) terminalResult { reply := make(chan terminalResult, 1) - p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply} - return <-reply + select { + case p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply}: + case <-ctx.Done(): + return terminalResult{} + } + select { + case r := <-reply: + return r + case <-ctx.Done(): + return terminalResult{} + } } // drainPending asks the dispatcher for the current pendingMsgs as a @@ -603,6 +622,18 @@ func (p *Propagator) drainPending() []propagationMsg { // (held vs pending) doesn't change anything the caller can act on. // The send is buffered (dispatcherChannelBuffer); a momentarily // saturated buffer applies natural backpressure to the caller. -func (p *Propagator) requeueToDispatcher(msg propagationMsg) { - p.requeueCh <- requeueRequest{msg: msg} +// +// The send is guarded by ctx: when the claim is torn down the +// dispatcher stops draining requeueCh, so an unguarded send would block +// (and leak) the requeue goroutine once the buffer fills. Returns true +// if the requeue was handed to the dispatcher, false if ctx was +// canceled first — in which case the tx is left uncommitted for the +// next claim to replay. +func (p *Propagator) requeueToDispatcher(ctx context.Context, msg propagationMsg) bool { + select { + case p.requeueCh <- requeueRequest{msg: msg}: + return true + case <-ctx.Done(): + return false + } } diff --git a/services/propagation/dispatcher_test.go b/services/propagation/dispatcher_test.go new file mode 100644 index 0000000..928c4b5 --- /dev/null +++ b/services/propagation/dispatcher_test.go @@ -0,0 +1,357 @@ +package propagation + +import ( + "context" + "net/http" + "sync" + "testing" + "time" + + "github.com/bsv-blockchain/arcade/kafka" + "github.com/bsv-blockchain/arcade/models" +) + +// These tests pin the dispatcher's Kafka offset accounting: every offset +// Add'ed to the offsetTracker on admission must be matched by a Done on +// the tx's terminal outcome, or LowestUnfinished() — the commit +// watermark — pins and the arcade-propagation consumer group never +// commits (it shows "-" in kafka-consumer-groups --describe and re-reads +// the whole topic on every restart). +// +// Two layers of coverage: +// +// - Pure-function tests call handleAdmit / handleTerminal / +// handleRequeue / cascadeReject directly against an offsetTracker so +// the Add/Done bookkeeping is asserted in isolation. +// +// - Integration tests drive a real runDispatcher loop against a +// fakeClaim and assert claim.MarkMessage — the actual Kafka commit +// signal — fires for terminalized offsets. These are the regression +// guard for the applyTerminalStatuses offset-leak: a tx replayed +// from Kafka after a rebalance is already at its terminal status, so +// the store reports a lattice no-op; the dispatcher must still be +// told so it releases the offset. + +// --- pure-function offset accounting ------------------------------------- + +// dispatcherState is the bundle of maps runDispatcher owns as locals. +// The handle* functions take them as explicit params, so a test can +// build the bundle, drive a sequence of calls, and inspect the tracker. +type dispatcherState struct { + inFlight map[string]int64 + waiters map[string]map[string]struct{} + heldMsgs map[string]propagationMsg + pendingMsgs []propagationMsg + tracker *offsetTracker +} + +func newDispatcherState() *dispatcherState { + return &dispatcherState{ + inFlight: make(map[string]int64), + waiters: make(map[string]map[string]struct{}), + heldMsgs: make(map[string]propagationMsg), + tracker: newOffsetTracker(), + } +} + +func (s *dispatcherState) admit(txid string, offset int64, parents ...string) admitResult { + return handleAdmit( + propagationMsg{TXID: txid, RawTx: []byte{0x01}, InputTXIDs: parents}, + offset, s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, s.tracker, + ) +} + +func (s *dispatcherState) terminal(txid string, status models.Status) terminalResult { + return handleTerminal( + terminalEvent{txid: txid, status: status}, + s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, s.tracker, + ) +} + +// TestDispatcherOffsets_AdmitThenTerminal_ReleasesWatermark is the +// happy path: a tx admitted at offset N pins the watermark at N until +// it terminalizes, then the tracker drains. +func TestDispatcherOffsets_AdmitThenTerminal_ReleasesWatermark(t *testing.T) { + s := newDispatcherState() + + s.admit("a", 10) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 10 { + t.Fatalf("after admit, LowestUnfinished = (%d, %v), want (10, true)", low, ok) + } + + s.terminal("a", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must be empty after the only in-flight tx terminalizes — offset stranded") + } + if _, ok := s.inFlight["a"]; ok { + t.Fatal("terminalized tx must be removed from inFlight") + } +} + +// TestDispatcherOffsets_RejectedTerminal_DonesOffset confirms the +// REJECTED branch of handleTerminal also Dones the offset, not just the +// ACCEPTED branch. +func TestDispatcherOffsets_RejectedTerminal_DonesOffset(t *testing.T) { + s := newDispatcherState() + s.admit("bad", 42) + s.terminal("bad", models.StatusRejected) + if !s.tracker.Empty() { + t.Fatal("REJECTED terminal must Done the offset; tracker should be empty") + } +} + +// TestDispatcherOffsets_OutOfOrderTerminal_PinsAtLowest verifies the +// watermark holds at the lowest un-terminalized offset even when later +// offsets terminalize first — txs finish in broadcast order, not Kafka +// order, and committing past an un-terminalized offset would lose it +// on a crash. +func TestDispatcherOffsets_OutOfOrderTerminal_PinsAtLowest(t *testing.T) { + s := newDispatcherState() + s.admit("a", 1) + s.admit("b", 2) + s.admit("c", 3) + + s.terminal("c", models.StatusAcceptedByNetwork) + s.terminal("b", models.StatusAcceptedByNetwork) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 1 { + t.Fatalf("watermark must hold at offset 1 while 'a' is in-flight; got (%d, %v)", low, ok) + } + + s.terminal("a", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must drain once the lowest offset terminalizes") + } +} + +// TestDispatcherOffsets_Requeue_KeepsOffsetPinned is the contract that +// makes the requeue path safe: a requeue must NOT Done the offset. The +// tx is still in-flight (it'll be re-broadcast), so its offset has to +// keep pinning the watermark until it reaches a real terminal verdict. +// Done'ing it on requeue would commit past an un-broadcast tx and lose +// it on a crash. +func TestDispatcherOffsets_Requeue_KeepsOffsetPinned(t *testing.T) { + s := newDispatcherState() + s.admit("r", 7) + s.pendingMsgs = nil // simulate the drain into a broadcasting batch + + handleRequeue( + propagationMsg{TXID: "r", RawTx: []byte{0x01}}, + s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, + ) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 7 { + t.Fatalf("requeue must leave the offset pinned; LowestUnfinished = (%d, %v), want (7, true)", low, ok) + } + if len(s.pendingMsgs) != 1 { + t.Fatalf("requeue should put the tx back on pendingMsgs; got %d", len(s.pendingMsgs)) + } + + // The requeued tx eventually terminalizes — only then does the + // offset release. + s.terminal("r", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("offset must release once the requeued tx finally terminalizes") + } +} + +// TestDispatcherOffsets_CascadeReject_DonesEveryDescendant verifies a +// rejected parent releases the offsets of every cascade-rejected +// descendant. Those children never broadcast, so cascadeReject is their +// only path to Done — miss one and its offset pins forever. +func TestDispatcherOffsets_CascadeReject_DonesEveryDescendant(t *testing.T) { + s := newDispatcherState() + + // parent admitted and broadcasting; child + grandchild held behind it. + if r := s.admit("parent", 1); !r.admitted { + t.Fatalf("parent should admit; got %+v", r) + } + s.pendingMsgs = nil + if r := s.admit("child", 2, "parent"); !r.held { + t.Fatalf("child should be held behind parent; got %+v", r) + } + if r := s.admit("grandchild", 3, "child"); !r.held { + t.Fatalf("grandchild should be held behind child; got %+v", r) + } + + res := s.terminal("parent", models.StatusRejected) + if len(res.cascaded) != 2 { + t.Fatalf("rejecting parent should cascade to 2 descendants; got %v", res.cascaded) + } + if !s.tracker.Empty() { + t.Fatal("cascadeReject must Done the offset of every descendant; tracker should be empty") + } +} + +// TestDispatcherOffsets_HeldTxReleasedThenTerminal verifies a held tx's +// offset survives the held→pending release and is only Done'd when the +// released tx itself terminalizes. +func TestDispatcherOffsets_HeldTxReleasedThenTerminal(t *testing.T) { + s := newDispatcherState() + s.admit("parent", 1) + s.pendingMsgs = nil + s.admit("child", 2, "parent") + + // child is held; its offset 2 is in-flight behind parent's offset 1. + if low, _ := s.tracker.LowestUnfinished(); low != 1 { + t.Fatalf("watermark should be 1 (parent); got %d", low) + } + + // parent ACCEPTED releases child into pendingMsgs but does NOT + // terminalize the child — child's offset 2 must still pin. + s.terminal("parent", models.StatusAcceptedByNetwork) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 2 { + t.Fatalf("after parent ACCEPTED, watermark should move to child's offset 2; got (%d, %v)", low, ok) + } + + s.terminal("child", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must drain once the released child terminalizes") + } +} + +// --- runDispatcher integration: the commit watermark -------------------- + +// fakeClaim is a kafka.Claim that feeds messages from a channel and +// records every offset MarkMessage is called for. MarkMessage is the +// real Kafka commit-watermark signal, so a test can assert the +// dispatcher actually advanced the watermark. +type fakeClaim struct { + ch chan *kafka.Message + ctx context.Context + + mu sync.Mutex + marked map[int64]bool +} + +func newFakeClaim(ctx context.Context) *fakeClaim { + return &fakeClaim{ + ch: make(chan *kafka.Message, 64), + ctx: ctx, + marked: make(map[int64]bool), + } +} + +func (c *fakeClaim) Messages() <-chan *kafka.Message { return c.ch } +func (c *fakeClaim) Context() context.Context { return c.ctx } + +func (c *fakeClaim) MarkMessage(m *kafka.Message) { + c.mu.Lock() + c.marked[m.Offset] = true + c.mu.Unlock() +} + +func (c *fakeClaim) isMarked(offset int64) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.marked[offset] +} + +var _ kafka.Claim = (*fakeClaim)(nil) + +// runDispatcherWithClaim swaps out the test-mode dispatcher New() starts +// and runs the production runDispatcher loop against a fakeClaim +// instead, so a test can feed real Kafka messages and observe the +// commit watermark. Returns the claim and a stop func. +func runDispatcherWithClaim(t *testing.T, p *Propagator) (*fakeClaim, func()) { + t.Helper() + // The test-mode and production dispatcher loops share p.terminalCh + // et al. and must never run concurrently — cancel the test-mode one + // New() spawned and wait for it to exit first. + p.dispatcherCancel() + <-p.dispatcherDone + p.dispatcherCancel = nil + + claimCtx, cancel := context.WithCancel(context.Background()) + claim := newFakeClaim(claimCtx) + done := make(chan struct{}) + go func() { + defer close(done) + _ = p.runDispatcher(claimCtx, claim, dispatcherConfig{maxPending: 1000}) + }() + return claim, func() { + cancel() + <-done + p.WaitForBatches() + } +} + +// waitForMark polls until claim has marked offset, or fails after ~3s. +func waitForMark(t *testing.T, claim *fakeClaim, offset int64, msg string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if claim.isMarked(offset) { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("offset %d was never MarkMessage'd: %s", offset, msg) +} + +// TestRunDispatcher_TerminalTx_AdvancesCommitWatermark is the happy +// path / harness sanity check: a tx that broadcasts and is accepted +// has its Kafka offset committed. +func TestRunDispatcher_TerminalTx_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + p := newPropagator("", teranodeSrv.URL, newMockStore()) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 11, Value: makePropMsg("fresh-tx")} + waitForMark(t, claim, 11, "an accepted tx must advance the commit watermark") +} + +// TestRunDispatcher_AlreadyTerminalTx_AdvancesCommitWatermark is the +// core offset-leak regression test. A tx replayed from Kafka after a +// rebalance is already at its terminal status, so +// BatchUpdateStatusReturning reports a lattice no-op (prev.Status == +// new.Status). Pre-fix, applyTerminalStatuses filtered those out of the +// dispatcher notification, so handleTerminal never ran, the offset was +// never Done'd, and LowestUnfinished() pinned forever — the +// arcade-propagation group's committed offset stuck at "-". +func TestRunDispatcher_AlreadyTerminalTx_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + // Every status write is reported as a lattice no-op: the row is + // already at exactly the status being written. + ms.returningPrev = func(s *models.TransactionStatus) *models.TransactionStatus { + return &models.TransactionStatus{ + TxID: s.TxID, + Status: s.Status, + Timestamp: time.Now().Add(-time.Hour), + } + } + + p := newPropagator("", teranodeSrv.URL, ms) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 7, Value: makePropMsg("replayed-mined-tx")} + waitForMark(t, claim, 7, + "a lattice-no-op terminal must still notify the dispatcher — otherwise the commit watermark pins forever") +} + +// TestRunDispatcher_ReapedRowTerminal_AdvancesCommitWatermark covers the +// other skipped path: BatchUpdateStatusReturning returns a nil prev row +// (the txid is unknown — the status row was reaped between RECEIVED and +// broadcast). The offset must still be released. +func TestRunDispatcher_ReapedRowTerminal_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.returningPrev = func(*models.TransactionStatus) *models.TransactionStatus { + return nil // unknown / reaped row + } + + p := newPropagator("", teranodeSrv.URL, ms) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 5, Value: makePropMsg("reaped-row-tx")} + waitForMark(t, claim, 5, + "a terminal for a reaped (nil-prev) row must still notify the dispatcher") +} diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index c56f49d..5811df6 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -329,10 +329,28 @@ func (p *Propagator) Name() string { return "propagation" } // applyTerminalStatuses persists the per-tx terminal statuses produced by // processBatch in one BatchUpdateStatusReturning call, observes the -// RECEIVED→{ACCEPTED_BY_NETWORK,REJECTED} transition age, and emits one -// PublishBulk per terminal status. Lattice no-ops (prev.Status == st.Status) -// and unknown txids (prev == nil — row reaped between RECEIVED and -// broadcast) are excluded from the bulk publish to avoid phantom events. +// RECEIVED→{ACCEPTED_BY_NETWORK,REJECTED} transition age, emits one +// PublishBulk per terminal status, and notifies the dispatcher so it can +// release/cascade waiters and mark the Kafka offset Done. +// +// It walks the batch with two deliberately separate accounting passes: +// +// - Dispatcher notification covers EVERY terminalized tx. Each tx in +// terminalStatuses still holds a live Kafka offset on the dispatcher's +// offsetTracker; the dispatcher only marks that offset Done when it +// receives the terminal event. This pass MUST NOT be filtered by +// whether the store write moved the row — a tx re-read from Kafka +// after a rebalance is typically already at its terminal status, so +// BatchUpdateStatusReturning reports a lattice no-op (prev.Status == +// st.Status) or an unknown row (prev == nil, row reaped). Skipping the +// notify for those strands the offset on the tracker and pins +// LowestUnfinished() — the Kafka commit watermark — forever, so the +// consumer group never commits and every restart re-reads the whole +// topic from offset 0. +// +// - Bulk publish covers only rows that actually transitioned. A lattice +// no-op or a reaped row would be a phantom SSE/webhook event. +// // Split out of processBatch so the surrounding flush loop stays under // nesting-complexity limits. func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses []*models.TransactionStatus, accepted, rejected int) { @@ -347,19 +365,41 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses zap.Error(err), ) // Continue: per-row entries may still be valid; bulk-publish - // those whose prev row is populated below. - } - - acceptedTxIDs := make([]string, 0, accepted) - rejectedTxIDs := make([]string, 0, rejected) + // those whose prev row is populated below. The dispatcher + // notification below runs regardless — the offset still has to + // be released even when the store write failed. + } + + // terminalAccepted/terminalRejected drive the dispatcher notification: + // every terminalized tx, unconditionally. + terminalAccepted := make([]string, 0, accepted) + terminalRejected := make([]string, 0, rejected) + // publishedAccepted/publishedRejected drive the bulk publish: only + // rows whose store status actually transitioned. + publishedAccepted := make([]string, 0, accepted) + publishedRejected := make([]string, 0, rejected) now := time.Now() for i, st := range terminalStatuses { var prev *models.TransactionStatus if i < len(prevs) { prev = prevs[i] } - // Unknown txid (row was reaped between RECEIVED and broadcast) - // or per-row store error. Skip publish to avoid phantom events. + + // Dispatcher accounting — unconditional. processBatch only routes + // ACCEPTED_BY_NETWORK and REJECTED into terminalStatuses; the + // defensive default keeps the switch exhaustive. + switch st.Status { + case models.StatusAcceptedByNetwork: + terminalAccepted = append(terminalAccepted, st.TxID) + case models.StatusRejected: + terminalRejected = append(terminalRejected, st.TxID) + default: + } + + // Publish accounting — only real transitions. An unknown txid + // (row reaped between RECEIVED and broadcast, or a per-row store + // error) or a lattice no-op is skipped here to avoid a phantom + // event; the dispatcher notify above still fires for it. if prev == nil { continue } @@ -368,25 +408,20 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses WithLabelValues(string(prev.Status), string(st.Status)). Observe(time.Since(prev.Timestamp).Seconds()) } - // Lattice no-op — no transition to fan out. if prev.Status == st.Status { continue } switch st.Status { case models.StatusAcceptedByNetwork: - acceptedTxIDs = append(acceptedTxIDs, st.TxID) + publishedAccepted = append(publishedAccepted, st.TxID) case models.StatusRejected: - rejectedTxIDs = append(rejectedTxIDs, st.TxID) + publishedRejected = append(publishedRejected, st.TxID) default: - // processBatch only routes ACCEPTED_BY_NETWORK and REJECTED - // terminal statuses into this slice; other statuses are - // either retryable (re-queued) or no_verdict (no store - // update). A defensive default keeps the switch exhaustive. } } - p.publishBulkStatus(ctx, models.StatusAcceptedByNetwork, acceptedTxIDs, now) - p.publishBulkStatus(ctx, models.StatusRejected, rejectedTxIDs, now) + p.publishBulkStatus(ctx, models.StatusAcceptedByNetwork, publishedAccepted, now) + p.publishBulkStatus(ctx, models.StatusRejected, publishedRejected, now) // Notify the dispatcher of every terminal status flip. ACCEPTED // releases waiters via the dispatcher itself (no caller action @@ -396,11 +431,11 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses // always "parent rejected" regardless of the parent's actual // cause — see persistCascadeRejections. var allCascaded []string - for _, txid := range acceptedTxIDs { - p.notifyTerminalToDispatcher(txid, models.StatusAcceptedByNetwork) + for _, txid := range terminalAccepted { + p.notifyTerminalToDispatcher(ctx, txid, models.StatusAcceptedByNetwork) } - for _, txid := range rejectedTxIDs { - r := p.notifyTerminalToDispatcher(txid, models.StatusRejected) + for _, txid := range terminalRejected { + r := p.notifyTerminalToDispatcher(ctx, txid, models.StatusRejected) allCascaded = append(allCascaded, r.cascaded...) } if len(allCascaded) > 0 { @@ -966,6 +1001,15 @@ const requeueDelay = 2 * time.Second // dispatcher. Spawns a goroutine that sleeps requeueDelay then sends // each msg to requeueCh. The goroutine bails on ctx cancellation so // claim revocation and shutdown don't hold txs in limbo. +// +// Bailing on ctx.Done is safe for at-least-once delivery: a requeue +// dropped here leaves the tx in the dispatcher's inFlight set with its +// Kafka offset still un-Done on the (per-claim) offsetTracker, so that +// offset is never marked for commit. When the claim is replaced the +// next claim re-reads the tx from Kafka and reprocesses it — the +// uncommitted offset IS the retry. The drop must not be silent though: +// it's logged so a teardown that strands a large requeue batch is +// visible in the operator's logs rather than looking like lost work. func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMsg) { if len(msgs) == 0 { return @@ -982,15 +1026,23 @@ func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMs select { case <-time.After(requeueDelay): case <-ctx.Done(): + p.logger.Debug( + "requeue dropped before delay elapsed; txs left uncommitted for replay", + zap.Int("dropped", len(msgs)), + ) return } - for _, m := range msgs { - select { - case <-ctx.Done(): + for i, m := range msgs { + // requeueToDispatcher selects on ctx, so a teardown mid-loop + // unblocks the send instead of leaking this goroutine on a + // requeueCh whose dispatcher has already exited. + if !p.requeueToDispatcher(ctx, m) { + p.logger.Debug( + "requeue dropped mid-batch; txs left uncommitted for replay", + zap.Int("dropped", len(msgs)-i), + ) return - default: } - p.requeueToDispatcher(m) } }(msgs) } diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index d03d6ef..e0ed860 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -128,6 +128,12 @@ type mockStore struct { // markErr forces MarkMerkleRegisteredByTxIDs to return this error. // Used by tests that verify a mark failure doesn't block broadcast. markErr error + // returningPrev, when non-nil, overrides the synthetic previous-status + // row BatchUpdateStatusReturning hands back for each input. The default + // (RECEIVED prev) always reads as a real transition; this hook lets + // tests drive the lattice-no-op (prev.Status == new.Status) and + // reaped-row (prev == nil) paths a Kafka-replayed tx actually hits. + returningPrev func(*models.TransactionStatus) *models.TransactionStatus } type clearedCall struct { @@ -166,6 +172,10 @@ func (m *mockStore) BatchUpdateStatusReturning(_ context.Context, statuses []*mo prevs := make([]*models.TransactionStatus, len(statuses)) for i, s := range statuses { m.updates = append(m.updates, s) + if m.returningPrev != nil { + prevs[i] = m.returningPrev(s) + continue + } prevs[i] = &models.TransactionStatus{ TxID: s.TxID, Status: models.StatusReceived, From 4d47ca4d264045094415a9713b0e8c577ccfe312 Mon Sep 17 00:00:00 2001 From: mboyd1 <5233780+mboyd1@users.noreply.github.com> Date: Thu, 21 May 2026 22:23:27 -0500 Subject: [PATCH 2/3] propagation: gate offset release on store-write success; fix reply-wait race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses two review findings on the offset-leak fix. 1. applyTerminalStatuses notified the dispatcher for every terminalized tx unconditionally — including when BatchUpdateStatusReturning returned an error. That store API reports per-row failures as a non-nil err with no per-row detail, so notifying anyway marks Kafka offsets Done and lets the commit watermark advance past a tx whose terminal status never reached the store, breaking the at-least-once coupling between Kafka and the status ledger. Now the dispatcher notification is skipped entirely on a store error: the whole batch's offsets stay uncommitted and the next claim replays it once the store recovers. A partial-batch over-replay on a rare store error is the correct trade. 2. notifyTerminalToDispatcher's reply-wait select could pick the ctx.Done() case even when the dispatcher had already replied — select chooses a ready case at random. Dropping that reply discards the cascaded txids the caller must persist, leaving descendant REJECTED rows unwritten while the dispatcher has already advanced the offset. On ctx cancellation it now does a final non-blocking read of reply and prefers a ready result. Tests: TestApplyTerminalStatuses_StoreError_DoesNotReleaseOffset (fails against pre-fix code) plus a StoreOK positive control. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/propagation/dispatcher.go | 14 +++++- services/propagation/dispatcher_test.go | 58 +++++++++++++++++++++++++ services/propagation/propagator.go | 15 +++++++ services/propagation/propagator_test.go | 6 ++- 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/services/propagation/dispatcher.go b/services/propagation/dispatcher.go index 0a90cd7..e803732 100644 --- a/services/propagation/dispatcher.go +++ b/services/propagation/dispatcher.go @@ -603,7 +603,19 @@ func (p *Propagator) notifyTerminalToDispatcher(ctx context.Context, txid string case r := <-reply: return r case <-ctx.Done(): - return terminalResult{} + // ctx and reply can both be ready at once, and select picks a + // ready case at random — so a teardown could win even though the + // dispatcher already processed the event and replied. That reply + // carries cascaded txids the caller must persist; dropping it + // leaves those REJECTED rows unwritten even though the dispatcher + // already advanced the offset. Prefer a ready reply over the + // cancellation with a final non-blocking read. + select { + case r := <-reply: + return r + default: + return terminalResult{} + } } } diff --git a/services/propagation/dispatcher_test.go b/services/propagation/dispatcher_test.go index 928c4b5..eba6013 100644 --- a/services/propagation/dispatcher_test.go +++ b/services/propagation/dispatcher_test.go @@ -2,6 +2,7 @@ package propagation import ( "context" + "errors" "net/http" "sync" "testing" @@ -355,3 +356,60 @@ func TestRunDispatcher_ReapedRowTerminal_AdvancesCommitWatermark(t *testing.T) { waitForMark(t, claim, 5, "a terminal for a reaped (nil-prev) row must still notify the dispatcher") } + +// --- store-error guard: offsets must not release on a failed write ------ + +// TestApplyTerminalStatuses_StoreError_DoesNotReleaseOffset is the +// at-least-once guard. When BatchUpdateStatusReturning fails the terminal +// status did not durably persist, so the dispatcher must NOT be notified — +// otherwise the tx's Kafka offset would be marked Done and the commit +// watermark would advance past a tx whose status never reached the store. +// The tx must stay in-flight so the offset stays uncommitted and the next +// claim replays it. +func TestApplyTerminalStatuses_StoreError_DoesNotReleaseOffset(t *testing.T) { + ms := newMockStore() + ms.returningErr = errors.New("store write failed") + p, cancel := newPropagatorForDepTest(t, ms) + defer cancel() + + // Admit a tx and move it onto the broadcasting path. + if res := p.admitToDispatcher(propagationMsg{TXID: "x", RawTx: []byte{0x01}}, 5); !res.admitted { + t.Fatalf("first admit should be admitted; got %+v", res) + } + _ = p.drainPending() + + // Terminalize it — but the store write fails. + p.applyTerminalStatuses(context.Background(), []*models.TransactionStatus{ + {TxID: "x", Status: models.StatusAcceptedByNetwork, Timestamp: time.Now()}, + }, 1, 0) + + // The dispatcher must NOT have been notified: "x" is still in flight, + // so a re-admission is detected as a duplicate (offset bookkept, not a + // fresh admit). Had the notify fired, "x" would be gone from inFlight + // and this would come back admitted. + if res := p.admitToDispatcher(propagationMsg{TXID: "x", RawTx: []byte{0x01}}, 6); !res.duplicate { + t.Fatalf("after a failed status write the tx must stay in-flight (offset uncommitted for replay); re-admit got %+v, want duplicate", res) + } +} + +// TestApplyTerminalStatuses_StoreOK_ReleasesOffset is the positive control: +// a successful status write DOES notify the dispatcher, so the tx leaves +// the in-flight set and a re-admission is a fresh admit. +func TestApplyTerminalStatuses_StoreOK_ReleasesOffset(t *testing.T) { + ms := newMockStore() + p, cancel := newPropagatorForDepTest(t, ms) + defer cancel() + + if res := p.admitToDispatcher(propagationMsg{TXID: "y", RawTx: []byte{0x01}}, 5); !res.admitted { + t.Fatalf("first admit should be admitted; got %+v", res) + } + _ = p.drainPending() + + p.applyTerminalStatuses(context.Background(), []*models.TransactionStatus{ + {TxID: "y", Status: models.StatusAcceptedByNetwork, Timestamp: time.Now()}, + }, 1, 0) + + if res := p.admitToDispatcher(propagationMsg{TXID: "y", RawTx: []byte{0x01}}, 6); !res.admitted { + t.Fatalf("after a successful terminal write the tx must be released from in-flight; re-admit got %+v, want admitted", res) + } +} diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 5811df6..79796c5 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -423,6 +423,21 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses p.publishBulkStatus(ctx, models.StatusAcceptedByNetwork, publishedAccepted, now) p.publishBulkStatus(ctx, models.StatusRejected, publishedRejected, now) + // Notify the dispatcher ONLY when the batch status write fully + // succeeded. BatchUpdateStatusReturning surfaces per-row failures as + // a non-nil err with no per-row detail — so on any error we cannot + // tell which rows actually persisted. Notifying anyway would mark + // those txs' Kafka offsets Done and let the commit watermark advance + // past a tx whose terminal status never reached the store, breaking + // the at-least-once coupling between Kafka and the status ledger. + // Skipping the notify leaves the whole batch's offsets uncommitted; + // the next claim replays it and re-terminalizes once the store + // recovers (the reaper is the secondary backstop). A partial-batch + // over-replay on a rare store error is the correct trade. + if err != nil { + return + } + // Notify the dispatcher of every terminal status flip. ACCEPTED // releases waiters via the dispatcher itself (no caller action // needed — released msgs are appended directly to the diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index e0ed860..479dd7f 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -134,6 +134,10 @@ type mockStore struct { // tests drive the lattice-no-op (prev.Status == new.Status) and // reaped-row (prev == nil) paths a Kafka-replayed tx actually hits. returningPrev func(*models.TransactionStatus) *models.TransactionStatus + // returningErr, when non-nil, is the error BatchUpdateStatusReturning + // returns — simulating a store write failure so tests can exercise the + // at-least-once guard that skips dispatcher notification on error. + returningErr error } type clearedCall struct { @@ -182,7 +186,7 @@ func (m *mockStore) BatchUpdateStatusReturning(_ context.Context, statuses []*mo Timestamp: time.Now(), } } - return prevs, nil + return prevs, m.returningErr } func (m *mockStore) BumpRetryCount(_ context.Context, txid string) (int, error) { From 2a98f1db43215c0c9953b5f54d95fc8ae7ffa045 Mon Sep 17 00:00:00 2001 From: mboyd1 <5233780+mboyd1@users.noreply.github.com> Date: Fri, 22 May 2026 11:52:32 -0500 Subject: [PATCH 3/3] propagation: silence containedctx lint on fakeClaim test struct MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CI lint job flagged fakeClaim for holding a context.Context field (containedctx). fakeClaim has to store the claim context to implement kafka.Claim.Context() — the same shape as the real saramaClaim — so this is suppressed with a //nolint directive, matching the existing broadcastJob precedent in this package. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/propagation/dispatcher_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services/propagation/dispatcher_test.go b/services/propagation/dispatcher_test.go index eba6013..7d62793 100644 --- a/services/propagation/dispatcher_test.go +++ b/services/propagation/dispatcher_test.go @@ -216,8 +216,10 @@ func TestDispatcherOffsets_HeldTxReleasedThenTerminal(t *testing.T) { // real Kafka commit-watermark signal, so a test can assert the // dispatcher actually advanced the watermark. type fakeClaim struct { - ch chan *kafka.Message - ctx context.Context + ch chan *kafka.Message + // a kafka.Claim must expose Context(); fakeClaim has to store the + // claim context to return it, exactly as the real saramaClaim does. + ctx context.Context //nolint:containedctx // satisfies the kafka.Claim interface mu sync.Mutex marked map[int64]bool