Skip to content

Commit 39c4f1f

Browse files
committed
Validation to detect duplicate step names in resumable jobs
Follows up #1226 to tighten things up a little by adding a validation. If a user accidentally puts in duplicate step names, detect the problem and return an error. A duplicate step name represents a step that can never be resumed from, so it makes sense to find out about this sooner rather than later.
1 parent a5db4c6 commit 39c4f1f

4 files changed

Lines changed: 93 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Changed
1111

1212
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
13+
- Detect duplicate step names across `river.ResumableStep` and `river.ResumableStepCursor` calls within a single job execution and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281)
1314

1415
### Fixed
1516

internal/rivermiddleware/middleware.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doI
3434
}
3535

3636
state := &ResumableState{
37+
AllStepNames: make(map[string]struct{}),
3738
Cursors: make(map[string]json.RawMessage),
3839
ResumeMatched: true,
3940
ResumeStep: gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableStep).Str,
@@ -80,6 +81,7 @@ func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doI
8081
// ResumableState holds the state for a resumable job execution. It is stored in
8182
// the context and accessed by ResumableStep and ResumableStepCursor.
8283
type ResumableState struct {
84+
AllStepNames map[string]struct{}
8385
CompletedStep string
8486
Cursors map[string]json.RawMessage
8587
Err error

resumable.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type StepOpts struct{}
4545

4646
// ResumableStep runs a resumable step, skipping the step on a later retry if
4747
// an earlier attempt already completed it successfully.
48+
// Step names must be unique across all ResumableStep and ResumableStepCursor
49+
// calls in the same Worker execution.
4850
//
4951
// After a step returns an error, no subsequent steps will be run and the
5052
// overall job will be marked as failed with that error. Be careful to put all
@@ -57,6 +59,9 @@ func ResumableStep(ctx context.Context, name string, opts *StepOpts, stepFunc fu
5759
if state.Err != nil {
5860
return
5961
}
62+
if !registerResumableStepName(state, name) {
63+
return
64+
}
6065

6166
if !state.ResumeMatched {
6267
if name == state.ResumeStep {
@@ -81,6 +86,8 @@ func ResumableStep(ctx context.Context, name string, opts *StepOpts, stepFunc fu
8186
// ResumableStepCursor runs a resumable step that also receives a persisted
8287
// cursor value from an earlier failed attempt, if one was recorded with
8388
// ResumableSetCursor.
89+
// Step names must be unique across all ResumableStep and ResumableStepCursor
90+
// calls in the same Worker execution.
8491
//
8592
// The cursor type T is user-specified. It may be a primitive value like an
8693
// integer ID, or a more complex type like a struct with multiple fields. It's
@@ -102,6 +109,9 @@ func ResumableStepCursor[TCursor any](ctx context.Context, name string, opts *St
102109
if state.Err != nil {
103110
return
104111
}
112+
if !registerResumableStepName(state, name) {
113+
return
114+
}
105115

106116
if !state.ResumeMatched {
107117
if name == state.ResumeStep {
@@ -149,6 +159,16 @@ func mustResumableState(ctx context.Context) *rivermiddleware.ResumableState {
149159
return state
150160
}
151161

162+
func registerResumableStepName(state *rivermiddleware.ResumableState, name string) bool {
163+
if _, ok := state.AllStepNames[name]; ok {
164+
state.Err = fmt.Errorf("river: duplicate resumable step name %q", name)
165+
return false
166+
}
167+
168+
state.AllStepNames[name] = struct{}{}
169+
return true
170+
}
171+
152172
func resumableStateFromContext(ctx context.Context) (*rivermiddleware.ResumableState, bool) {
153173
state := ctx.Value(rivermiddleware.ResumableContextKey{})
154174
if state == nil {

resumable_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,54 @@ func TestResumableStep(t *testing.T) {
2626
return ctx, metadataUpdates, &rivertype.JobRow{Metadata: []byte(metadata)}
2727
}
2828

29+
t.Run("DuplicateStepName", func(t *testing.T) {
30+
t.Parallel()
31+
32+
ctx, _, job := setup(t, `{}`)
33+
34+
var ran []string
35+
err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error {
36+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
37+
ran = append(ran, "first")
38+
return nil
39+
})
40+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
41+
ran = append(ran, "second")
42+
return nil
43+
})
44+
45+
return nil
46+
})
47+
require.EqualError(t, err, `river: duplicate resumable step name "step1"`)
48+
require.Equal(t, []string{"first"}, ran)
49+
})
50+
51+
t.Run("DuplicateStepNameWhenSkippingCompletedSteps", func(t *testing.T) {
52+
t.Parallel()
53+
54+
ctx, _, job := setup(t, `{"river:resumable_step":"step2"}`)
55+
56+
var ran []string
57+
err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error {
58+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
59+
ran = append(ran, "first")
60+
return nil
61+
})
62+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
63+
ran = append(ran, "second")
64+
return nil
65+
})
66+
ResumableStep(ctx, "step2", nil, func(ctx context.Context) error {
67+
ran = append(ran, "third")
68+
return nil
69+
})
70+
71+
return nil
72+
})
73+
require.EqualError(t, err, `river: duplicate resumable step name "step1"`)
74+
require.Empty(t, ran)
75+
})
76+
2977
t.Run("PanicsOutsideWorker", func(t *testing.T) {
3078
t.Parallel()
3179

@@ -131,6 +179,28 @@ func TestResumableStepCursor(t *testing.T) {
131179
return ctx, metadataUpdates, &rivertype.JobRow{Metadata: []byte(metadata)}
132180
}
133181

182+
t.Run("DuplicateStepNameSharedWithCursorStep", func(t *testing.T) {
183+
t.Parallel()
184+
185+
ctx, _, job := setup(t, `{}`)
186+
187+
var ran []string
188+
err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error {
189+
ResumableStep(ctx, "step1", nil, func(ctx context.Context) error {
190+
ran = append(ran, "step")
191+
return nil
192+
})
193+
ResumableStepCursor(ctx, "step1", nil, func(ctx context.Context, cursor resumableCursor) error {
194+
ran = append(ran, "cursor")
195+
return nil
196+
})
197+
198+
return nil
199+
})
200+
require.EqualError(t, err, `river: duplicate resumable step name "step1"`)
201+
require.Equal(t, []string{"step"}, ran)
202+
})
203+
134204
t.Run("ResumesCursor", func(t *testing.T) {
135205
t.Parallel()
136206

0 commit comments

Comments
 (0)