Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions services/propagation/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,41 @@ func (p *Propagator) admitToDispatcher(msg propagationMsg, offset int64) admitRe
// notifyTerminalToDispatcher is the post-broadcast helper. Tells the
// dispatcher the txid reached terminal status, returns the cascaded
// descendants (caller writes REJECTED rows for them).
func (p *Propagator) notifyTerminalToDispatcher(txid string, status models.Status) terminalResult {
//
// Both the send and the reply receive are guarded by ctx. processBatch
// goroutines outlive the dispatcher on a claim teardown (rebalance /
// shutdown): the dispatcher loop returns as soon as its ctx is canceled,
// so an unguarded reply receive here would block this goroutine forever
// on a dispatcher that will never answer. On ctx cancellation the notify
// is abandoned and a zero terminalResult is returned — the tx's Kafka
// offset is simply left un-Done and uncommitted, and the next claim
// replays it (at-least-once). reply is buffered so a late dispatcher
// answer after we've stopped waiting never blocks the dispatcher.
func (p *Propagator) notifyTerminalToDispatcher(ctx context.Context, txid string, status models.Status) terminalResult {
reply := make(chan terminalResult, 1)
p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply}
return <-reply
select {
case p.terminalCh <- terminalEvent{txid: txid, status: status, reply: reply}:
case <-ctx.Done():
return terminalResult{}
}
select {
case r := <-reply:
return r
case <-ctx.Done():
// ctx and reply can both be ready at once, and select picks a
// ready case at random — so a teardown could win even though the
// dispatcher already processed the event and replied. That reply
// carries cascaded txids the caller must persist; dropping it
// leaves those REJECTED rows unwritten even though the dispatcher
// already advanced the offset. Prefer a ready reply over the
// cancellation with a final non-blocking read.
select {
case r := <-reply:
return r
default:
return terminalResult{}
}
}
}

// drainPending asks the dispatcher for the current pendingMsgs as a
Expand All @@ -603,6 +634,18 @@ func (p *Propagator) drainPending() []propagationMsg {
// (held vs pending) doesn't change anything the caller can act on.
// The send is buffered (dispatcherChannelBuffer); a momentarily
// saturated buffer applies natural backpressure to the caller.
func (p *Propagator) requeueToDispatcher(msg propagationMsg) {
p.requeueCh <- requeueRequest{msg: msg}
//
// The send is guarded by ctx: when the claim is torn down the
// dispatcher stops draining requeueCh, so an unguarded send would block
// (and leak) the requeue goroutine once the buffer fills. Returns true
// if the requeue was handed to the dispatcher, false if ctx was
// canceled first — in which case the tx is left uncommitted for the
// next claim to replay.
func (p *Propagator) requeueToDispatcher(ctx context.Context, msg propagationMsg) bool {
select {
case p.requeueCh <- requeueRequest{msg: msg}:
return true
case <-ctx.Done():
return false
}
}
Loading
Loading