Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,15 @@ func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, erro

func (lp *logPoller) recvReplayComplete() {
defer lp.wg.Done()
err := <-lp.replayComplete
if err != nil {
lp.lggr.Error(err)
// Also listen on stopCh: if Close runs before run loop produces a replayComplete
// value (e.g. a concurrent Replay observed ctx.Done at the same instant the run
// loop did and produced no completion), wait would block forever.
select {
case err := <-lp.replayComplete:
if err != nil {
lp.lggr.Error(err)
}
case <-lp.stopCh:
}
Comment thread
Tofel marked this conversation as resolved.
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,39 @@ func TestLogPoller_Replay(t *testing.T) {
})
}

// TestLogPoller_Close_RecvReplayComplete_NoDeadlock is a regression test for the scenario
// where Close() hangs in wg.Wait() because recvReplayComplete blocks forever on <-replayComplete.
//
// Sequence that previously deadlocked:
// 1. Replay's ctx fires Done → Replay spawns recvReplayComplete (wg.Add(1)) and returns ErrReplayInProgress.
// 2. Close's single non-blocking send to replayComplete fires to default (recvReplayComplete not yet receiving).
// 3. close(stopCh) then wg.Wait() — recvReplayComplete blocks on <-replayComplete forever, wg never drains.
//
// Fix: recvReplayComplete also selects on stopCh so it exits when the poller shuts down.
func TestLogPoller_Close_RecvReplayComplete_NoDeadlock(t *testing.T) {
t.Parallel()

lggr := logger.Test(t)
lp := NewLogPoller(nil, nil, lggr, nil, Opts{PollPeriod: time.Hour})

// Reproduce the race: Close's non-blocking send already went to default before
// recvReplayComplete was scheduled, then stopCh was closed. No replayComplete value
// will ever arrive; recvReplayComplete must exit via stopCh or wg.Wait() hangs.
close(lp.stopCh)

lp.wg.Add(1)
go lp.recvReplayComplete()

done := make(chan struct{})
go func() { lp.wg.Wait(); close(done) }()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("recvReplayComplete blocked on <-replayComplete after stopCh closed; Close() would deadlock")
}
}

func (lp *logPoller) reset() {
lp.StateMachine = services.StateMachine{}
lp.stopCh = make(chan struct{})
Expand Down
Loading