Skip to content

Commit d7b2366

Browse files
committed
Add new HookWorkEnd interface that runs after workers finish
Here, add a new complimentary pair for `HookWorkBegin`: `HookWorkEnd`, which runs after workers finish, taking in an error result. `HookWorkEnd` hooks may or may not modify the error result, choosing to suppress an error on pass it along the stack unchanged. This is driven by trying to add a new `nilerror` contrib package [1] that helps detect nil error-compliant structs that return non-nil error interfaces, which is a common footgun in Go [2]. [1] riverqueue/rivercontrib#25 [2] https://go.dev/doc/faq#nil_error
1 parent 85cc50b commit d7b2366

6 files changed

Lines changed: 86 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
- Added `river/riverlog` containing middleware that injects a context logger to workers that collates log output and persists it with job metadata. This is paired with a River UI enhancement that shows logs in the UI. [PR #844](https://github.com/riverqueue/river/pull/844).
1515
- Added `JobInsertMiddlewareFunc` and `WorkerMiddlewareFunc` to easily implement middleware with a function instead of a struct. [PR #844](https://github.com/riverqueue/river/pull/844).
1616
- Added `Config.Schema` which lets a non-default schema be injected explicitly into a River client that'll be used for all database operations. This may be particularly useful for proxies like PgBouncer that may not respect a schema configured in `search_path`. [PR #848](https://github.com/riverqueue/river/pull/848).
17+
- Added `rivertype.HookWorkEnd` hook interface that runs after a job has been worked. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
1718

1819
### Changed
1920

hook_defaults_funcs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,13 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow)
3232
}
3333

3434
func (f HookWorkBeginFunc) IsHook() bool { return true }
35+
36+
// HookWorkEndFunc is a convenience helper for implementing
37+
// rivertype.HookworkEnd using a simple function instead of a struct.
38+
type HookWorkEndFunc func(ctx context.Context, err error) error
39+
40+
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error {
41+
return f(ctx, err)
42+
}
43+
44+
func (f HookWorkEndFunc) IsHook() bool { return true }

internal/hooklookup/hook_lookup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type HookKind string
1515
const (
1616
HookKindInsertBegin HookKind = "insert_begin"
1717
HookKindWorkBegin HookKind = "work_begin"
18+
HookKindWorkEnd HookKind = "work_end"
1819
)
1920

2021
//
@@ -88,6 +89,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
8889
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
8990
}
9091
}
92+
case HookKindWorkEnd:
93+
for _, hook := range c.hooks {
94+
if typedHook, ok := hook.(rivertype.HookWorkEnd); ok {
95+
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
96+
}
97+
}
9198
}
9299

93100
return c.hooksByKind[kind]

internal/hooklookup/hook_lookup_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func TestHookLookup(t *testing.T) {
2222
&testHookInsertAndWorkBegin{},
2323
&testHookInsertBegin{},
2424
&testHookWorkBegin{},
25+
&testHookWorkEnd{},
2526
}).(*hookLookup), &testBundle{}
2627
}
2728

@@ -38,8 +39,11 @@ func TestHookLookup(t *testing.T) {
3839
&testHookInsertAndWorkBegin{},
3940
&testHookWorkBegin{},
4041
}, hookLookup.ByHookKind(HookKindWorkBegin))
42+
require.Equal(t, []rivertype.Hook{
43+
&testHookWorkEnd{},
44+
}, hookLookup.ByHookKind(HookKindWorkEnd))
4145

42-
require.Len(t, hookLookup.hooksByKind, 2)
46+
require.Len(t, hookLookup.hooksByKind, 3)
4347

4448
// Repeat lookups to make sure we get the same result.
4549
require.Equal(t, []rivertype.Hook{
@@ -50,6 +54,9 @@ func TestHookLookup(t *testing.T) {
5054
&testHookInsertAndWorkBegin{},
5155
&testHookWorkBegin{},
5256
}, hookLookup.ByHookKind(HookKindWorkBegin))
57+
require.Equal(t, []rivertype.Hook{
58+
&testHookWorkEnd{},
59+
}, hookLookup.ByHookKind(HookKindWorkEnd))
5360
})
5461

5562
t.Run("Stress", func(t *testing.T) {
@@ -118,6 +125,7 @@ func TestJobHookLookup(t *testing.T) {
118125

119126
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
120127
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
128+
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
121129
require.Equal(t, []rivertype.Hook{
122130
&testHookInsertAndWorkBegin{},
123131
&testHookInsertBegin{},
@@ -126,12 +134,16 @@ func TestJobHookLookup(t *testing.T) {
126134
&testHookInsertAndWorkBegin{},
127135
&testHookWorkBegin{},
128136
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
137+
require.Equal(t, []rivertype.Hook{
138+
&testHookWorkEnd{},
139+
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))
129140

130141
require.Len(t, jobHookLookup.hookLookupByKind, 2)
131142

132143
// Repeat lookups to make sure we get the same result.
133144
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
134145
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
146+
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
135147
require.Equal(t, []rivertype.Hook{
136148
&testHookInsertAndWorkBegin{},
137149
&testHookInsertBegin{},
@@ -140,6 +152,9 @@ func TestJobHookLookup(t *testing.T) {
140152
&testHookInsertAndWorkBegin{},
141153
&testHookWorkBegin{},
142154
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
155+
require.Equal(t, []rivertype.Hook{
156+
&testHookWorkEnd{},
157+
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))
143158
})
144159

145160
t.Run("Stress", func(t *testing.T) {
@@ -195,6 +210,7 @@ func (jobArgsWithCustomHooks) Hooks() []rivertype.Hook {
195210
&testHookInsertAndWorkBegin{},
196211
&testHookInsertBegin{},
197212
&testHookWorkBegin{},
213+
&testHookWorkEnd{},
198214
}
199215
}
200216

@@ -242,3 +258,15 @@ type testHookWorkBegin struct{ rivertype.Hook }
242258
func (t *testHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
243259
return nil
244260
}
261+
262+
//
263+
// testHookWorkEnd
264+
//
265+
266+
var _ rivertype.HookWorkEnd = &testHookWorkEnd{}
267+
268+
type testHookWorkEnd struct{ rivertype.Hook }
269+
270+
func (t *testHookWorkEnd) WorkEnd(ctx context.Context) error {
271+
return nil
272+
}

internal/jobexecutor/job_executor.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,18 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
208208
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
209209
defer cancel()
210210

211-
return e.WorkUnit.Work(ctx)
211+
err := e.WorkUnit.Work(ctx)
212+
213+
{
214+
for _, hook := range append(
215+
e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd),
216+
e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)...,
217+
) {
218+
err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert
219+
}
220+
}
221+
222+
return err
212223
})
213224

214225
executeFunc := execution.MiddlewareChain(

rivertype/river_type.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,33 @@ type HookWorkBegin interface {
325325
WorkBegin(ctx context.Context, job *JobRow) error
326326
}
327327

328+
// HookWorkEnd is an interface to a hook that runs after a job has been worked.
329+
type HookWorkEnd interface {
330+
Hook
331+
332+
// WorkEnd is invoked after a job has been worked with the error result of
333+
// the worked job. It's invoked after any middleware has already run.
334+
//
335+
// WorkEnd may modify a returned work error or pass it through unchanged.
336+
// Each returned error is passed through to the next hook and the final
337+
// error result is returned from the job executor:
338+
//
339+
// err := e.WorkUnit.Work(ctx)
340+
// for _, hook := range hooks {
341+
// err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err)
342+
// }
343+
// return err
344+
//
345+
// If a hook does not want to modify an error result, it should make sure to
346+
// return whatever error value it received as its argument whether that
347+
// error is nil or not.
348+
//
349+
// Will not receive a common context related to HookWorkBegin because
350+
// WorkBegin doesn't return a context. Middleware should be used for this
351+
// sort of shared context instead.
352+
WorkEnd(ctx context.Context, error error) error
353+
}
354+
328355
// Middleware is an arbitrary interface for a struct which will execute some
329356
// arbitrary code at a predefined step in the job lifecycle.
330357
//

0 commit comments

Comments
 (0)