Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/utils/sleeper_task.go
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A custom context.Context make me nervous. I do see some in the wild now, though most contain an embedded context.Context that they are extending and sometimes don't even override any methods 🤔

Let's wait and see how the other fixes help before trying this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing for now

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
)
Expand Down Expand Up @@ -110,8 +111,7 @@ func (s *SleeperTask) WorkDone() <-chan struct{} {
func (s *SleeperTask) workerLoop() {
defer close(s.chDone)

ctx, cancel := s.chStop.NewCtx()
defer cancel()
ctx := &stopChanCtx{ch: s.chStop}

for {
select {
Expand All @@ -124,6 +124,25 @@ func (s *SleeperTask) workerLoop() {
}
}

// stopChanCtx implements [context.Context] backed directly by a stop channel.
// Unlike [services.StopChan.NewCtx], this does not spawn a goroutine to call
// cancel(), avoiding a data race between context cancellation and concurrent
// reflect-based reads (e.g. testify mock argument formatting).
type stopChanCtx struct{ ch <-chan struct{} }

func (c *stopChanCtx) Deadline() (time.Time, bool) { return time.Time{}, false }
func (c *stopChanCtx) Done() <-chan struct{} { return c.ch }
func (c *stopChanCtx) Value(any) any { return nil }

func (c *stopChanCtx) Err() error {
select {
case <-c.ch:
return context.Canceled
default:
return nil
}
}

type sleeperTaskWorker struct {
name string
work func()
Expand Down
40 changes: 40 additions & 0 deletions pkg/utils/sleeper_task_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package utils_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -72,6 +75,43 @@ func TestSleeperTask_WakeupPerformsWork(t *testing.T) {
require.NoError(t, sleeper.Stop())
}

// reflectWorker simulates the race scenario: Work() spawns goroutines that
// read the context via fmt.Sprintf (like testify's mock.callString), while
// Stop() concurrently triggers context cancellation.
type reflectWorker struct {
wg sync.WaitGroup
}

func (w *reflectWorker) Name() string { return "ReflectWorker" }

func (w *reflectWorker) Work(ctx context.Context) {
w.wg.Add(10)
for range 10 {
go func() {
defer w.wg.Done()
// Simulate testify's callString: fmt.Sprintf("%#v", ctx) reads context
// internals via reflect. With the old NewCtx()/CtxCancel pattern, cancel()
// fired concurrently, writing to context internals → DATA RACE.
_ = fmt.Sprintf("%#v", ctx)
}()
}
w.wg.Wait()
}

func TestSleeperTask_NoConcurrentContextRace(t *testing.T) {
t.Parallel()

// Run many iterations to increase chance of triggering a race.
for range 50 {
w := &reflectWorker{}
sleeper := utils.NewSleeperTaskCtx(w)
sleeper.WakeUp()
// Stop concurrently with Work — this closes chStop, which previously
// fired cancel() via a CtxCancel goroutine, racing with reflect reads.
require.NoError(t, sleeper.Stop())
}
}

type controllableWorker struct {
chanWorker
awaitWorkStarted chan struct{}
Expand Down
Loading