Skip to content

Commit d6ac75a

Browse files
fix: forward controlActionCancel to cancelCh in poll-mode fetchAndRunLoop (#1245)
When using drivers that don't support LISTEN/NOTIFY (e.g. riverdatabasesql), job cancel events are routed in-process via queueControlCh. The controlActionCancel case in fetchAndRunLoop's queueControlCh handler only logged the action as unhandled, so cancel events were silently dropped and ctx.Done() never fired inside a running Work() call. Pass the job ID to maybeCancelJob from that case, matching the behaviour of the LISTEN/NOTIFY path in handleControlNotification. Adds a test that verifies ctx.Done() fires in a running job after JobCancel is called when using a poll-only driver (SupportsListener() == false).
1 parent 5b985d5 commit d6ac75a

3 files changed

Lines changed: 48 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245).
13+
1014
## [0.39.0] - 2026-06-03
1115

1216
⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

client_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,46 @@ func Test_Client_Common(t *testing.T) {
10431043
})
10441044
})
10451045

1046+
t.Run("CancelRunningJobPollOnly", func(t *testing.T) {
1047+
t.Parallel()
1048+
1049+
config, bundle := setupConfig(t)
1050+
1051+
client, err := NewClient(NewDriverPollOnly(bundle.dbPool), config)
1052+
require.NoError(t, err)
1053+
1054+
jobStartedChan := make(chan int64)
1055+
1056+
type JobArgs struct {
1057+
testutil.JobArgsReflectKind[JobArgs]
1058+
}
1059+
1060+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
1061+
jobStartedChan <- job.ID
1062+
<-ctx.Done()
1063+
return ctx.Err()
1064+
}))
1065+
1066+
subscribeChan := subscribe(t, client)
1067+
startClient(ctx, t, client)
1068+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
1069+
1070+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
1071+
require.NoError(t, err)
1072+
1073+
startedJobID := riversharedtest.WaitOrTimeout(t, jobStartedChan)
1074+
require.Equal(t, insertRes.Job.ID, startedJobID)
1075+
1076+
updatedJob, err := client.JobCancel(ctx, insertRes.Job.ID)
1077+
require.NoError(t, err)
1078+
require.Equal(t, rivertype.JobStateRunning, updatedJob.State)
1079+
1080+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
1081+
require.Equal(t, EventKindJobCancelled, event.Kind)
1082+
require.Equal(t, rivertype.JobStateCancelled, event.Job.State)
1083+
require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)
1084+
})
1085+
10461086
t.Run("CancelScheduledJob", func(t *testing.T) {
10471087
t.Parallel()
10481088

producer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,8 +528,10 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) {
528528
case msg := <-p.queueControlCh:
529529
switch msg.Action {
530530
case controlActionCancel:
531-
// Separate this case to make linter happy:
532-
p.Logger.DebugContext(workCtx, p.Name+": Unhandled queue control action", "action", msg.Action)
531+
// This path is only expected to take effect in poll-only mode, and
532+
// only works for the case of a single process. Multi-process setups
533+
// will have to wait for the next poll event for a cancel to take effect.
534+
p.maybeCancelJob(workCtx, msg.JobID)
533535
case controlActionMetadataChanged:
534536
p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue))
535537
p.testSignals.MetadataChanged.Signal(struct{}{})

0 commit comments

Comments
 (0)