Skip to content

Commit 9a3f021

Browse files
authored
Add new HookWorkEnd interface that runs after workers finish (#863)
Here, add a new complimentary pair for `HookWorkBegin`: `HookWorkEnd`, which runs after workers finish, taking in an error result. `HookWorkEnd` hooks may or may not modify the error result, choosing to suppress an error on pass it along the stack unchanged. This is driven by trying to add a new `nilerror` contrib package [1] that helps detect nil error-compliant structs that return non-nil error interfaces, which is a common footgun in Go [2]. [1] riverqueue/rivercontrib#25 [2] https://go.dev/doc/faq#nil_error
1 parent 85cc50b commit 9a3f021

8 files changed

Lines changed: 326 additions & 24 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
- Added `river/riverlog` containing middleware that injects a context logger to workers that collates log output and persists it with job metadata. This is paired with a River UI enhancement that shows logs in the UI. [PR #844](https://github.com/riverqueue/river/pull/844).
1515
- Added `JobInsertMiddlewareFunc` and `WorkerMiddlewareFunc` to easily implement middleware with a function instead of a struct. [PR #844](https://github.com/riverqueue/river/pull/844).
1616
- Added `Config.Schema` which lets a non-default schema be injected explicitly into a River client that'll be used for all database operations. This may be particularly useful for proxies like PgBouncer that may not respect a schema configured in `search_path`. [PR #848](https://github.com/riverqueue/river/pull/848).
17+
- Added `rivertype.HookWorkEnd` hook interface that runs after a job has been worked. [PR #863](https://github.com/riverqueue/river/pull/863).
1718

1819
### Changed
1920

client_test.go

Lines changed: 121 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -741,33 +741,110 @@ func Test_Client(t *testing.T) {
741741
require.True(t, workBeginHookCalled)
742742
})
743743

744+
t.Run("WithGlobalWorkEndHook", func(t *testing.T) {
745+
t.Parallel()
746+
747+
_, bundle := setup(t)
748+
749+
workEndHookCalled := false
750+
751+
bundle.config.Hooks = []rivertype.Hook{
752+
HookWorkEndFunc(func(ctx context.Context, err error) error {
753+
workEndHookCalled = true
754+
return err
755+
}),
756+
}
757+
758+
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error {
759+
return nil
760+
}))
761+
762+
client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
763+
require.NoError(t, err)
764+
765+
subscribeChan := subscribe(t, client)
766+
startClient(ctx, t, client)
767+
768+
insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
769+
require.NoError(t, err)
770+
771+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
772+
require.Equal(t, EventKindJobCompleted, event.Kind)
773+
require.Equal(t, insertRes.Job.ID, event.Job.ID)
774+
775+
require.True(t, workEndHookCalled)
776+
})
777+
744778
t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) {
745779
t.Parallel()
746780

747781
_, bundle := setup(t)
748782

749-
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error {
783+
type JobArgs struct {
784+
JobArgsReflectKind[JobArgs]
785+
hookEmbed[metadataHookInsertBegin]
786+
}
787+
788+
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
750789
return nil
751790
}))
752791

753792
client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
754793
require.NoError(t, err)
755794

756-
insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil)
795+
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
757796
require.NoError(t, err)
758797

759798
var metadataMap map[string]any
760799
err = json.Unmarshal(insertRes.Job.Metadata, &metadataMap)
761800
require.NoError(t, err)
762-
require.Equal(t, "called", metadataMap["insert_begin_hook"])
801+
require.Equal(t, metadataHookCalled, metadataMap[metadataHookInsertBeginKey])
802+
})
803+
804+
t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) { //nolint:dupl
805+
t.Parallel()
806+
807+
_, bundle := setup(t)
808+
809+
type JobArgs struct {
810+
JobArgsReflectKind[JobArgs]
811+
hookEmbed[metadataHookWorkBegin]
812+
}
813+
814+
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
815+
return nil
816+
}))
817+
818+
client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
819+
require.NoError(t, err)
820+
821+
subscribeChan := subscribe(t, client)
822+
startClient(ctx, t, client)
823+
824+
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
825+
require.NoError(t, err)
826+
827+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
828+
require.Equal(t, EventKindJobCompleted, event.Kind)
829+
require.Equal(t, insertRes.Job.ID, event.Job.ID)
830+
831+
var metadataMap map[string]any
832+
err = json.Unmarshal(event.Job.Metadata, &metadataMap)
833+
require.NoError(t, err)
834+
require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkBeginKey])
763835
})
764836

765-
t.Run("WithWorkBeginHookOnJobArgs", func(t *testing.T) {
837+
t.Run("WithWorkEndHookOnJobArgs", func(t *testing.T) { //nolint:dupl
766838
t.Parallel()
767839

768840
_, bundle := setup(t)
769841

770-
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[jobArgsWithCustomHook]) error {
842+
type JobArgs struct {
843+
JobArgsReflectKind[JobArgs]
844+
hookEmbed[metadataHookWorkEnd]
845+
}
846+
847+
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
771848
return nil
772849
}))
773850

@@ -777,7 +854,7 @@ func Test_Client(t *testing.T) {
777854
subscribeChan := subscribe(t, client)
778855
startClient(ctx, t, client)
779856

780-
insertRes, err := client.Insert(ctx, jobArgsWithCustomHook{}, nil)
857+
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
781858
require.NoError(t, err)
782859

783860
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
@@ -787,7 +864,7 @@ func Test_Client(t *testing.T) {
787864
var metadataMap map[string]any
788865
err = json.Unmarshal(event.Job.Metadata, &metadataMap)
789866
require.NoError(t, err)
790-
require.Equal(t, "called", metadataMap["work_begin_hook"])
867+
require.Equal(t, metadataHookCalled, metadataMap[metadataHookWorkEndKey])
791868
})
792869

793870
t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) {
@@ -1167,30 +1244,31 @@ func Test_Client(t *testing.T) {
11671244
})
11681245
}
11691246

1170-
type jobArgsWithCustomHook struct{}
1171-
1172-
func (jobArgsWithCustomHook) Kind() string { return "with_custom_hook" }
1247+
// hookEmbed can be embedded on a JobArgs to add a hook to it in such a way that
1248+
// it can be encapsulated within a test case.
1249+
type hookEmbed[T rivertype.Hook] struct{}
11731250

1174-
func (jobArgsWithCustomHook) Hooks() []rivertype.Hook {
1175-
return []rivertype.Hook{
1176-
&testHookInsertAndWorkBegin{},
1177-
}
1251+
func (f hookEmbed[T]) Hooks() []rivertype.Hook {
1252+
var hook T
1253+
return []rivertype.Hook{hook}
11781254
}
11791255

1180-
var (
1181-
_ rivertype.HookInsertBegin = &testHookInsertAndWorkBegin{}
1182-
_ rivertype.HookWorkBegin = &testHookInsertAndWorkBegin{}
1256+
const (
1257+
metadataHookCalled = "called"
1258+
metadataHookInsertBeginKey = "insert_begin"
1259+
metadataHookWorkBeginKey = "work_begin"
1260+
metadataHookWorkEndKey = "work_end"
11831261
)
11841262

1185-
type testHookInsertAndWorkBegin struct{ HookDefaults }
1263+
type metadataHookInsertBegin struct{ rivertype.Hook }
11861264

1187-
func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
1265+
func (metadataHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
11881266
var metadataMap map[string]any
11891267
if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil {
11901268
return err
11911269
}
11921270

1193-
metadataMap["insert_begin_hook"] = "called"
1271+
metadataMap[metadataHookInsertBeginKey] = metadataHookCalled
11941272

11951273
var err error
11961274
params.Metadata, err = json.Marshal(metadataMap)
@@ -1201,17 +1279,38 @@ func (t *testHookInsertAndWorkBegin) InsertBegin(ctx context.Context, params *ri
12011279
return nil
12021280
}
12031281

1204-
func (t *testHookInsertAndWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
1282+
type metadataHookWorkBegin struct{ rivertype.Hook }
1283+
1284+
func (metadataHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
12051285
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
12061286
if !hasMetadataUpdates {
12071287
panic("expected to be called from within job executor")
12081288
}
12091289

1210-
metadataUpdates["work_begin_hook"] = "called"
1290+
metadataUpdates[metadataHookWorkBeginKey] = metadataHookCalled
12111291

12121292
return nil
12131293
}
12141294

1295+
type metadataHookWorkEnd struct{ rivertype.Hook }
1296+
1297+
func (metadataHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
1298+
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
1299+
if !hasMetadataUpdates {
1300+
panic("expected to be called from within job executor")
1301+
}
1302+
1303+
metadataUpdates[metadataHookWorkEndKey] = metadataHookCalled
1304+
1305+
return err
1306+
}
1307+
1308+
var (
1309+
_ rivertype.HookInsertBegin = metadataHookInsertBegin{}
1310+
_ rivertype.HookWorkBegin = metadataHookWorkBegin{}
1311+
_ rivertype.HookWorkEnd = metadataHookWorkEnd{}
1312+
)
1313+
12151314
type workerWithMiddleware[T JobArgs] struct {
12161315
WorkerDefaults[T]
12171316
workFunc func(context.Context, *Job[T]) error

hook_defaults_funcs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,13 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow)
3232
}
3333

3434
func (f HookWorkBeginFunc) IsHook() bool { return true }
35+
36+
// HookWorkEndFunc is a convenience helper for implementing
37+
// rivertype.HookworkEnd using a simple function instead of a struct.
38+
type HookWorkEndFunc func(ctx context.Context, err error) error
39+
40+
func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error {
41+
return f(ctx, err)
42+
}
43+
44+
func (f HookWorkEndFunc) IsHook() bool { return true }

internal/hooklookup/hook_lookup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type HookKind string
1515
const (
1616
HookKindInsertBegin HookKind = "insert_begin"
1717
HookKindWorkBegin HookKind = "work_begin"
18+
HookKindWorkEnd HookKind = "work_end"
1819
)
1920

2021
//
@@ -88,6 +89,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
8889
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
8990
}
9091
}
92+
case HookKindWorkEnd:
93+
for _, hook := range c.hooks {
94+
if typedHook, ok := hook.(rivertype.HookWorkEnd); ok {
95+
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
96+
}
97+
}
9198
}
9299

93100
return c.hooksByKind[kind]

internal/hooklookup/hook_lookup_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func TestHookLookup(t *testing.T) {
2222
&testHookInsertAndWorkBegin{},
2323
&testHookInsertBegin{},
2424
&testHookWorkBegin{},
25+
&testHookWorkEnd{},
2526
}).(*hookLookup), &testBundle{}
2627
}
2728

@@ -38,8 +39,11 @@ func TestHookLookup(t *testing.T) {
3839
&testHookInsertAndWorkBegin{},
3940
&testHookWorkBegin{},
4041
}, hookLookup.ByHookKind(HookKindWorkBegin))
42+
require.Equal(t, []rivertype.Hook{
43+
&testHookWorkEnd{},
44+
}, hookLookup.ByHookKind(HookKindWorkEnd))
4145

42-
require.Len(t, hookLookup.hooksByKind, 2)
46+
require.Len(t, hookLookup.hooksByKind, 3)
4347

4448
// Repeat lookups to make sure we get the same result.
4549
require.Equal(t, []rivertype.Hook{
@@ -50,6 +54,9 @@ func TestHookLookup(t *testing.T) {
5054
&testHookInsertAndWorkBegin{},
5155
&testHookWorkBegin{},
5256
}, hookLookup.ByHookKind(HookKindWorkBegin))
57+
require.Equal(t, []rivertype.Hook{
58+
&testHookWorkEnd{},
59+
}, hookLookup.ByHookKind(HookKindWorkEnd))
5360
})
5461

5562
t.Run("Stress", func(t *testing.T) {
@@ -118,6 +125,7 @@ func TestJobHookLookup(t *testing.T) {
118125

119126
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
120127
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
128+
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
121129
require.Equal(t, []rivertype.Hook{
122130
&testHookInsertAndWorkBegin{},
123131
&testHookInsertBegin{},
@@ -126,12 +134,16 @@ func TestJobHookLookup(t *testing.T) {
126134
&testHookInsertAndWorkBegin{},
127135
&testHookWorkBegin{},
128136
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
137+
require.Equal(t, []rivertype.Hook{
138+
&testHookWorkEnd{},
139+
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))
129140

130141
require.Len(t, jobHookLookup.hookLookupByKind, 2)
131142

132143
// Repeat lookups to make sure we get the same result.
133144
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindInsertBegin))
134145
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkBegin))
146+
require.Nil(t, jobHookLookup.ByJobArgs(&jobArgsNoHooks{}).ByHookKind(HookKindWorkEnd))
135147
require.Equal(t, []rivertype.Hook{
136148
&testHookInsertAndWorkBegin{},
137149
&testHookInsertBegin{},
@@ -140,6 +152,9 @@ func TestJobHookLookup(t *testing.T) {
140152
&testHookInsertAndWorkBegin{},
141153
&testHookWorkBegin{},
142154
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkBegin))
155+
require.Equal(t, []rivertype.Hook{
156+
&testHookWorkEnd{},
157+
}, jobHookLookup.ByJobArgs(&jobArgsWithCustomHooks{}).ByHookKind(HookKindWorkEnd))
143158
})
144159

145160
t.Run("Stress", func(t *testing.T) {
@@ -195,6 +210,7 @@ func (jobArgsWithCustomHooks) Hooks() []rivertype.Hook {
195210
&testHookInsertAndWorkBegin{},
196211
&testHookInsertBegin{},
197212
&testHookWorkBegin{},
213+
&testHookWorkEnd{},
198214
}
199215
}
200216

@@ -242,3 +258,15 @@ type testHookWorkBegin struct{ rivertype.Hook }
242258
func (t *testHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
243259
return nil
244260
}
261+
262+
//
263+
// testHookWorkEnd
264+
//
265+
266+
var _ rivertype.HookWorkEnd = &testHookWorkEnd{}
267+
268+
type testHookWorkEnd struct{ rivertype.Hook }
269+
270+
func (t *testHookWorkEnd) WorkEnd(ctx context.Context, err error) error {
271+
return nil
272+
}

internal/jobexecutor/job_executor.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,18 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
208208
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
209209
defer cancel()
210210

211-
return e.WorkUnit.Work(ctx)
211+
err := e.WorkUnit.Work(ctx)
212+
213+
{
214+
for _, hook := range append(
215+
e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd),
216+
e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)...,
217+
) {
218+
err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert
219+
}
220+
}
221+
222+
return err
212223
})
213224

214225
executeFunc := execution.MiddlewareChain(

0 commit comments

Comments
 (0)