Skip to content

Commit 46e6a39

Browse files
authored
try to fix clean up deadlock in logpoller (#463)
* try to fix clean up deadlock * add test for log poller closing deadlock
1 parent f91f459 commit 46e6a39

2 files changed

Lines changed: 42 additions & 3 deletions

File tree

pkg/logpoller/log_poller.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -522,9 +522,15 @@ func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, erro
522522

523523
func (lp *logPoller) recvReplayComplete() {
524524
defer lp.wg.Done()
525-
err := <-lp.replayComplete
526-
if err != nil {
527-
lp.lggr.Error(err)
525+
// Also listen on stopCh: if Close runs before run loop produces a replayComplete
526+
// value (e.g. a concurrent Replay observed ctx.Done at the same instant the run
527+
// loop did and produced no completion), wait would block forever.
528+
select {
529+
case err := <-lp.replayComplete:
530+
if err != nil {
531+
lp.lggr.Error(err)
532+
}
533+
case <-lp.stopCh:
528534
}
529535
}
530536

pkg/logpoller/log_poller_internal_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,39 @@ func TestLogPoller_Replay(t *testing.T) {
713713
})
714714
}
715715

716+
// TestLogPoller_Close_RecvReplayComplete_NoDeadlock is a regression test for the scenario
717+
// where Close() hangs in wg.Wait() because recvReplayComplete blocks forever on <-replayComplete.
718+
//
719+
// Sequence that previously deadlocked:
720+
// 1. Replay's ctx fires Done → Replay spawns recvReplayComplete (wg.Add(1)) and returns ErrReplayInProgress.
721+
// 2. Close's single non-blocking send to replayComplete fires to default (recvReplayComplete not yet receiving).
722+
// 3. close(stopCh) then wg.Wait() — recvReplayComplete blocks on <-replayComplete forever, wg never drains.
723+
//
724+
// Fix: recvReplayComplete also selects on stopCh so it exits when the poller shuts down.
725+
func TestLogPoller_Close_RecvReplayComplete_NoDeadlock(t *testing.T) {
726+
t.Parallel()
727+
728+
lggr := logger.Test(t)
729+
lp := NewLogPoller(nil, nil, lggr, nil, Opts{PollPeriod: time.Hour})
730+
731+
// Reproduce the race: Close's non-blocking send already went to default before
732+
// recvReplayComplete was scheduled, then stopCh was closed. No replayComplete value
733+
// will ever arrive; recvReplayComplete must exit via stopCh or wg.Wait() hangs.
734+
close(lp.stopCh)
735+
736+
lp.wg.Add(1)
737+
go lp.recvReplayComplete()
738+
739+
done := make(chan struct{})
740+
go func() { lp.wg.Wait(); close(done) }()
741+
742+
select {
743+
case <-done:
744+
case <-time.After(time.Second):
745+
t.Fatal("recvReplayComplete blocked on <-replayComplete after stopCh closed; Close() would deadlock")
746+
}
747+
}
748+
716749
func (lp *logPoller) reset() {
717750
lp.StateMachine = services.StateMachine{}
718751
lp.stopCh = make(chan struct{})

0 commit comments

Comments
 (0)