Skip to content

Commit 017f408

Browse files
authored
Implement resumable jobs (#1226)
Here, implement "resumable" jobs, which are jobs that can checkpoint their progress so that in case they have to stop early, they're picked up from a point that lets them skip work that's already been done. This is especially useful for long running jobs that are at risk of being interrupted from something like a deploy. Here's roughly the shape of the API, with the same normal `Work` function that all jobs implement, and with a series of `ResumableStep` calls within, each of which take a name for the step and function representing it: func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error { river.ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { fmt.Println("Step 1") return nil }) river.ResumableStep(ctx, "step2", nil, func(ctx context.Context) error { fmt.Println("Step 2") return nil }) river.ResumableStep(ctx, "step3", nil, func(ctx context.Context) error { fmt.Println("Step 3") return nil }) return nil } We also provide a cursor API for more granularity. This lets a step set an arbitrary cursor value periodically as it's doing something like looping over records in a set: river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, cursor ResumableCursor) error { for _, id := range job.Args.IDs { if id <= cursor.LastProcessedID { continue } fmt.Printf("Processed %d\n", id) if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil { return err } } return nil }) The function is `ResumableStepCursor[TCursor any]` where `TCursor` can be defined arbitrarily by the user. This could be a simple scalar value representing an ID, or a more complex `struct` value containing multiple IDs, enabling nested loops that set inner and outer IDs at the same time. `ResumableStep` and `ResumableStepCursor` steps can be freely intermingled, and multiple `ResumableStepCursor` steps with different cursor types are supported. Cursors must be JSON marshable because they're stored to a job's metadata. Lastly, we provide `ResumableSetStepTx` and `ResumableSetStepCursorTx` for cases where a transaction guarantee is necessary. Normally, resumable step and cursor are set as a job's being completed, but there's a chance this is never called in case of sudden failure. `ResumableSetStepTx` (and its cursor version) is available to durably persist a step at the cost of an extra database operation similar to how `JobCompleteTx` does the same for job completion. One neat aspect the implementation here is that I was able to make it entirely middleware-only. So all the resumable job logic goes in an internal `resumableMiddleware` that's included in all clients by default. This is kind of nice because it keeps its code highly modular and will hopefully act as a template for future features.
1 parent 11ebb22 commit 017f408

17 files changed

Lines changed: 1572 additions & 9 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added "resumable jobs" that can be broken down into multiple steps and with a step persisted after it finishes that lets them skip work that's already been done. This is particularly useful for long running jobs that may experience a cancellation (like in the event of a deploy) during the span of their run. [PR #1226](https://github.com/riverqueue/river/pull/1226).
13+
1014
## [0.36.0] - 2026-05-09
1115

1216
### Added

client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/riverqueue/river/internal/notifier"
2525
"github.com/riverqueue/river/internal/notifylimiter"
2626
"github.com/riverqueue/river/internal/rivercommon"
27+
"github.com/riverqueue/river/internal/rivermiddleware"
2728
"github.com/riverqueue/river/internal/workunit"
2829
"github.com/riverqueue/river/riverdriver"
2930
"github.com/riverqueue/river/rivershared/baseservice"
@@ -782,7 +783,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
782783
// the more abstract config.Middleware for middleware are set, but not both,
783784
// so in practice we never append all three of these to each other.
784785
{
785-
middleware := config.Middleware
786+
middleware := rivermiddleware.DefaultMiddleware()
787+
middleware = append(middleware, config.Middleware...)
786788
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
787789
middleware = append(middleware, jobInsertMiddleware)
788790
}

client_test.go

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,62 @@ func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs])
8282
return nil
8383
}
8484

85+
type resumableClientTestArgs struct{}
86+
87+
func (resumableClientTestArgs) Kind() string { return "resumable_client_test" }
88+
89+
type resumableClientTestWorker struct {
90+
WorkerDefaults[resumableClientTestArgs]
91+
92+
calls []string
93+
callsMu sync.Mutex
94+
failedOnce atomic.Bool
95+
}
96+
97+
func (w *resumableClientTestWorker) Calls() []string {
98+
w.callsMu.Lock()
99+
defer w.callsMu.Unlock()
100+
101+
return append([]string(nil), w.calls...)
102+
}
103+
104+
func (w *resumableClientTestWorker) Work(ctx context.Context, job *Job[resumableClientTestArgs]) error {
105+
appendCall := func(call string) {
106+
w.callsMu.Lock()
107+
defer w.callsMu.Unlock()
108+
109+
w.calls = append(w.calls, call)
110+
}
111+
112+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
113+
appendCall("step1")
114+
return nil
115+
})
116+
117+
ResumableStepCursor(ctx, "step2", nil, func(ctx context.Context, cursor int) error {
118+
appendCall("step2:" + strconv.Itoa(cursor))
119+
120+
for itemID := cursor + 1; itemID <= 2; itemID++ {
121+
appendCall("item:" + strconv.Itoa(itemID))
122+
if err := ResumableSetCursor(ctx, itemID); err != nil {
123+
return err
124+
}
125+
if !w.failedOnce.Swap(true) {
126+
return errors.New("retry me")
127+
}
128+
}
129+
130+
return nil
131+
})
132+
133+
ResumableStep(ctx, "step3", nil, func(ctx context.Context) error {
134+
appendCall("step3")
135+
return nil
136+
})
137+
138+
return nil
139+
}
140+
85141
func makeAwaitWorker[T JobArgs](startedCh chan<- int64, doneCh chan struct{}) Worker[T] {
86142
return WorkFunc(func(ctx context.Context, job *Job[T]) error {
87143
client := ClientFromContext[pgx.Tx](ctx)
@@ -7306,6 +7362,58 @@ func Test_Client_JobCompletion(t *testing.T) {
73067362
require.Nil(t, reloadedJob.FinalizedAt)
73077363
})
73087364

7365+
t.Run("ResumableJobRetriesAndResumes", func(t *testing.T) {
7366+
t.Parallel()
7367+
7368+
config := newTestConfig(t, "")
7369+
config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{}
7370+
7371+
worker := &resumableClientTestWorker{}
7372+
AddWorker(config.Workers, worker)
7373+
7374+
client, bundle := setup(t, config)
7375+
7376+
insertRes, err := client.Insert(ctx, resumableClientTestArgs{}, nil)
7377+
require.NoError(t, err)
7378+
7379+
// Wait for the first attempt to fail after step2 checkpoints cursor
7380+
// progress and intentionally returns "retry me", leaving the job queued
7381+
// for retry.
7382+
eventFailed := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7383+
require.Equal(t, EventKindJobFailed, eventFailed.Kind)
7384+
require.Equal(t, insertRes.Job.ID, eventFailed.Job.ID)
7385+
7386+
var retryableMetadata map[string]any
7387+
require.Contains(t, []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRetryable}, eventFailed.Job.State)
7388+
require.NoError(t, json.Unmarshal(eventFailed.Job.Metadata, &retryableMetadata))
7389+
require.Equal(t, "step1", retryableMetadata["river:resumable_step"])
7390+
require.Equal(t, map[string]any{"step2": float64(1)}, retryableMetadata["river:resumable_cursor"])
7391+
7392+
// Wait for the retried attempt to resume and then complete successfully.
7393+
eventCompleted := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7394+
require.Equal(t, EventKindJobCompleted, eventCompleted.Kind)
7395+
require.Equal(t, insertRes.Job.ID, eventCompleted.Job.ID)
7396+
7397+
reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
7398+
require.NoError(t, err)
7399+
require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State)
7400+
require.Len(t, reloadedJob.Errors, 1)
7401+
7402+
var metadata map[string]any
7403+
require.NoError(t, json.Unmarshal(reloadedJob.Metadata, &metadata))
7404+
require.Equal(t, "step1", metadata["river:resumable_step"])
7405+
require.Equal(t, map[string]any{"step2": float64(1)}, metadata["river:resumable_cursor"])
7406+
7407+
require.Equal(t, []string{
7408+
"step1",
7409+
"step2:0",
7410+
"item:1",
7411+
"step2:1",
7412+
"item:2",
7413+
"step3",
7414+
}, worker.Calls())
7415+
})
7416+
73097417
t.Run("JobThatReturnsJobCancelErrorIsImmediatelyCancelled", func(t *testing.T) {
73107418
t.Parallel()
73117419

@@ -7972,7 +8080,7 @@ func Test_NewClient_Validations(t *testing.T) {
79728080
},
79738081
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79748082
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
7975-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
8083+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
79768084
},
79778085
},
79788086
{
@@ -7983,7 +8091,7 @@ func Test_NewClient_Validations(t *testing.T) {
79838091
},
79848092
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79858093
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2)
7986-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
8094+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 3)
79878095
},
79888096
},
79898097
{
@@ -7995,7 +8103,7 @@ func Test_NewClient_Validations(t *testing.T) {
79958103
},
79968104
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
79978105
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
7998-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
8106+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
79998107
},
80008108
},
80018109
{
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
type ResumableCursorArgs struct {
20+
IDs []int `json:"ids"`
21+
}
22+
23+
func (ResumableCursorArgs) Kind() string { return "resumable_cursor" }
24+
25+
type ResumableCursor struct {
26+
LastProcessedID int `json:"last_processed_id"`
27+
}
28+
29+
type ResumableCursorWorker struct {
30+
river.WorkerDefaults[ResumableCursorArgs]
31+
}
32+
33+
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error {
34+
river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, cursor ResumableCursor) error {
35+
for _, id := range job.Args.IDs {
36+
if id <= cursor.LastProcessedID {
37+
continue
38+
}
39+
40+
fmt.Printf("Processed %d\n", id)
41+
42+
if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
43+
return err
44+
}
45+
}
46+
47+
return nil
48+
})
49+
50+
return nil
51+
}
52+
53+
// Example_resumableCursor demonstrates the use of a resumable cursor step, a
54+
// step that can store arbitrary JSON state to resume a partially completed loop.
55+
func Example_resumableCursor() {
56+
ctx := context.Background()
57+
58+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
59+
if err != nil {
60+
panic(err)
61+
}
62+
defer dbPool.Close()
63+
64+
workers := river.NewWorkers()
65+
river.AddWorker(workers, &ResumableCursorWorker{})
66+
67+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
68+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
69+
Queues: map[string]river.QueueConfig{
70+
river.QueueDefault: {MaxWorkers: 100},
71+
},
72+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
73+
TestOnly: true, // suitable only for use in tests; remove for live environments
74+
Workers: workers,
75+
})
76+
if err != nil {
77+
panic(err)
78+
}
79+
80+
// Out of example scope, but used to wait until a job is worked.
81+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
82+
defer subscribeCancel()
83+
84+
if err := riverClient.Start(ctx); err != nil {
85+
panic(err)
86+
}
87+
88+
if _, err = riverClient.Insert(ctx, ResumableCursorArgs{
89+
IDs: []int{1, 2, 3},
90+
}, nil); err != nil {
91+
panic(err)
92+
}
93+
94+
// Wait for jobs to complete. Only needed for purposes of the example test.
95+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
96+
97+
if err := riverClient.Stop(ctx); err != nil {
98+
panic(err)
99+
}
100+
101+
// Output:
102+
// Processed 1
103+
// Processed 2
104+
// Processed 3
105+
}

example_resumable_job_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
type ResumableArgs struct{}
20+
21+
func (ResumableArgs) Kind() string { return "resumable" }
22+
23+
type ResumableWorker struct {
24+
river.WorkerDefaults[ResumableArgs]
25+
}
26+
27+
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
28+
river.ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
29+
fmt.Println("Step 1")
30+
return nil
31+
})
32+
33+
// If River was forced to stop between step1 and step2 or midway into step2,
34+
// the next run of this job skips step1 and picks up from here.
35+
river.ResumableStep(ctx, "step2", nil, func(ctx context.Context) error {
36+
fmt.Println("Step 2")
37+
return nil
38+
})
39+
40+
river.ResumableStep(ctx, "step3", nil, func(ctx context.Context) error {
41+
fmt.Println("Step 3")
42+
return nil
43+
})
44+
45+
return nil
46+
}
47+
48+
// Example_resumable demonstrates the use of a "resumable job", a job that has
49+
// multiple steps, and which can be resumed after each one.
50+
func Example_resumable() {
51+
ctx := context.Background()
52+
53+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
54+
if err != nil {
55+
panic(err)
56+
}
57+
defer dbPool.Close()
58+
59+
workers := river.NewWorkers()
60+
river.AddWorker(workers, &ResumableWorker{})
61+
62+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
63+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
64+
Queues: map[string]river.QueueConfig{
65+
river.QueueDefault: {MaxWorkers: 100},
66+
},
67+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
68+
TestOnly: true, // suitable only for use in tests; remove for live environments
69+
Workers: workers,
70+
})
71+
if err != nil {
72+
panic(err)
73+
}
74+
75+
// Out of example scope, but used to wait until a job is worked.
76+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
77+
defer subscribeCancel()
78+
79+
if err := riverClient.Start(ctx); err != nil {
80+
panic(err)
81+
}
82+
83+
if _, err = riverClient.Insert(ctx, ResumableArgs{}, nil); err != nil {
84+
panic(err)
85+
}
86+
87+
// Wait for jobs to complete. Only needed for purposes of the example test.
88+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
89+
90+
if err := riverClient.Stop(ctx); err != nil {
91+
panic(err)
92+
}
93+
94+
// Output:
95+
// Step 1
96+
// Step 2
97+
// Step 3
98+
}

0 commit comments

Comments
 (0)