Skip to content

Commit 51f8346

Browse files
authored
Basic stuck job detection (#1097)
Here, try to make some inroads on a feature we've been talking about for a while: detection of stuck jobs. Unfortunately, in Go it's quite easy to accidentally park a job by using a `select` on a channel that never becomes ready and forgetting a separate branch for `<-ctx.Done()`, so that the job won't respect job timeouts either. Here, add in some basic detection for that case. Eventually we'd like to give users some options for what to do when jobs become stuck, but for now we do only the simplest things: log when we detect a stuck job, and count the number of stuck jobs in a producer's stats loop. In the future we may want some additional intelligence, like having producers move stuck jobs to a separate bucket up to a certain limit before crashing (the next best option, because it's not possible to manually kill goroutines).
1 parent 3a8c6b5 commit 51f8346

5 files changed

Lines changed: 197 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Basic stuck job detection. A job is considered stuck when it has exceeded its timeout and still hasn't returned after the executor has initiated context cancellation and waited a short margin for the cancellation to take effect. Stuck jobs are logged and counted in the producer's periodic stats. [PR #1097](https://github.com/riverqueue/river/pull/1097).
13+
1014
## [0.29.0-rc.1] - 2025-12-04
1115

1216
- Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084).

internal/jobexecutor/job_executor.go

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,17 @@ type JobExecutor struct {
112112
ErrorHandler ErrorHandler
113113
HookLookupByJob *hooklookup.JobHookLookup
114114
HookLookupGlobal hooklookup.HookLookupInterface
115-
InformProducerDoneFunc func(jobRow *rivertype.JobRow)
116115
JobRow *rivertype.JobRow
117116
MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
118-
SchedulerInterval time.Duration
119-
WorkerMiddleware []rivertype.WorkerMiddleware
120-
WorkUnit workunit.WorkUnit
117+
ProducerCallbacks struct {
118+
JobDone func(jobRow *rivertype.JobRow)
119+
Stuck func()
120+
Unstuck func()
121+
}
122+
SchedulerInterval time.Duration
123+
StuckThresholdOverride time.Duration
124+
WorkerMiddleware []rivertype.WorkerMiddleware
125+
WorkUnit workunit.WorkUnit
121126

122127
// Meant to be used from within the job executor only.
123128
start time.Time
@@ -159,7 +164,7 @@ func (e *JobExecutor) Execute(ctx context.Context) {
159164
}
160165
}
161166

162-
e.InformProducerDoneFunc(e.JobRow)
167+
e.ProducerCallbacks.JobDone(e.JobRow)
163168
}
164169

165170
// Executes the job, handling a panic if necessary (and various other error
@@ -171,6 +176,59 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
171176
metadataUpdates := make(map[string]any)
172177
ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates)
173178

179+
// Watches for jobs that may have become stuck, i.e. they've run longer than
180+
// their job timeout (plus a small margin) and don't appear to be responding
181+
// to context cancellation (unfortunately, quite an easy error to make in
182+
// Go).
183+
//
184+
// Currently we don't do anything if we notice a job is stuck. Knowing about
185+
// stuck jobs is just used for informational purposes in the producer in
186+
// generating periodic stats.
187+
if e.ClientJobTimeout > 0 {
188+
// We add a WithoutCancel here so that this inner goroutine becomes
189+
// immune to all context cancellations _except_ the one where it's
190+
// cancelled because we leave JobExecutor.execute.
191+
//
192+
// This shadows the context outside the e.ClientJobTimeout > 0 check.
193+
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
194+
defer cancel()
195+
196+
go func() {
197+
const stuckThresholdDefault = 5 * time.Second
198+
199+
select {
200+
case <-ctx.Done():
201+
// context cancelled as we leave JobExecutor.execute
202+
203+
case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
204+
e.ProducerCallbacks.Stuck()
205+
206+
e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
207+
slog.Int64("job_id", e.JobRow.ID),
208+
slog.String("kind", e.JobRow.Kind),
209+
slog.Duration("timeout", e.ClientJobTimeout),
210+
)
211+
212+
// context cancelled as we leave JobExecutor.execute
213+
<-ctx.Done()
214+
215+
// In case the executor ever becomes unstuck, inform the
216+
// producer. However, if we got all the way here there's a good
217+
// chance this will never happen (the worker is really stuck and
218+
// will never return).
219+
defer e.ProducerCallbacks.Unstuck()
220+
221+
defer func() {
222+
e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
223+
slog.Duration("duration", time.Since(e.start)),
224+
slog.Int64("job_id", e.JobRow.ID),
225+
slog.String("kind", e.JobRow.Kind),
226+
)
227+
}()
228+
}
229+
}()
230+
}
231+
174232
defer func() {
175233
if recovery := recover(); recovery != nil {
176234
e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",

internal/jobexecutor/job_executor_test.go

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,19 @@ func TestJobExecutor_Execute(t *testing.T) {
191191
ErrorHandler: bundle.errorHandler,
192192
HookLookupByJob: hooklookup.NewJobHookLookup(),
193193
HookLookupGlobal: hooklookup.NewHookLookup(nil),
194-
InformProducerDoneFunc: func(job *rivertype.JobRow) {},
195194
JobRow: bundle.jobRow,
196195
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
197-
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
198-
WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow),
196+
ProducerCallbacks: struct {
197+
JobDone func(jobRow *rivertype.JobRow)
198+
Stuck func()
199+
Unstuck func()
200+
}{
201+
JobDone: func(jobRow *rivertype.JobRow) {},
202+
Stuck: func() {},
203+
Unstuck: func() {},
204+
},
205+
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
206+
WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow),
199207
})
200208

201209
return executor, bundle
@@ -696,6 +704,94 @@ func TestJobExecutor_Execute(t *testing.T) {
696704
})
697705
})
698706

707+
configureStuckDetection := func(executor *JobExecutor) {
708+
executor.ClientJobTimeout = 5 * time.Millisecond
709+
executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
710+
}
711+
712+
t.Run("StuckDetectionActivates", func(t *testing.T) {
713+
t.Parallel()
714+
715+
executor, bundle := setup(t)
716+
717+
configureStuckDetection(executor)
718+
719+
var (
720+
informProducerStuckReceived = make(chan struct{})
721+
informProducerUnstuckReceived = make(chan struct{})
722+
)
723+
executor.ProducerCallbacks.Stuck = func() {
724+
t.Log("Job executor reported stuck")
725+
close(informProducerStuckReceived)
726+
}
727+
executor.ProducerCallbacks.Unstuck = func() {
728+
t.Log("Job executor reported unstuck (after being stuck)")
729+
close(informProducerUnstuckReceived)
730+
}
731+
732+
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
733+
riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)
734+
735+
select {
736+
case <-informProducerUnstuckReceived:
737+
require.FailNow(t, "Executor should not have reported unstuck immediately")
738+
case <-time.After(10 * time.Millisecond):
739+
t.Log("Job executor still stuck after wait (this is expected)")
740+
}
741+
742+
return nil
743+
}, nil).MakeUnit(bundle.jobRow)
744+
745+
executor.Execute(ctx)
746+
_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)
747+
748+
riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
749+
})
750+
751+
// Checks that even if a work context is cancelled immediately, stuck
752+
// detection still works as expected.
753+
t.Run("StuckDetectionIgnoresParentContextCancellation", func(t *testing.T) {
754+
t.Parallel()
755+
756+
executor, bundle := setup(t)
757+
758+
configureStuckDetection(executor)
759+
760+
var (
761+
informProducerStuckReceived = make(chan struct{})
762+
informProducerUnstuckReceived = make(chan struct{})
763+
)
764+
executor.ProducerCallbacks.Stuck = func() {
765+
t.Log("Job executor reported stuck")
766+
close(informProducerStuckReceived)
767+
}
768+
executor.ProducerCallbacks.Unstuck = func() {
769+
t.Log("Job executor reported unstuck (after being stuck)")
770+
close(informProducerUnstuckReceived)
771+
}
772+
773+
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
774+
riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)
775+
776+
select {
777+
case <-informProducerUnstuckReceived:
778+
require.FailNow(t, "Executor should not have reported unstuck immediately")
779+
case <-time.After(10 * time.Millisecond):
780+
t.Log("Job executor still stuck after wait (this is expected)")
781+
}
782+
783+
return nil
784+
}, nil).MakeUnit(bundle.jobRow)
785+
786+
ctx, cancel := context.WithCancel(ctx)
787+
cancel() // cancel immediately
788+
789+
executor.Execute(ctx)
790+
_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)
791+
792+
riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
793+
})
794+
699795
t.Run("Panic", func(t *testing.T) {
700796
t.Parallel()
701797

producer.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ type producer struct {
209209
// An atomic count of the number of jobs actively being worked on. This is
210210
// written to by the main goroutine, but read by the dispatcher.
211211
numJobsActive atomic.Int32
212+
numJobsStuck atomic.Int32
212213

213214
numJobsRan atomic.Uint64
214215
paused bool
@@ -771,20 +772,26 @@ func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) {
771772
ticker := time.NewTicker(5 * time.Second)
772773
defer ticker.Stop()
773774
type jobCount struct {
774-
ran uint64
775775
active int
776+
ran uint64
777+
stuck int
776778
}
777779
var prevCount jobCount
778780
for {
779781
select {
780782
case <-ctx.Done():
781783
return
782784
case <-ticker.C:
783-
curCount := jobCount{ran: p.numJobsRan.Load(), active: int(p.numJobsActive.Load())}
785+
curCount := jobCount{
786+
active: int(p.numJobsActive.Load()),
787+
ran: p.numJobsRan.Load(),
788+
stuck: int(p.numJobsStuck.Load()),
789+
}
784790
if curCount != prevCount {
785791
p.Logger.InfoContext(ctx, p.Name+": Producer job counts",
786792
slog.Uint64("num_completed_jobs", curCount.ran),
787793
slog.Int("num_jobs_running", curCount.active),
794+
slog.Int("num_jobs_stuck", curCount.stuck),
788795
slog.String("queue", p.config.Queue),
789796
)
790797
}
@@ -815,10 +822,18 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
815822
HookLookupByJob: p.config.HookLookupByJob,
816823
HookLookupGlobal: p.config.HookLookupGlobal,
817824
MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal,
818-
InformProducerDoneFunc: p.handleWorkerDone,
819825
JobRow: job,
820-
SchedulerInterval: p.config.SchedulerInterval,
821-
WorkUnit: workUnit,
826+
ProducerCallbacks: struct {
827+
JobDone func(jobRow *rivertype.JobRow)
828+
Stuck func()
829+
Unstuck func()
830+
}{
831+
JobDone: p.handleWorkerDone,
832+
Stuck: func() { p.numJobsStuck.Add(1) },
833+
Unstuck: func() { p.numJobsStuck.Add(-1) },
834+
},
835+
SchedulerInterval: p.config.SchedulerInterval,
836+
WorkUnit: workUnit,
822837
})
823838
p.addActiveJob(job.ID, executor)
824839

rivertest/worker.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,21 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job
203203
return nil
204204
},
205205
},
206-
InformProducerDoneFunc: func(job *rivertype.JobRow) { close(executionDone) },
207206
HookLookupGlobal: hooklookup.NewHookLookup(w.config.Hooks),
208207
HookLookupByJob: hooklookup.NewJobHookLookup(),
209208
JobRow: job,
210209
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(w.config.Middleware),
211-
SchedulerInterval: maintenance.JobSchedulerIntervalDefault,
212-
WorkUnit: workUnit,
210+
ProducerCallbacks: struct {
211+
JobDone func(jobRow *rivertype.JobRow)
212+
Stuck func()
213+
Unstuck func()
214+
}{
215+
JobDone: func(job *rivertype.JobRow) { close(executionDone) },
216+
Stuck: func() {},
217+
Unstuck: func() {},
218+
},
219+
SchedulerInterval: maintenance.JobSchedulerIntervalDefault,
220+
WorkUnit: workUnit,
213221
})
214222

215223
executor.Execute(jobCtx)

0 commit comments

Comments
 (0)