diff --git a/pkg/utils/sleeper_task.go b/pkg/utils/sleeper_task.go index 2024881c4..ecc0b19b9 100644 --- a/pkg/utils/sleeper_task.go +++ b/pkg/utils/sleeper_task.go @@ -2,6 +2,7 @@ package utils import ( "context" + "time" "github.com/smartcontractkit/chainlink-common/pkg/services" ) @@ -110,8 +111,7 @@ func (s *SleeperTask) WorkDone() <-chan struct{} { func (s *SleeperTask) workerLoop() { defer close(s.chDone) - ctx, cancel := s.chStop.NewCtx() - defer cancel() + ctx := &stopChanCtx{ch: s.chStop} for { select { @@ -124,6 +124,25 @@ func (s *SleeperTask) workerLoop() { } } +// stopChanCtx implements [context.Context] backed directly by a stop channel. +// Unlike [services.StopChan.NewCtx], this does not spawn a goroutine to call +// cancel(), avoiding a data race between context cancellation and concurrent +// reflect-based reads (e.g. testify mock argument formatting). +type stopChanCtx struct{ ch <-chan struct{} } + +func (c *stopChanCtx) Deadline() (time.Time, bool) { return time.Time{}, false } +func (c *stopChanCtx) Done() <-chan struct{} { return c.ch } +func (c *stopChanCtx) Value(any) any { return nil } + +func (c *stopChanCtx) Err() error { + select { + case <-c.ch: + return context.Canceled + default: + return nil + } +} + type sleeperTaskWorker struct { name string work func() diff --git a/pkg/utils/sleeper_task_test.go b/pkg/utils/sleeper_task_test.go index bc632cd36..20f8ddd2f 100644 --- a/pkg/utils/sleeper_task_test.go +++ b/pkg/utils/sleeper_task_test.go @@ -1,6 +1,9 @@ package utils_test import ( + "context" + "fmt" + "sync" "testing" "time" @@ -72,6 +75,43 @@ func TestSleeperTask_WakeupPerformsWork(t *testing.T) { require.NoError(t, sleeper.Stop()) } +// reflectWorker simulates the race scenario: Work() spawns goroutines that +// read the context via fmt.Sprintf (like testify's mock.callString), while +// Stop() concurrently triggers context cancellation. +type reflectWorker struct { + wg sync.WaitGroup +} + +func (w *reflectWorker) Name() string { return "ReflectWorker" } + +func (w *reflectWorker) Work(ctx context.Context) { + w.wg.Add(10) + for range 10 { + go func() { + defer w.wg.Done() + // Simulate testify's callString: fmt.Sprintf("%#v", ctx) reads context + // internals via reflect. With the old NewCtx()/CtxCancel pattern, cancel() + // fired concurrently, writing to context internals → DATA RACE. + _ = fmt.Sprintf("%#v", ctx) + }() + } + w.wg.Wait() +} + +func TestSleeperTask_NoConcurrentContextRace(t *testing.T) { + t.Parallel() + + // Run many iterations to increase chance of triggering a race. + for range 50 { + w := &reflectWorker{} + sleeper := utils.NewSleeperTaskCtx(w) + sleeper.WakeUp() + // Stop concurrently with Work — this closes chStop, which previously + // fired cancel() via a CtxCancel goroutine, racing with reflect reads. + require.NoError(t, sleeper.Stop()) + } +} + type controllableWorker struct { chanWorker awaitWorkStarted chan struct{}