Skip to content

Commit 010be75

Browse files
committed
Implement resumable jobs
Here, we implement "resumable" jobs, which are jobs that can checkpoint their progress so that if they have to stop early, they're picked up from a point that lets them skip work that's already been done. This is especially useful for long-running jobs that are at risk of being interrupted by something like a deploy.

Here's roughly the shape of the API, with the same normal `Work` function that all jobs implement, and with a series of `ResumableStep` calls within, each of which takes a name for the step and a function representing it:

```go
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
    river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
        fmt.Println("Step 1")
        return nil
    })

    river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
        fmt.Println("Step 2")
        return nil
    })

    river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
        fmt.Println("Step 3")
        return nil
    })

    return nil
}
```

We also provide a cursor API for more granularity. This lets a step set an arbitrary cursor value periodically as it's doing something like looping over records in a set:

```go
river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
    for _, id := range job.Args.IDs {
        if id <= cursor.LastProcessedID {
            continue
        }

        fmt.Printf("Processed %d\n", id)

        if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
            return err
        }
    }

    return nil
})
```

The function is `ResumableStepCursor[TCursor any]` where `TCursor` can be defined arbitrarily by the user. This could be a simple scalar value representing an ID, or a more complex `struct` value containing multiple IDs, enabling nested loops that set inner and outer IDs at the same time. `ResumableStep` and `ResumableStepCursor` steps can be freely intermingled, and multiple `ResumableStepCursor` steps with different cursor types are supported. Cursors must be JSON-marshalable because they're stored in a job's metadata.
Lastly, we provide `ResumableSetStepTx` and `ResumableSetStepCursorTx` for cases where a transaction guarantee is necessary. Normally, the resumable step and cursor are persisted as a job is being completed, but there's a chance that never happens in the case of a sudden failure. `ResumableSetStepTx` (and its cursor version) is available to durably persist a step at the cost of an extra database operation, similar to how `JobCompleteTx` does the same for job completion.

One neat aspect of the implementation here is that I was able to make it entirely middleware-only. All the resumable job logic goes in an internal `resumableMiddleware` that's included in all clients by default. This is nice because it keeps the code highly modular and will hopefully act as a template for future features.
1 parent a4143a0 commit 010be75

13 files changed

Lines changed: 1182 additions & 8 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added "resumable jobs" that can be broken down into multiple steps, with each step persisted after it finishes so that a resumed job can skip work that's already been done. This is particularly useful for long-running jobs that may be cancelled (for example, during a deploy) partway through their run. [PR #1226](https://github.com/riverqueue/river/pull/1226).
13+
1014
## [0.35.0] - 2026-04-18
1115

1216
### Changed

client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
780780
// the more abstract config.Middleware for middleware are set, but not both,
781781
// so in practice we never append all three of these to each other.
782782
{
783-
middleware := config.Middleware
783+
middleware := defaultMiddleware()
784+
middleware = append(middleware, config.Middleware...)
784785
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
785786
middleware = append(middleware, jobInsertMiddleware)
786787
}
@@ -1002,6 +1003,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
10021003
return client, nil
10031004
}
10041005

1006+
// defaultMiddleware returns the middleware that NewClient installs on every
// client ahead of any user-configured middleware. It currently contains only
// resumableMiddleware. A fresh slice is returned on each call, so callers may
// append to it freely.
func defaultMiddleware() []rivertype.Middleware {
1007+
return []rivertype.Middleware{&resumableMiddleware{}}
1008+
}
1009+
10051010
// Start starts the client's job fetching and working loops. Once this is called,
10061011
// the client will run in a background goroutine until stopped. All jobs are
10071012
// run with a context inheriting from the provided context, but with a timeout

client_test.go

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,62 @@ func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs])
8282
return nil
8383
}
8484

85+
type resumableClientTestArgs struct{}
86+
87+
func (resumableClientTestArgs) Kind() string { return "resumable_client_test" }
88+
89+
// resumableClientTestWorker is a test worker built from resumable steps. It
// records every step and item it executes, and fails exactly once partway
// through its cursor step so that a test can observe the job being retried.
type resumableClientTestWorker struct {
90+
WorkerDefaults[resumableClientTestArgs]
91+
92+
// calls is the ordered log of executed steps/items. Guarded by callsMu.
calls []string
93+
// callsMu protects calls.
callsMu sync.Mutex
94+
// failedOnce flips to true the first time Work injects its failure, so the
// failure is only injected once across all attempts.
failedOnce atomic.Bool
95+
}
96+
97+
// Calls returns a snapshot of the calls recorded so far. The slice is copied
// while holding callsMu, so the caller can inspect it without holding the lock
// and without aliasing the worker's internal slice.
func (w *resumableClientTestWorker) Calls() []string {
98+
w.callsMu.Lock()
99+
defer w.callsMu.Unlock()
100+
101+
return append([]string(nil), w.calls...)
102+
}
103+
104+
// Work runs three resumable steps: a plain "step1", a cursor-based "step2"
// that walks item IDs 1 through 2, and a plain "step3". Each step and item is
// appended to w.calls. The first time an item is processed, step2 returns an
// error after setting the cursor, which the accompanying test uses to verify
// that a retry resumes from the checkpoint.
//
// NOTE(review): Work itself always returns nil and doesn't use any value
// returned by ResumableStep/ResumableStepCursor. How a step's error reaches
// the job executor isn't visible here — presumably via the resumable
// middleware; confirm against its implementation.
func (w *resumableClientTestWorker) Work(ctx context.Context, job *Job[resumableClientTestArgs]) error {
105+
// appendCall records a call under the mutex.
appendCall := func(call string) {
106+
w.callsMu.Lock()
107+
defer w.callsMu.Unlock()
108+
109+
w.calls = append(w.calls, call)
110+
}
111+
112+
ResumableStep(ctx, "step1", func(ctx context.Context) error {
113+
appendCall("step1")
114+
return nil
115+
})
116+
117+
ResumableStepCursor(ctx, "step2", func(ctx context.Context, cursor int) error {
118+
// Record the cursor value this attempt was started with.
appendCall("step2:" + strconv.Itoa(cursor))
119+
120+
// Start at the item after the cursor.
for itemID := cursor + 1; itemID <= 2; itemID++ {
121+
appendCall("item:" + strconv.Itoa(itemID))
122+
if err := ResumableSetCursor(ctx, itemID); err != nil {
123+
return err
124+
}
125+
// Swap returns the previous value, so this branch is taken only on the
// very first item processed: fail after the cursor has been set.
if !w.failedOnce.Swap(true) {
126+
return errors.New("retry me")
127+
}
128+
}
129+
130+
return nil
131+
})
132+
133+
ResumableStep(ctx, "step3", func(ctx context.Context) error {
134+
appendCall("step3")
135+
return nil
136+
})
137+
138+
return nil
139+
}
140+
85141
func makeAwaitWorker[T JobArgs](startedCh chan<- int64, doneCh chan struct{}) Worker[T] {
86142
return WorkFunc(func(ctx context.Context, job *Job[T]) error {
87143
client := ClientFromContext[pgx.Tx](ctx)
@@ -6936,6 +6992,58 @@ func Test_Client_JobCompletion(t *testing.T) {
69366992
require.Nil(t, reloadedJob.FinalizedAt)
69376993
})
69386994

6995+
t.Run("ResumableJobRetriesAndResumes", func(t *testing.T) {
6996+
t.Parallel()
6997+
6998+
config := newTestConfig(t, "")
6999+
config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{}
7000+
7001+
worker := &resumableClientTestWorker{}
7002+
AddWorker(config.Workers, worker)
7003+
7004+
client, bundle := setup(t, config)
7005+
7006+
insertRes, err := client.Insert(ctx, resumableClientTestArgs{}, nil)
7007+
require.NoError(t, err)
7008+
7009+
// Wait for the first attempt to fail after step2 checkpoints cursor
7010+
// progress and intentionally returns "retry me", leaving the job queued
7011+
// for retry.
7012+
eventFailed := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7013+
require.Equal(t, EventKindJobFailed, eventFailed.Kind)
7014+
require.Equal(t, insertRes.Job.ID, eventFailed.Job.ID)
7015+
7016+
var retryableMetadata map[string]any
7017+
require.Contains(t, []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRetryable}, eventFailed.Job.State)
7018+
require.NoError(t, json.Unmarshal(eventFailed.Job.Metadata, &retryableMetadata))
7019+
require.Equal(t, "step1", retryableMetadata["river:resumable_step"])
7020+
require.Equal(t, map[string]any{"step2": float64(1)}, retryableMetadata["river:resumable_cursor"])
7021+
7022+
// Wait for the retried attempt to resume and then complete successfully.
7023+
eventCompleted := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
7024+
require.Equal(t, EventKindJobCompleted, eventCompleted.Kind)
7025+
require.Equal(t, insertRes.Job.ID, eventCompleted.Job.ID)
7026+
7027+
reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
7028+
require.NoError(t, err)
7029+
require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State)
7030+
require.Len(t, reloadedJob.Errors, 1)
7031+
7032+
var metadata map[string]any
7033+
require.NoError(t, json.Unmarshal(reloadedJob.Metadata, &metadata))
7034+
require.Equal(t, "step1", metadata["river:resumable_step"])
7035+
require.Equal(t, map[string]any{"step2": float64(1)}, metadata["river:resumable_cursor"])
7036+
7037+
require.Equal(t, []string{
7038+
"step1",
7039+
"step2:0",
7040+
"item:1",
7041+
"step2:1",
7042+
"item:2",
7043+
"step3",
7044+
}, worker.Calls())
7045+
})
7046+
69397047
t.Run("JobThatReturnsJobCancelErrorIsImmediatelyCancelled", func(t *testing.T) {
69407048
t.Parallel()
69417049

@@ -7602,7 +7710,7 @@ func Test_NewClient_Validations(t *testing.T) {
76027710
},
76037711
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
76047712
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
7605-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
7713+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
76067714
},
76077715
},
76087716
{
@@ -7613,7 +7721,7 @@ func Test_NewClient_Validations(t *testing.T) {
76137721
},
76147722
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
76157723
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2)
7616-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
7724+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 3)
76177725
},
76187726
},
76197727
{
@@ -7625,7 +7733,7 @@ func Test_NewClient_Validations(t *testing.T) {
76257733
},
76267734
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
76277735
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
7628-
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
7736+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
76297737
},
76307738
},
76317739
{
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
type ResumableCursorArgs struct {
20+
IDs []int `json:"ids"`
21+
}
22+
23+
func (ResumableCursorArgs) Kind() string { return "resumable_cursor" }
24+
25+
// ResumableCursor is the cursor value that ResumableCursorWorker checkpoints
// while looping: the most recent ID it has finished processing. It carries a
// JSON tag so that it can be serialized.
type ResumableCursor struct {
26+
LastProcessedID int `json:"last_processed_id"`
27+
}
28+
29+
type ResumableCursorWorker struct {
30+
river.WorkerDefaults[ResumableCursorArgs]
31+
}
32+
33+
// Work processes each ID in the job's args inside a single cursor step named
// "process_ids", setting the cursor after every ID so that the loop has a
// checkpoint to continue from.
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error {
34+
river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
35+
for _, id := range job.Args.IDs {
36+
// Skip IDs at or below the cursor. Note this comparison is only a
// correct "already processed" check if IDs are in ascending order.
if id <= cursor.LastProcessedID {
37+
continue
38+
}
39+
40+
fmt.Printf("Processed %d\n", id)
41+
42+
// Checkpoint progress after each ID.
if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
43+
return err
44+
}
45+
}
46+
47+
return nil
48+
})
49+
50+
return nil
51+
}
52+
53+
// Example_resumableCursor demonstrates the use of a resumable cursor step, a
54+
// step that can store arbitrary JSON state to resume a partially completed loop.
55+
func Example_resumableCursor() { //nolint:dupl
56+
ctx := context.Background()
57+
58+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
59+
if err != nil {
60+
panic(err)
61+
}
62+
defer dbPool.Close()
63+
64+
workers := river.NewWorkers()
65+
river.AddWorker(workers, &ResumableCursorWorker{})
66+
67+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
68+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
69+
Queues: map[string]river.QueueConfig{
70+
river.QueueDefault: {MaxWorkers: 100},
71+
},
72+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
73+
TestOnly: true, // suitable only for use in tests; remove for live environments
74+
Workers: workers,
75+
})
76+
if err != nil {
77+
panic(err)
78+
}
79+
80+
// Out of example scope, but used to wait until a job is worked.
81+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
82+
defer subscribeCancel()
83+
84+
if err := riverClient.Start(ctx); err != nil {
85+
panic(err)
86+
}
87+
88+
if _, err = riverClient.Insert(ctx, ResumableCursorArgs{
89+
IDs: []int{1, 2, 3},
90+
}, nil); err != nil {
91+
panic(err)
92+
}
93+
94+
// Wait for jobs to complete. Only needed for purposes of the example test.
95+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
96+
97+
if err := riverClient.Stop(ctx); err != nil {
98+
panic(err)
99+
}
100+
101+
// Output:
102+
// Processed 1
103+
// Processed 2
104+
// Processed 3
105+
}

example_resumable_job_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"os"
8+
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
11+
"github.com/riverqueue/river"
12+
"github.com/riverqueue/river/riverdbtest"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
"github.com/riverqueue/river/rivershared/util/slogutil"
16+
"github.com/riverqueue/river/rivershared/util/testutil"
17+
)
18+
19+
type ResumableArgs struct{}
20+
21+
func (ResumableArgs) Kind() string { return "resumable" }
22+
23+
type ResumableWorker struct {
24+
river.WorkerDefaults[ResumableArgs]
25+
}
26+
27+
// Work runs three named resumable steps in sequence, each printing a line to
// stdout. The step names ("step1", "step2", "step3") identify each step to
// river.ResumableStep.
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
28+
river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
29+
fmt.Println("Step 1")
30+
return nil
31+
})
32+
33+
river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
34+
fmt.Println("Step 2")
35+
return nil
36+
})
37+
38+
river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
39+
fmt.Println("Step 3")
40+
return nil
41+
})
42+
43+
return nil
44+
}
45+
46+
// Example_resumable demonstrates the use of a "resumable job", a job that has
47+
// multiple steps, and which can be resumed after each one.
48+
func Example_resumable() { //nolint:dupl
49+
ctx := context.Background()
50+
51+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
52+
if err != nil {
53+
panic(err)
54+
}
55+
defer dbPool.Close()
56+
57+
workers := river.NewWorkers()
58+
river.AddWorker(workers, &ResumableWorker{})
59+
60+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
61+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
62+
Queues: map[string]river.QueueConfig{
63+
river.QueueDefault: {MaxWorkers: 100},
64+
},
65+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
66+
TestOnly: true, // suitable only for use in tests; remove for live environments
67+
Workers: workers,
68+
})
69+
if err != nil {
70+
panic(err)
71+
}
72+
73+
// Out of example scope, but used to wait until a job is worked.
74+
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
75+
defer subscribeCancel()
76+
77+
if err := riverClient.Start(ctx); err != nil {
78+
panic(err)
79+
}
80+
81+
if _, err = riverClient.Insert(ctx, ResumableArgs{}, nil); err != nil {
82+
panic(err)
83+
}
84+
85+
// Wait for jobs to complete. Only needed for purposes of the example test.
86+
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
87+
88+
if err := riverClient.Stop(ctx); err != nil {
89+
panic(err)
90+
}
91+
92+
// Output:
93+
// Step 1
94+
// Step 2
95+
// Step 3
96+
}

0 commit comments

Comments
 (0)