Skip to content

Commit 33439c2

Browse files
fix: forward controlActionCancel to cancelCh in poll-mode fetchAndRunLoop
When using drivers that don't support LISTEN/NOTIFY (e.g. riverdatabasesql), job cancel events are routed in-process via queueControlCh. The controlActionCancel case was missing from fetchAndRunLoop's queueControlCh handler, causing cancel events to be silently dropped and ctx.Done() to never fire inside a running Work() call. Forward the job ID to cancelCh so the existing maybeCancelJob call handles it, matching the behaviour of the LISTEN/NOTIFY path in handleControlNotification. Adds a test that verifies ctx.Done() fires in a running job after JobCancel is called when using a poll-only driver (SupportsListener() == false).
1 parent 82c968a commit 33439c2

2 files changed

Lines changed: 45 additions & 2 deletions

File tree

client_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,46 @@ func Test_Client_Common(t *testing.T) {
987987
})
988988
})
989989

990+
t.Run("CancelRunningJobPollOnly", func(t *testing.T) {
991+
t.Parallel()
992+
993+
config, bundle := setupConfig(t)
994+
995+
client, err := NewClient(NewDriverPollOnly(bundle.dbPool), config)
996+
require.NoError(t, err)
997+
998+
jobStartedChan := make(chan int64)
999+
1000+
type JobArgs struct {
1001+
testutil.JobArgsReflectKind[JobArgs]
1002+
}
1003+
1004+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
1005+
jobStartedChan <- job.ID
1006+
<-ctx.Done()
1007+
return ctx.Err()
1008+
}))
1009+
1010+
subscribeChan := subscribe(t, client)
1011+
startClient(ctx, t, client)
1012+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
1013+
1014+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
1015+
require.NoError(t, err)
1016+
1017+
startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan)
1018+
require.Equal(t, insertRes.Job.ID, startedJobID)
1019+
1020+
updatedJob, err := client.JobCancel(ctx, insertRes.Job.ID)
1021+
require.NoError(t, err)
1022+
require.Equal(t, rivertype.JobStateRunning, updatedJob.State)
1023+
1024+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
1025+
require.Equal(t, EventKindJobCancelled, event.Kind)
1026+
require.Equal(t, rivertype.JobStateCancelled, event.Job.State)
1027+
require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)
1028+
})
1029+
9901030
t.Run("CancelScheduledJob", func(t *testing.T) {
9911031
t.Parallel()
9921032

producer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,8 +528,11 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) {
528528
case msg := <-p.queueControlCh:
529529
switch msg.Action {
530530
case controlActionCancel:
531-
// Separate this case to make linter happy:
532-
p.Logger.DebugContext(workCtx, p.Name+": Unhandled queue control action", "action", msg.Action)
531+
select {
532+
case p.cancelCh <- msg.JobID:
533+
default:
534+
p.Logger.WarnContext(workCtx, p.Name+": Job cancel notification dropped due to full buffer", slog.Int64("job_id", msg.JobID))
535+
}
533536
case controlActionMetadataChanged:
534537
p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue))
535538
p.testSignals.MetadataChanged.Signal(struct{}{})

0 commit comments

Comments
 (0)