diff --git a/services/propagation/dispatcher.go b/services/propagation/dispatcher.go index 6f41e16..e803732 100644 --- a/services/propagation/dispatcher.go +++ b/services/propagation/dispatcher.go @@ -582,10 +582,41 @@ func (p *Propagator) admitToDispatcher(msg propagationMsg, offset int64) admitRe // notifyTerminalToDispatcher is the post-broadcast helper. Tells the // dispatcher the txid reached terminal status, returns the cascaded // descendants (caller writes REJECTED rows for them). -func (p *Propagator) notifyTerminalToDispatcher(txid string, status models.Status) terminalResult { +// +// Both the send and the reply receive are guarded by ctx. processBatch +// goroutines outlive the dispatcher on a claim teardown (rebalance / +// shutdown): the dispatcher loop returns as soon as its ctx is canceled, +// so an unguarded reply receive here would block this goroutine forever +// on a dispatcher that will never answer. On ctx cancellation the notify +// is abandoned and a zero terminalResult is returned — the tx's Kafka +// offset is simply left un-Done and uncommitted, and the next claim +// replays it (at-least-once). reply is buffered so a late dispatcher +// answer after we've stopped waiting never blocks the dispatcher. +func (p *Propagator) notifyTerminalToDispatcher(ctx context.Context, txid string, status models.Status) terminalResult { reply := make(chan terminalResult, 1) - p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply} - return <-reply + select { + case p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply}: + case <-ctx.Done(): + return terminalResult{} + } + select { + case r := <-reply: + return r + case <-ctx.Done(): + // ctx and reply can both be ready at once, and select picks a + // ready case at random — so a teardown could win even though the + // dispatcher already processed the event and replied. That reply + // carries cascaded txids the caller must persist; dropping it + // leaves those REJECTED rows unwritten even though the dispatcher + // already advanced the offset. Prefer a ready reply over the + // cancellation with a final non-blocking read. + select { + case r := <-reply: + return r + default: + return terminalResult{} + } + } } // drainPending asks the dispatcher for the current pendingMsgs as a @@ -603,6 +634,18 @@ func (p *Propagator) drainPending() []propagationMsg { // (held vs pending) doesn't change anything the caller can act on. // The send is buffered (dispatcherChannelBuffer); a momentarily // saturated buffer applies natural backpressure to the caller. -func (p *Propagator) requeueToDispatcher(msg propagationMsg) { - p.requeueCh <- requeueRequest{msg: msg} +// +// The send is guarded by ctx: when the claim is torn down the +// dispatcher stops draining requeueCh, so an unguarded send would block +// (and leak) the requeue goroutine once the buffer fills. Returns true +// if the requeue was handed to the dispatcher, false if ctx was +// canceled first — in which case the tx is left uncommitted for the +// next claim to replay. +func (p *Propagator) requeueToDispatcher(ctx context.Context, msg propagationMsg) bool { + select { + case p.requeueCh <- requeueRequest{msg: msg}: + return true + case <-ctx.Done(): + return false + } } diff --git a/services/propagation/dispatcher_test.go b/services/propagation/dispatcher_test.go new file mode 100644 index 0000000..7d62793 --- /dev/null +++ b/services/propagation/dispatcher_test.go @@ -0,0 +1,417 @@ +package propagation + +import ( + "context" + "errors" + "net/http" + "sync" + "testing" + "time" + + "github.com/bsv-blockchain/arcade/kafka" + "github.com/bsv-blockchain/arcade/models" +) + +// These tests pin the dispatcher's Kafka offset accounting: every offset +// Add'ed to the offsetTracker on admission must be matched by a Done on +// the tx's terminal outcome, or LowestUnfinished() — the commit +// watermark — pins and the arcade-propagation consumer group never +// commits (it shows "-" in kafka-consumer-groups --describe and re-reads +// the whole topic on every restart). +// +// Two layers of coverage: +// +// - Pure-function tests call handleAdmit / handleTerminal / +// handleRequeue / cascadeReject directly against an offsetTracker so +// the Add/Done bookkeeping is asserted in isolation. +// +// - Integration tests drive a real runDispatcher loop against a +// fakeClaim and assert claim.MarkMessage — the actual Kafka commit +// signal — fires for terminalized offsets. These are the regression +// guard for the applyTerminalStatuses offset-leak: a tx replayed +// from Kafka after a rebalance is already at its terminal status, so +// the store reports a lattice no-op; the dispatcher must still be +// told so it releases the offset. + +// --- pure-function offset accounting ------------------------------------- + +// dispatcherState is the bundle of maps runDispatcher owns as locals. +// The handle* functions take them as explicit params, so a test can +// build the bundle, drive a sequence of calls, and inspect the tracker. +type dispatcherState struct { + inFlight map[string]int64 + waiters map[string]map[string]struct{} + heldMsgs map[string]propagationMsg + pendingMsgs []propagationMsg + tracker *offsetTracker +} + +func newDispatcherState() *dispatcherState { + return &dispatcherState{ + inFlight: make(map[string]int64), + waiters: make(map[string]map[string]struct{}), + heldMsgs: make(map[string]propagationMsg), + tracker: newOffsetTracker(), + } +} + +func (s *dispatcherState) admit(txid string, offset int64, parents ...string) admitResult { + return handleAdmit( + propagationMsg{TXID: txid, RawTx: []byte{0x01}, InputTXIDs: parents}, + offset, s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, s.tracker, + ) +} + +func (s *dispatcherState) terminal(txid string, status models.Status) terminalResult { + return handleTerminal( + terminalEvent{txid: txid, status: status}, + s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, s.tracker, + ) +} + +// TestDispatcherOffsets_AdmitThenTerminal_ReleasesWatermark is the +// happy path: a tx admitted at offset N pins the watermark at N until +// it terminalizes, then the tracker drains. +func TestDispatcherOffsets_AdmitThenTerminal_ReleasesWatermark(t *testing.T) { + s := newDispatcherState() + + s.admit("a", 10) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 10 { + t.Fatalf("after admit, LowestUnfinished = (%d, %v), want (10, true)", low, ok) + } + + s.terminal("a", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must be empty after the only in-flight tx terminalizes — offset stranded") + } + if _, ok := s.inFlight["a"]; ok { + t.Fatal("terminalized tx must be removed from inFlight") + } +} + +// TestDispatcherOffsets_RejectedTerminal_DonesOffset confirms the +// REJECTED branch of handleTerminal also Dones the offset, not just the +// ACCEPTED branch. +func TestDispatcherOffsets_RejectedTerminal_DonesOffset(t *testing.T) { + s := newDispatcherState() + s.admit("bad", 42) + s.terminal("bad", models.StatusRejected) + if !s.tracker.Empty() { + t.Fatal("REJECTED terminal must Done the offset; tracker should be empty") + } +} + +// TestDispatcherOffsets_OutOfOrderTerminal_PinsAtLowest verifies the +// watermark holds at the lowest un-terminalized offset even when later +// offsets terminalize first — txs finish in broadcast order, not Kafka +// order, and committing past an un-terminalized offset would lose it +// on a crash. +func TestDispatcherOffsets_OutOfOrderTerminal_PinsAtLowest(t *testing.T) { + s := newDispatcherState() + s.admit("a", 1) + s.admit("b", 2) + s.admit("c", 3) + + s.terminal("c", models.StatusAcceptedByNetwork) + s.terminal("b", models.StatusAcceptedByNetwork) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 1 { + t.Fatalf("watermark must hold at offset 1 while 'a' is in-flight; got (%d, %v)", low, ok) + } + + s.terminal("a", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must drain once the lowest offset terminalizes") + } +} + +// TestDispatcherOffsets_Requeue_KeepsOffsetPinned is the contract that +// makes the requeue path safe: a requeue must NOT Done the offset. The +// tx is still in-flight (it'll be re-broadcast), so its offset has to +// keep pinning the watermark until it reaches a real terminal verdict. +// Done'ing it on requeue would commit past an un-broadcast tx and lose +// it on a crash. +func TestDispatcherOffsets_Requeue_KeepsOffsetPinned(t *testing.T) { + s := newDispatcherState() + s.admit("r", 7) + s.pendingMsgs = nil // simulate the drain into a broadcasting batch + + handleRequeue( + propagationMsg{TXID: "r", RawTx: []byte{0x01}}, + s.inFlight, s.waiters, s.heldMsgs, &s.pendingMsgs, + ) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 7 { + t.Fatalf("requeue must leave the offset pinned; LowestUnfinished = (%d, %v), want (7, true)", low, ok) + } + if len(s.pendingMsgs) != 1 { + t.Fatalf("requeue should put the tx back on pendingMsgs; got %d", len(s.pendingMsgs)) + } + + // The requeued tx eventually terminalizes — only then does the + // offset release. + s.terminal("r", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("offset must release once the requeued tx finally terminalizes") + } +} + +// TestDispatcherOffsets_CascadeReject_DonesEveryDescendant verifies a +// rejected parent releases the offsets of every cascade-rejected +// descendant. Those children never broadcast, so cascadeReject is their +// only path to Done — miss one and its offset pins forever. +func TestDispatcherOffsets_CascadeReject_DonesEveryDescendant(t *testing.T) { + s := newDispatcherState() + + // parent admitted and broadcasting; child + grandchild held behind it. + if r := s.admit("parent", 1); !r.admitted { + t.Fatalf("parent should admit; got %+v", r) + } + s.pendingMsgs = nil + if r := s.admit("child", 2, "parent"); !r.held { + t.Fatalf("child should be held behind parent; got %+v", r) + } + if r := s.admit("grandchild", 3, "child"); !r.held { + t.Fatalf("grandchild should be held behind child; got %+v", r) + } + + res := s.terminal("parent", models.StatusRejected) + if len(res.cascaded) != 2 { + t.Fatalf("rejecting parent should cascade to 2 descendants; got %v", res.cascaded) + } + if !s.tracker.Empty() { + t.Fatal("cascadeReject must Done the offset of every descendant; tracker should be empty") + } +} + +// TestDispatcherOffsets_HeldTxReleasedThenTerminal verifies a held tx's +// offset survives the held→pending release and is only Done'd when the +// released tx itself terminalizes. +func TestDispatcherOffsets_HeldTxReleasedThenTerminal(t *testing.T) { + s := newDispatcherState() + s.admit("parent", 1) + s.pendingMsgs = nil + s.admit("child", 2, "parent") + + // child is held; its offset 2 is in-flight behind parent's offset 1. + if low, _ := s.tracker.LowestUnfinished(); low != 1 { + t.Fatalf("watermark should be 1 (parent); got %d", low) + } + + // parent ACCEPTED releases child into pendingMsgs but does NOT + // terminalize the child — child's offset 2 must still pin. + s.terminal("parent", models.StatusAcceptedByNetwork) + if low, ok := s.tracker.LowestUnfinished(); !ok || low != 2 { + t.Fatalf("after parent ACCEPTED, watermark should move to child's offset 2; got (%d, %v)", low, ok) + } + + s.terminal("child", models.StatusAcceptedByNetwork) + if !s.tracker.Empty() { + t.Fatal("tracker must drain once the released child terminalizes") + } +} + +// --- runDispatcher integration: the commit watermark -------------------- + +// fakeClaim is a kafka.Claim that feeds messages from a channel and +// records every offset MarkMessage is called for. MarkMessage is the +// real Kafka commit-watermark signal, so a test can assert the +// dispatcher actually advanced the watermark. +type fakeClaim struct { + ch chan *kafka.Message + // a kafka.Claim must expose Context(); fakeClaim has to store the + // claim context to return it, exactly as the real saramaClaim does. + ctx context.Context //nolint:containedctx // satisfies the kafka.Claim interface + + mu sync.Mutex + marked map[int64]bool +} + +func newFakeClaim(ctx context.Context) *fakeClaim { + return &fakeClaim{ + ch: make(chan *kafka.Message, 64), + ctx: ctx, + marked: make(map[int64]bool), + } +} + +func (c *fakeClaim) Messages() <-chan *kafka.Message { return c.ch } +func (c *fakeClaim) Context() context.Context { return c.ctx } + +func (c *fakeClaim) MarkMessage(m *kafka.Message) { + c.mu.Lock() + c.marked[m.Offset] = true + c.mu.Unlock() +} + +func (c *fakeClaim) isMarked(offset int64) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.marked[offset] +} + +var _ kafka.Claim = (*fakeClaim)(nil) + +// runDispatcherWithClaim swaps out the test-mode dispatcher New() starts +// and runs the production runDispatcher loop against a fakeClaim +// instead, so a test can feed real Kafka messages and observe the +// commit watermark. Returns the claim and a stop func. +func runDispatcherWithClaim(t *testing.T, p *Propagator) (*fakeClaim, func()) { + t.Helper() + // The test-mode and production dispatcher loops share p.terminalCh + // et al. and must never run concurrently — cancel the test-mode one + // New() spawned and wait for it to exit first. + p.dispatcherCancel() + <-p.dispatcherDone + p.dispatcherCancel = nil + + claimCtx, cancel := context.WithCancel(context.Background()) + claim := newFakeClaim(claimCtx) + done := make(chan struct{}) + go func() { + defer close(done) + _ = p.runDispatcher(claimCtx, claim, dispatcherConfig{maxPending: 1000}) + }() + return claim, func() { + cancel() + <-done + p.WaitForBatches() + } +} + +// waitForMark polls until claim has marked offset, or fails after ~3s. +func waitForMark(t *testing.T, claim *fakeClaim, offset int64, msg string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if claim.isMarked(offset) { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("offset %d was never MarkMessage'd: %s", offset, msg) +} + +// TestRunDispatcher_TerminalTx_AdvancesCommitWatermark is the happy +// path / harness sanity check: a tx that broadcasts and is accepted +// has its Kafka offset committed. +func TestRunDispatcher_TerminalTx_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + p := newPropagator("", teranodeSrv.URL, newMockStore()) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 11, Value: makePropMsg("fresh-tx")} + waitForMark(t, claim, 11, "an accepted tx must advance the commit watermark") +} + +// TestRunDispatcher_AlreadyTerminalTx_AdvancesCommitWatermark is the +// core offset-leak regression test. A tx replayed from Kafka after a +// rebalance is already at its terminal status, so +// BatchUpdateStatusReturning reports a lattice no-op (prev.Status == +// new.Status). Pre-fix, applyTerminalStatuses filtered those out of the +// dispatcher notification, so handleTerminal never ran, the offset was +// never Done'd, and LowestUnfinished() pinned forever — the +// arcade-propagation group's committed offset stuck at "-". +func TestRunDispatcher_AlreadyTerminalTx_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + // Every status write is reported as a lattice no-op: the row is + // already at exactly the status being written. + ms.returningPrev = func(s *models.TransactionStatus) *models.TransactionStatus { + return &models.TransactionStatus{ + TxID: s.TxID, + Status: s.Status, + Timestamp: time.Now().Add(-time.Hour), + } + } + + p := newPropagator("", teranodeSrv.URL, ms) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 7, Value: makePropMsg("replayed-mined-tx")} + waitForMark(t, claim, 7, + "a lattice-no-op terminal must still notify the dispatcher — otherwise the commit watermark pins forever") +} + +// TestRunDispatcher_ReapedRowTerminal_AdvancesCommitWatermark covers the +// other skipped path: BatchUpdateStatusReturning returns a nil prev row +// (the txid is unknown — the status row was reaped between RECEIVED and +// broadcast). The offset must still be released. +func TestRunDispatcher_ReapedRowTerminal_AdvancesCommitWatermark(t *testing.T) { + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.returningPrev = func(*models.TransactionStatus) *models.TransactionStatus { + return nil // unknown / reaped row + } + + p := newPropagator("", teranodeSrv.URL, ms) + claim, stop := runDispatcherWithClaim(t, p) + defer stop() + + claim.ch <- &kafka.Message{Offset: 5, Value: makePropMsg("reaped-row-tx")} + waitForMark(t, claim, 5, + "a terminal for a reaped (nil-prev) row must still notify the dispatcher") +} + +// --- store-error guard: offsets must not release on a failed write ------ + +// TestApplyTerminalStatuses_StoreError_DoesNotReleaseOffset is the +// at-least-once guard. When BatchUpdateStatusReturning fails the terminal +// status did not durably persist, so the dispatcher must NOT be notified — +// otherwise the tx's Kafka offset would be marked Done and the commit +// watermark would advance past a tx whose status never reached the store. +// The tx must stay in-flight so the offset stays uncommitted and the next +// claim replays it. +func TestApplyTerminalStatuses_StoreError_DoesNotReleaseOffset(t *testing.T) { + ms := newMockStore() + ms.returningErr = errors.New("store write failed") + p, cancel := newPropagatorForDepTest(t, ms) + defer cancel() + + // Admit a tx and move it onto the broadcasting path. + if res := p.admitToDispatcher(propagationMsg{TXID: "x", RawTx: []byte{0x01}}, 5); !res.admitted { + t.Fatalf("first admit should be admitted; got %+v", res) + } + _ = p.drainPending() + + // Terminalize it — but the store write fails. + p.applyTerminalStatuses(context.Background(), []*models.TransactionStatus{ + {TxID: "x", Status: models.StatusAcceptedByNetwork, Timestamp: time.Now()}, + }, 1, 0) + + // The dispatcher must NOT have been notified: "x" is still in flight, + // so a re-admission is detected as a duplicate (offset bookkept, not a + // fresh admit). Had the notify fired, "x" would be gone from inFlight + // and this would come back admitted. + if res := p.admitToDispatcher(propagationMsg{TXID: "x", RawTx: []byte{0x01}}, 6); !res.duplicate { + t.Fatalf("after a failed status write the tx must stay in-flight (offset uncommitted for replay); re-admit got %+v, want duplicate", res) + } +} + +// TestApplyTerminalStatuses_StoreOK_ReleasesOffset is the positive control: +// a successful status write DOES notify the dispatcher, so the tx leaves +// the in-flight set and a re-admission is a fresh admit. +func TestApplyTerminalStatuses_StoreOK_ReleasesOffset(t *testing.T) { + ms := newMockStore() + p, cancel := newPropagatorForDepTest(t, ms) + defer cancel() + + if res := p.admitToDispatcher(propagationMsg{TXID: "y", RawTx: []byte{0x01}}, 5); !res.admitted { + t.Fatalf("first admit should be admitted; got %+v", res) + } + _ = p.drainPending() + + p.applyTerminalStatuses(context.Background(), []*models.TransactionStatus{ + {TxID: "y", Status: models.StatusAcceptedByNetwork, Timestamp: time.Now()}, + }, 1, 0) + + if res := p.admitToDispatcher(propagationMsg{TXID: "y", RawTx: []byte{0x01}}, 6); !res.admitted { + t.Fatalf("after a successful terminal write the tx must be released from in-flight; re-admit got %+v, want admitted", res) + } +} diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 7e424e2..81168b7 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -329,10 +329,28 @@ func (p *Propagator) Name() string { return "propagation" } // applyTerminalStatuses persists the per-tx terminal statuses produced by // processBatch in one BatchUpdateStatusReturning call, observes the -// RECEIVED→{ACCEPTED_BY_NETWORK,REJECTED} transition age, and emits one -// PublishBulk per terminal status. Lattice no-ops (prev.Status == st.Status) -// and unknown txids (prev == nil — row reaped between RECEIVED and -// broadcast) are excluded from the bulk publish to avoid phantom events. +// RECEIVED→{ACCEPTED_BY_NETWORK,REJECTED} transition age, emits one +// PublishBulk per terminal status, and notifies the dispatcher so it can +// release/cascade waiters and mark the Kafka offset Done. +// +// It walks the batch with two deliberately separate accounting passes: +// +// - Dispatcher notification covers EVERY terminalized tx. Each tx in +// terminalStatuses still holds a live Kafka offset on the dispatcher's +// offsetTracker; the dispatcher only marks that offset Done when it +// receives the terminal event. This pass MUST NOT be filtered by +// whether the store write moved the row — a tx re-read from Kafka +// after a rebalance is typically already at its terminal status, so +// BatchUpdateStatusReturning reports a lattice no-op (prev.Status == +// st.Status) or an unknown row (prev == nil, row reaped). Skipping the +// notify for those strands the offset on the tracker and pins +// LowestUnfinished() — the Kafka commit watermark — forever, so the +// consumer group never commits and every restart re-reads the whole +// topic from offset 0. +// +// - Bulk publish covers only rows that actually transitioned. A lattice +// no-op or a reaped row would be a phantom SSE/webhook event. +// // Split out of processBatch so the surrounding flush loop stays under // nesting-complexity limits. func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses []*models.TransactionStatus, accepted, rejected int) { @@ -347,19 +365,41 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses zap.Error(err), ) // Continue: per-row entries may still be valid; bulk-publish - // those whose prev row is populated below. - } - - acceptedTxIDs := make([]string, 0, accepted) - rejectedTxIDs := make([]string, 0, rejected) + // those whose prev row is populated below. The dispatcher + // notification below runs regardless — the offset still has to + // be released even when the store write failed. + } + + // terminalAccepted/terminalRejected drive the dispatcher notification: + // every terminalized tx, unconditionally. + terminalAccepted := make([]string, 0, accepted) + terminalRejected := make([]string, 0, rejected) + // publishedAccepted/publishedRejected drive the bulk publish: only + // rows whose store status actually transitioned. + publishedAccepted := make([]string, 0, accepted) + publishedRejected := make([]string, 0, rejected) now := time.Now() for i, st := range terminalStatuses { var prev *models.TransactionStatus if i < len(prevs) { prev = prevs[i] } - // Unknown txid (row was reaped between RECEIVED and broadcast) - // or per-row store error. Skip publish to avoid phantom events. + + // Dispatcher accounting — unconditional. processBatch only routes + // ACCEPTED_BY_NETWORK and REJECTED into terminalStatuses; the + // defensive default keeps the switch exhaustive. + switch st.Status { + case models.StatusAcceptedByNetwork: + terminalAccepted = append(terminalAccepted, st.TxID) + case models.StatusRejected: + terminalRejected = append(terminalRejected, st.TxID) + default: + } + + // Publish accounting — only real transitions. An unknown txid + // (row reaped between RECEIVED and broadcast, or a per-row store + // error) or a lattice no-op is skipped here to avoid a phantom + // event; the dispatcher notify above still fires for it. if prev == nil { continue } @@ -368,25 +408,35 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses WithLabelValues(string(prev.Status), string(st.Status)). Observe(time.Since(prev.Timestamp).Seconds()) } - // Lattice no-op — no transition to fan out. if prev.Status == st.Status { continue } switch st.Status { case models.StatusAcceptedByNetwork: - acceptedTxIDs = append(acceptedTxIDs, st.TxID) + publishedAccepted = append(publishedAccepted, st.TxID) case models.StatusRejected: - rejectedTxIDs = append(rejectedTxIDs, st.TxID) + publishedRejected = append(publishedRejected, st.TxID) default: - // processBatch only routes ACCEPTED_BY_NETWORK and REJECTED - // terminal statuses into this slice; other statuses are - // either retryable (re-queued) or no_verdict (no store - // update). A defensive default keeps the switch exhaustive. } } - p.publishBulkStatus(ctx, models.StatusAcceptedByNetwork, acceptedTxIDs, now) - p.publishBulkStatus(ctx, models.StatusRejected, rejectedTxIDs, now) + p.publishBulkStatus(ctx, models.StatusAcceptedByNetwork, publishedAccepted, now) + p.publishBulkStatus(ctx, models.StatusRejected, publishedRejected, now) + + // Notify the dispatcher ONLY when the batch status write fully + // succeeded. BatchUpdateStatusReturning surfaces per-row failures as + // a non-nil err with no per-row detail — so on any error we cannot + // tell which rows actually persisted. Notifying anyway would mark + // those txs' Kafka offsets Done and let the commit watermark advance + // past a tx whose terminal status never reached the store, breaking + // the at-least-once coupling between Kafka and the status ledger. + // Skipping the notify leaves the whole batch's offsets uncommitted; + // the next claim replays it and re-terminalizes once the store + // recovers (the reaper is the secondary backstop). A partial-batch + // over-replay on a rare store error is the correct trade. + if err != nil { + return + } // Notify the dispatcher of every terminal status flip. ACCEPTED // releases waiters via the dispatcher itself (no caller action @@ -396,11 +446,11 @@ func (p *Propagator) applyTerminalStatuses(ctx context.Context, terminalStatuses // always "parent rejected" regardless of the parent's actual // cause — see persistCascadeRejections. var allCascaded []string - for _, txid := range acceptedTxIDs { - p.notifyTerminalToDispatcher(txid, models.StatusAcceptedByNetwork) + for _, txid := range terminalAccepted { + p.notifyTerminalToDispatcher(ctx, txid, models.StatusAcceptedByNetwork) } - for _, txid := range rejectedTxIDs { - r := p.notifyTerminalToDispatcher(txid, models.StatusRejected) + for _, txid := range terminalRejected { + r := p.notifyTerminalToDispatcher(ctx, txid, models.StatusRejected) allCascaded = append(allCascaded, r.cascaded...) } if len(allCascaded) > 0 { @@ -937,6 +987,15 @@ const requeueDelay = 2 * time.Second // dispatcher. Spawns a goroutine that sleeps requeueDelay then sends // each msg to requeueCh. The goroutine bails on ctx cancellation so // claim revocation and shutdown don't hold txs in limbo. +// +// Bailing on ctx.Done is safe for at-least-once delivery: a requeue +// dropped here leaves the tx in the dispatcher's inFlight set with its +// Kafka offset still un-Done on the (per-claim) offsetTracker, so that +// offset is never marked for commit. When the claim is replaced the +// next claim re-reads the tx from Kafka and reprocesses it — the +// uncommitted offset IS the retry. The drop must not be silent though: +// it's logged so a teardown that strands a large requeue batch is +// visible in the operator's logs rather than looking like lost work. func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMsg) { if len(msgs) == 0 { return @@ -953,15 +1012,23 @@ func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMs select { case <-time.After(requeueDelay): case <-ctx.Done(): + p.logger.Debug( + "requeue dropped before delay elapsed; txs left uncommitted for replay", + zap.Int("dropped", len(msgs)), + ) return } - for _, m := range msgs { - select { - case <-ctx.Done(): + for i, m := range msgs { + // requeueToDispatcher selects on ctx, so a teardown mid-loop + // unblocks the send instead of leaking this goroutine on a + // requeueCh whose dispatcher has already exited. + if !p.requeueToDispatcher(ctx, m) { + p.logger.Debug( + "requeue dropped mid-batch; txs left uncommitted for replay", + zap.Int("dropped", len(msgs)-i), + ) return - default: } - p.requeueToDispatcher(m) } }(msgs) } diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index d03d6ef..479dd7f 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -128,6 +128,16 @@ type mockStore struct { // markErr forces MarkMerkleRegisteredByTxIDs to return this error. // Used by tests that verify a mark failure doesn't block broadcast. markErr error + // returningPrev, when non-nil, overrides the synthetic previous-status + // row BatchUpdateStatusReturning hands back for each input. The default + // (RECEIVED prev) always reads as a real transition; this hook lets + // tests drive the lattice-no-op (prev.Status == new.Status) and + // reaped-row (prev == nil) paths a Kafka-replayed tx actually hits. + returningPrev func(*models.TransactionStatus) *models.TransactionStatus + // returningErr, when non-nil, is the error BatchUpdateStatusReturning + // returns — simulating a store write failure so tests can exercise the + // at-least-once guard that skips dispatcher notification on error. + returningErr error } type clearedCall struct { @@ -166,13 +176,17 @@ func (m *mockStore) BatchUpdateStatusReturning(_ context.Context, statuses []*mo prevs := make([]*models.TransactionStatus, len(statuses)) for i, s := range statuses { m.updates = append(m.updates, s) + if m.returningPrev != nil { + prevs[i] = m.returningPrev(s) + continue + } prevs[i] = &models.TransactionStatus{ TxID: s.TxID, Status: models.StatusReceived, Timestamp: time.Now(), } } - return prevs, nil + return prevs, m.returningErr } func (m *mockStore) BumpRetryCount(_ context.Context, txid string) (int, error) {