Skip to content

Commit 6b1dae7

Browse files
committed
Implement resumable jobs
Here, implement "resumable" jobs, which are jobs that can checkpoint their progress so that in case they have to stop early, they're picked up from a point that lets them skip work that's already been done. This is especially useful for long-running jobs that are at risk of being interrupted by something like a deploy. Here's roughly the shape of the API, with the same normal `Work` function that all jobs implement, and with a series of `ResumableStep` calls within, each of which takes a name for the step and a function representing it: func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error { river.ResumableStep(ctx, "step1", func(ctx context.Context) error { fmt.Println("Step 1") return nil }) river.ResumableStep(ctx, "step2", func(ctx context.Context) error { fmt.Println("Step 2") return nil }) river.ResumableStep(ctx, "step3", func(ctx context.Context) error { fmt.Println("Step 3") return nil }) return nil } We also provide a cursor API for more granularity. This lets a step set an arbitrary cursor value periodically as it's doing something like looping over records in a set: river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error { for _, id := range job.Args.IDs { if id <= cursor.LastProcessedID { continue } fmt.Printf("Processed %d\n", id) if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil { return err } } return nil }) The function is `ResumableStepCursor[TCursor any]` where `TCursor` can be defined arbitrarily by the user. This could be a simple scalar value representing an ID, or a more complex `struct` value containing multiple IDs, enabling nested loops that set inner and outer IDs at the same time. `ResumableStep` and `ResumableStepCursor` steps can be freely intermingled, and multiple `ResumableStepCursor` steps with different cursor types are supported. Cursors must be JSON marshalable because they're stored to a job's metadata. 
Lastly, we provide `ResumableSetStepTx` and `ResumableSetStepCursorTx` for cases where a transaction guarantee is necessary. Normally, the resumable step and cursor are set as a job is being completed, but there's a chance that this never happens in the case of a sudden failure. `ResumableSetStepTx` (and its cursor version) is available to durably persist a step at the cost of an extra database operation, similar to how `JobCompleteTx` does the same for job completion. One neat aspect of the implementation here is that I was able to make it entirely middleware-only. So all the resumable job logic goes in an internal `resumableMiddleware` that's included in all clients by default. This is kind of nice because it keeps its code highly modular and will hopefully act as a template for future features.
1 parent 86f0c84 commit 6b1dae7

17 files changed

Lines changed: 1487 additions & 9 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added "resumable jobs" that can be broken down into multiple steps, with each step persisted after it finishes so that a later run can skip work that's already been done. This is particularly useful for long-running jobs that may experience a cancellation (like in the event of a deploy) during the span of their run. [PR #1226](https://github.com/riverqueue/river/pull/1226).
13+
1014
## [0.36.0] - 2026-05-09
1115

1216
### Added

client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/riverqueue/river/internal/notifier"
2525
"github.com/riverqueue/river/internal/notifylimiter"
2626
"github.com/riverqueue/river/internal/rivercommon"
27+
"github.com/riverqueue/river/internal/rivermiddleware"
2728
"github.com/riverqueue/river/internal/workunit"
2829
"github.com/riverqueue/river/riverdriver"
2930
"github.com/riverqueue/river/rivershared/baseservice"
@@ -782,7 +783,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
782783
// the more abstract config.Middleware for middleware are set, but not both,
783784
// so in practice we never append all three of these to each other.
784785
{
785-
middleware := config.Middleware
786+
middleware := rivermiddleware.DefaultMiddleware()
787+
middleware = append(middleware, config.Middleware...)
786788
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
787789
middleware = append(middleware, jobInsertMiddleware)
788790
}

client_test.go

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,62 @@ func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs])
8282
return nil
8383
}
8484

85+
// resumableClientTestArgs is the field-less argument type for the resumable
// job used by the client tests in this file.
type resumableClientTestArgs struct{}
86+
87+
// Kind returns the job kind string "resumable_client_test".
func (resumableClientTestArgs) Kind() string { return "resumable_client_test" }
88+
89+
// resumableClientTestWorker is a test worker for resumableClientTestArgs that
// records which steps and items it ran and fails exactly once so that a retry
// can be observed.
type resumableClientTestWorker struct {
90+
WorkerDefaults[resumableClientTestArgs]
91+
92+
// calls is the ordered list of labels appended by Work as it runs.
calls []string
93+
// callsMu guards calls; both Calls and Work's appendCall helper lock it.
callsMu sync.Mutex
94+
// failedOnce is swapped to true the first time Work's step2 loop reaches it,
// which is the only time that step returns the "retry me" error.
failedOnce atomic.Bool
95+
}
96+
97+
// Calls returns a copy of the labels recorded so far. The copy is made while
// holding callsMu and is appended onto a nil slice, so the result does not
// share a backing array with w.calls.
func (w *resumableClientTestWorker) Calls() []string {
98+
w.callsMu.Lock()
99+
defer w.callsMu.Unlock()
100+
101+
return append([]string(nil), w.calls...)
102+
}
103+
104+
// Work declares three resumable steps in order: a plain "step1", a
// cursor-based "step2" with an int cursor, and a plain "step3". Each step
// records a label in w.calls when its function body runs.
//
// Work itself always returns nil and does not inspect any result from the
// step calls.
// NOTE(review): the "retry me" error returned inside step2 is presumably
// surfaced by the resumable machinery rather than by this return value —
// confirm against the ResumableStep implementation.
func (w *resumableClientTestWorker) Work(ctx context.Context, job *Job[resumableClientTestArgs]) error {
105+
// appendCall records one label under callsMu.
appendCall := func(call string) {
106+
w.callsMu.Lock()
107+
defer w.callsMu.Unlock()
108+
109+
w.calls = append(w.calls, call)
110+
}
111+
112+
ResumableStep(ctx, "step1", func(ctx context.Context) error {
113+
appendCall("step1")
114+
return nil
115+
})
116+
117+
ResumableStepCursor(ctx, "step2", func(ctx context.Context, cursor int) error {
118+
// Record the cursor value this invocation started from.
appendCall("step2:" + strconv.Itoa(cursor))
119+
120+
// Process the items after the cursor, up to and including item 2.
for itemID := cursor + 1; itemID <= 2; itemID++ {
121+
appendCall("item:" + strconv.Itoa(itemID))
122+
if err := ResumableSetCursor(ctx, itemID); err != nil {
123+
return err
124+
}
125+
// Swap returns the previous value, so this branch is taken only the
// first time it is reached: after the cursor has been set for that item.
if !w.failedOnce.Swap(true) {
126+
return errors.New("retry me")
127+
}
128+
}
129+
130+
return nil
131+
})
132+
133+
ResumableStep(ctx, "step3", func(ctx context.Context) error {
134+
appendCall("step3")
135+
return nil
136+
})
137+
138+
return nil
139+
}
140+
85141
func makeAwaitWorker[T JobArgs](startedCh chan<- int64, doneCh chan struct{}) Worker[T] {
86142
return WorkFunc(func(ctx context.Context, job *Job[T]) error {
87143
client := ClientFromContext[pgx.Tx](ctx)
@@ -7308,6 +7364,58 @@ func Test_Client_JobCompletion(t *testing.T) {
73087364
require.Nil(t, reloadedJob.FinalizedAt)
73097365
})
73107366

7367+
// Runs resumableClientTestWorker through a real client: the first attempt
// fails inside step2 after checkpointing, and the retry is expected to skip
// step1 and re-enter step2 from the stored cursor.
t.Run("ResumableJobRetriesAndResumes", func(t *testing.T) {
7368+
t.Parallel()
7369+
7370+
config := newTestConfig(t, "")
7371+
config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{}
7372+
7373+
worker := &resumableClientTestWorker{}
7374+
AddWorker(config.Workers, worker)
7375+
7376+
client, bundle := setup(t, config)
7377+
7378+
insertRes, err := client.Insert(ctx, resumableClientTestArgs{}, nil)
7379+
require.NoError(t, err)
7380+
7381+
// Wait for the first attempt to fail after step2 checkpoints cursor
7382+
// progress and intentionally returns "retry me", leaving the job queued
7383+
// for retry.
7384+
eventFailed := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7385+
require.Equal(t, EventKindJobFailed, eventFailed.Kind)
7386+
require.Equal(t, insertRes.Job.ID, eventFailed.Job.ID)
7387+
7388+
var retryableMetadata map[string]any
7389+
require.Contains(t, []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRetryable}, eventFailed.Job.State)
7390+
require.NoError(t, json.Unmarshal(eventFailed.Job.Metadata, &retryableMetadata))
7391+
// The last finished step is step1, and step2's cursor was set to 1 before
// the failure. The cursor compares as float64(1) because encoding/json
// decodes JSON numbers into float64 when the target is map[string]any.
require.Equal(t, "step1", retryableMetadata["river:resumable_step"])
7392+
require.Equal(t, map[string]any{"step2": float64(1)}, retryableMetadata["river:resumable_cursor"])
7393+
7394+
// Wait for the retried attempt to resume and then complete successfully.
7395+
eventCompleted := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7396+
require.Equal(t, EventKindJobCompleted, eventCompleted.Kind)
7397+
require.Equal(t, insertRes.Job.ID, eventCompleted.Job.ID)
7398+
7399+
reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
7400+
require.NoError(t, err)
7401+
require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State)
7402+
// Only the single failed first attempt should have recorded an error.
require.Len(t, reloadedJob.Errors, 1)
7403+
7404+
var metadata map[string]any
7405+
require.NoError(t, json.Unmarshal(reloadedJob.Metadata, &metadata))
7406+
require.Equal(t, "step1", metadata["river:resumable_step"])
7407+
require.Equal(t, map[string]any{"step2": float64(1)}, metadata["river:resumable_cursor"])
7408+
7409+
// step1 appears once, so the retry did not rerun it; step2 was entered a
// second time with cursor 1, leaving only item 2 to process before step3.
require.Equal(t, []string{
7410+
"step1",
7411+
"step2:0",
7412+
"item:1",
7413+
"step2:1",
7414+
"item:2",
7415+
"step3",
7416+
}, worker.Calls())
7417+
})
7418+
73117419
t.Run("JobThatReturnsJobCancelErrorIsImmediatelyCancelled", func(t *testing.T) {
73127420
t.Parallel()
73137421

@@ -7974,7 +8082,7 @@ func Test_NewClient_Validations(t *testing.T) {
79748082
},
79758083
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79768084
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
7977-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
8085+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
79788086
},
79798087
},
79808088
{
@@ -7985,7 +8093,7 @@ func Test_NewClient_Validations(t *testing.T) {
79858093
},
79868094
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79878095
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2)
7988-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
8096+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 3)
79898097
},
79908098
},
79918099
{
@@ -7997,7 +8105,7 @@ func Test_NewClient_Validations(t *testing.T) {
79978105
},
79988106
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79998107
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
8000-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
8108+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
80018109
},
80028110
},
80038111
{
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
// ResumableCursorArgs are the job arguments for the resumable cursor example.
type ResumableCursorArgs struct {
20+
// IDs lists the IDs the worker iterates over; serialized as "ids".
IDs []int `json:"ids"`
21+
}
22+
23+
// Kind returns the job kind string "resumable_cursor".
func (ResumableCursorArgs) Kind() string { return "resumable_cursor" }
24+
25+
// ResumableCursor is the cursor value the example worker stores as it
// progresses through its IDs.
type ResumableCursor struct {
26+
// LastProcessedID is the most recent ID the worker finished; serialized as
// "last_processed_id".
LastProcessedID int `json:"last_processed_id"`
27+
}
28+
29+
// ResumableCursorWorker is the example worker for ResumableCursorArgs.
// NOTE(review): the embedded river.WorkerDefaults presumably supplies default
// implementations of the other worker methods — confirm.
type ResumableCursorWorker struct {
30+
river.WorkerDefaults[ResumableCursorArgs]
31+
}
32+
33+
// Work declares a single cursor step named "process_ids" that walks
// job.Args.IDs, prints each ID it handles, and stores that ID as the new
// cursor. Work itself always returns nil.
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error {
34+
river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
35+
for _, id := range job.Args.IDs {
36+
// Skip IDs at or below the stored cursor.
// NOTE(review): the comparison is by value, so this assumes IDs are
// supplied in ascending order — confirm for real workloads.
if id <= cursor.LastProcessedID {
37+
continue
38+
}
39+
40+
fmt.Printf("Processed %d\n", id)
41+
42+
// Checkpoint after each ID; a failure to set the cursor aborts the step.
if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
43+
return err
44+
}
45+
}
46+
47+
return nil
48+
})
49+
50+
return nil
51+
}
52+
53+
// Example_resumableCursor demonstrates the use of a resumable cursor step, a
54+
// step that can store arbitrary JSON state to resume a partially completed loop.
55+
func Example_resumableCursor() {
56+
ctx := context.Background()
57+
58+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
59+
if err != nil {
60+
panic(err)
61+
}
62+
defer dbPool.Close()
63+
64+
// Register the example worker so the client can work ResumableCursorArgs jobs.
workers := river.NewWorkers()
65+
river.AddWorker(workers, &ResumableCursorWorker{})
66+
67+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
68+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
69+
Queues: map[string]river.QueueConfig{
70+
river.QueueDefault: {MaxWorkers: 100},
71+
},
72+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
73+
TestOnly: true, // suitable only for use in tests; remove for live environments
74+
Workers: workers,
75+
})
76+
if err != nil {
77+
panic(err)
78+
}
79+
80+
// Out of example scope, but used to wait until a job is worked.
81+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
82+
defer subscribeCancel()
83+
84+
if err := riverClient.Start(ctx); err != nil {
85+
panic(err)
86+
}
87+
88+
// Insert one job with three IDs; the expected output has one line per ID.
if _, err = riverClient.Insert(ctx, ResumableCursorArgs{
89+
IDs: []int{1, 2, 3},
90+
}, nil); err != nil {
91+
panic(err)
92+
}
93+
94+
// Wait for jobs to complete. Only needed for purposes of the example test.
95+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
96+
97+
if err := riverClient.Stop(ctx); err != nil {
98+
panic(err)
99+
}
100+
101+
// Output:
102+
// Processed 1
103+
// Processed 2
104+
// Processed 3
105+
}

example_resumable_job_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
// ResumableArgs is the field-less argument type for the resumable job example.
type ResumableArgs struct{}
20+
21+
// Kind returns the job kind string "resumable".
func (ResumableArgs) Kind() string { return "resumable" }
22+
23+
// ResumableWorker is the example worker for ResumableArgs.
// NOTE(review): the embedded river.WorkerDefaults presumably supplies default
// implementations of the other worker methods — confirm.
type ResumableWorker struct {
24+
river.WorkerDefaults[ResumableArgs]
25+
}
26+
27+
// Work declares three resumable steps, "step1" through "step3", in order. Each
// step's function prints its label and returns nil, and Work itself always
// returns nil.
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
28+
river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
29+
fmt.Println("Step 1")
30+
return nil
31+
})
32+
33+
// If River was forced to stop between step1 and step2 or midway into step2,
34+
// the next run of this job skips step1 and picks up from here.
35+
river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
36+
fmt.Println("Step 2")
37+
return nil
38+
})
39+
40+
river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
41+
fmt.Println("Step 3")
42+
return nil
43+
})
44+
45+
return nil
46+
}
47+
48+
// Example_resumable demonstrates the use of a "resumable job", a job that has
49+
// multiple steps, and which can be resumed after each one.
50+
func Example_resumable() {
51+
ctx := context.Background()
52+
53+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
54+
if err != nil {
55+
panic(err)
56+
}
57+
defer dbPool.Close()
58+
59+
// Register the example worker so the client can work ResumableArgs jobs.
workers := river.NewWorkers()
60+
river.AddWorker(workers, &ResumableWorker{})
61+
62+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
63+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
64+
Queues: map[string]river.QueueConfig{
65+
river.QueueDefault: {MaxWorkers: 100},
66+
},
67+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
68+
TestOnly: true, // suitable only for use in tests; remove for live environments
69+
Workers: workers,
70+
})
71+
if err != nil {
72+
panic(err)
73+
}
74+
75+
// Out of example scope, but used to wait until a job is worked.
76+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
77+
defer subscribeCancel()
78+
79+
if err := riverClient.Start(ctx); err != nil {
80+
panic(err)
81+
}
82+
83+
// Insert a single job; the expected output has one line per step.
if _, err = riverClient.Insert(ctx, ResumableArgs{}, nil); err != nil {
84+
panic(err)
85+
}
86+
87+
// Wait for jobs to complete. Only needed for purposes of the example test.
88+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
89+
90+
if err := riverClient.Stop(ctx); err != nil {
91+
panic(err)
92+
}
93+
94+
// Output:
95+
// Step 1
96+
// Step 2
97+
// Step 3
98+
}

0 commit comments

Comments
 (0)