Skip to content

Commit e85ae6a

Browse files
authored
Logging package to add context logger for workers and stores output to metadata (#844)
After adding support for job output, it got me thinking that it might not be _that_ bad if we put in a way for users to be able to store limited job-specific logging to job records. Like with output, this would go in TOAST, and although it could be bad if too much data accumulated, it shouldn't generally effect performance. Here, prototype what that would look like. It's implemented as a middleware in a separate `riverlog` package, which must be installed to a client or worker to become active. Once it is, logging looks like: func (w *LoggingWorker) Work(ctx context.Context, job *river.Job[LoggingArgs]) error { riverlog.Logger(ctx).InfoContext(ctx, "Logged from worker") return nil } The middleware takes a function that returns a slog handler, which has the purpose of letting the user select between JSON, text, or something else, and lets them select options like logging level. I've left in enough flexibility that we should be able to extend the interface only slightly to implement a system that sends logs to S3 instead of metadata, but metadata is a good default. I figure that if we go down this route we can add special handling for it to River UI to make logs easily accessible for installations even without a full logging platform. I also ended up adding support to `Client` and `rivertest.Worker` for `rivershared.BaseService` in hooks and middleware so that if a hook or middleware has a base service embedded, it gets its archetype initialized just like other services in the client. This gives us an easy way to inherit a client's log instead of a separate one having to be injected which is _very_ convenient. This shouldn't be used outside of River core packages, but it should be okay if we make use it. Also added convenience helpers to make middleware like we already had for hooks: * `JobInsertMiddlewareFunc` * `WorkerMiddlewareFunc ` I put these in because they're great for creating a hook or middleware that's scoped to a single test case and which is a struct that can embed something like `baseservice.BaseService`.
1 parent 57a6b46 commit e85ae6a

19 files changed

Lines changed: 799 additions & 19 deletions

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added `river/riverlog` containing middleware that injects a context logger to workers that collates log output and persists it with job metadata. This is paired with a River UI enhancement that shows logs in the UI. [PR #844](https://github.com/riverqueue/river/pull/844).
13+
- Added `JobInsertMiddlewareFunc` and `WorkerMiddlewareFunc` to easily implement middleware with a function instead of a struct. [PR #844](https://github.com/riverqueue/river/pull/844).
14+
1015
### Changed
1116

1217
- Client no longer returns an error if stopped before startup could complete (previously, it returned the unexported `ErrShutdown`). [PR #841](https://github.com/riverqueue/river/pull/841).

client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
598598
}
599599
}
600600

601+
for _, hook := range config.Hooks {
602+
if withBaseService, ok := hook.(baseservice.WithBaseService); ok {
603+
baseservice.Init(archetype, withBaseService)
604+
}
605+
}
606+
601607
client := &Client[TTx]{
602608
config: config,
603609
driver: driver,
@@ -637,6 +643,13 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
637643

638644
middleware = append(middleware, workerMiddleware)
639645
}
646+
647+
for _, middleware := range middleware {
648+
if withBaseService, ok := middleware.(baseservice.WithBaseService); ok {
649+
baseservice.Init(archetype, withBaseService)
650+
}
651+
}
652+
640653
client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware)
641654
}
642655

client_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,40 @@ func Test_Client(t *testing.T) {
649649
require.True(t, insertBeginHookCalled)
650650
})
651651

652+
t.Run("HookArchetypeInitialized", func(t *testing.T) {
653+
t.Parallel()
654+
655+
_, bundle := setup(t)
656+
657+
type HookWithBaseService struct {
658+
baseservice.BaseService
659+
HookInsertBeginFunc
660+
}
661+
662+
var (
663+
hook = &HookWithBaseService{}
664+
hookCalled = false
665+
)
666+
hook.HookInsertBeginFunc = func(ctx context.Context, params *rivertype.JobInsertParams) error {
667+
hookCalled = true
668+
require.NotEmpty(t, hook.Name) // if name is non-empty, it means the base service was initialized properly
669+
return nil
670+
}
671+
672+
AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error {
673+
return nil
674+
}))
675+
676+
bundle.config.Hooks = []rivertype.Hook{hook}
677+
client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
678+
require.NoError(t, err)
679+
680+
_, err = client.Insert(ctx, callbackArgs{}, nil)
681+
require.NoError(t, err)
682+
683+
require.True(t, hookCalled)
684+
})
685+
652686
t.Run("WithGlobalWorkBeginHook", func(t *testing.T) {
653687
t.Parallel()
654688

@@ -2852,6 +2886,41 @@ func Test_Client_InsertManyTx(t *testing.T) {
28522886
require.JSONEq(t, `{"middleware": "called"}`, string(results[0].Job.Metadata))
28532887
})
28542888

2889+
t.Run("MiddlewareArchetypeInitialized", func(t *testing.T) {
2890+
t.Parallel()
2891+
2892+
_, bundle := setup(t)
2893+
2894+
config := newTestConfig(t, nil)
2895+
config.Queues = nil
2896+
2897+
type MiddlewareWithBaseService struct {
2898+
baseservice.BaseService
2899+
JobInsertMiddlewareFunc
2900+
}
2901+
2902+
var (
2903+
middleware = &MiddlewareWithBaseService{}
2904+
middlewareCalled bool
2905+
)
2906+
middleware.JobInsertMiddlewareFunc = func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
2907+
middlewareCalled = true
2908+
require.NotEmpty(t, middleware.Name) // if name is non-empty, it means the base service was initialized properly
2909+
return doInner(ctx)
2910+
}
2911+
2912+
config.Middleware = []rivertype.Middleware{middleware}
2913+
driver := riverpgxv5.New(nil)
2914+
client, err := NewClient(driver, config)
2915+
require.NoError(t, err)
2916+
2917+
results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{Args: noOpArgs{}}})
2918+
require.NoError(t, err)
2919+
require.Len(t, results, 1)
2920+
2921+
require.True(t, middlewareCalled)
2922+
})
2923+
28552924
t.Run("WithUniqueOpts", func(t *testing.T) {
28562925
t.Parallel()
28572926

common_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77

88
"github.com/riverqueue/river"
99
"github.com/riverqueue/river/rivershared/riversharedtest"
10+
"github.com/riverqueue/river/rivershared/util/sliceutil"
11+
"github.com/riverqueue/river/rivertype"
1012
)
1113

1214
//
@@ -29,7 +31,7 @@ func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error {
2931

3032
// Wait on the given subscription channel for numJobs. Times out with a panic if
3133
// jobs take too long to be received.
32-
func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) {
34+
func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) []*rivertype.JobRow { //nolint:unparam
3335
var (
3436
timeout = riversharedtest.WaitTimeout()
3537
deadline = time.Now().Add(timeout)
@@ -42,7 +44,7 @@ func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) {
4244
events = append(events, event)
4345

4446
if len(events) >= numJobs {
45-
return
47+
return sliceutil.Map(events, func(e *river.Event) *rivertype.JobRow { return e.Job })
4648
}
4749

4850
case <-time.After(time.Until(deadline)):

example_graceful_shutdown_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,5 +169,5 @@ func Example_gracefulShutdown() {
169169
// Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
170170
// Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
171171
// Job cancelled
172-
// JobExecutor: Job errored; retrying
172+
// jobexecutor.JobExecutor: Job errored; retrying
173173
}

example_job_cancel_from_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,5 @@ func Example_jobCancelFromClient() {
9999
}
100100

101101
// Output:
102-
// JobExecutor: job cancelled remotely
102+
// jobexecutor.JobExecutor: job cancelled remotely
103103
}

hook_defaults_funcs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ type HookDefaults struct{}
1313

1414
func (d *HookDefaults) IsHook() bool { return true }
1515

16-
// HookInsertBeginFunc is a convenience helper for implementing HookInsertBegin
17-
// using a simple function instead of a struct.
16+
// HookInsertBeginFunc is a convenience helper for implementing
17+
// rivertype.HookInsertBegin using a simple function instead of a struct.
1818
type HookInsertBeginFunc func(ctx context.Context, params *rivertype.JobInsertParams) error
1919

2020
func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
@@ -23,8 +23,8 @@ func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype.
2323

2424
func (f HookInsertBeginFunc) IsHook() bool { return true }
2525

26-
// HookWorkBeginFunc is a convenience helper for implementing HookworkBegin
27-
// using a simple function instead of a struct.
26+
// HookWorkBeginFunc is a convenience helper for implementing
27+
// rivertype.HookworkBegin using a simple function instead of a struct.
2828
type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error
2929

3030
func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {

middleware_defaults.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@ func (d *JobInsertMiddlewareDefaults) InsertMany(ctx context.Context, manyParams
2525
return doInner(ctx)
2626
}
2727

28+
// JobInsertMiddlewareFunc is a convenience helper for implementing
29+
// rivertype.JobInsertMiddleware using a simple function instead of a struct.
30+
type JobInsertMiddlewareFunc func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error)
31+
32+
func (f JobInsertMiddlewareFunc) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
33+
return f(ctx, manyParams, doInner)
34+
}
35+
36+
func (f JobInsertMiddlewareFunc) IsMiddleware() bool { return true }
37+
2838
// WorkerInsertMiddlewareDefaults is an embeddable struct that provides default
2939
// implementations for the rivertype.WorkerMiddleware. Use of this struct is
3040
// recommended in case rivertype.WorkerMiddleware is expanded in the future so
@@ -36,3 +46,13 @@ type WorkerMiddlewareDefaults struct{ MiddlewareDefaults }
3646
func (d *WorkerMiddlewareDefaults) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
3747
return doInner(ctx)
3848
}
49+
50+
// WorkerMiddlewareFunc is a convenience helper for implementing
51+
// rivertype.WorkerMiddleware using a simple function instead of a struct.
52+
type WorkerMiddlewareFunc func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error
53+
54+
func (f WorkerMiddlewareFunc) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
55+
return f(ctx, job, doInner)
56+
}
57+
58+
func (f WorkerMiddlewareFunc) IsMiddleware() bool { return true }

middleware_defaults_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,26 @@
11
package river
22

3-
import "github.com/riverqueue/river/rivertype"
3+
import (
4+
"context"
5+
6+
"github.com/riverqueue/river/rivertype"
7+
)
48

59
var (
610
_ rivertype.JobInsertMiddleware = &JobInsertMiddlewareDefaults{}
711
_ rivertype.WorkerMiddleware = &WorkerMiddlewareDefaults{}
12+
13+
_ rivertype.JobInsertMiddleware = JobInsertMiddlewareFunc(func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
14+
return doInner(ctx)
15+
})
16+
_ rivertype.Middleware = JobInsertMiddlewareFunc(func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
17+
return doInner(ctx)
18+
})
19+
20+
_ rivertype.Middleware = WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
21+
return doInner(ctx)
22+
})
23+
_ rivertype.WorkerMiddleware = WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
24+
return doInner(ctx)
25+
})
826
)

recorded_output.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"github.com/riverqueue/river/rivertype"
1111
)
1212

13-
const maxOutputSize = 32 * 1024 * 1024 // 32MB
13+
const (
14+
maxOutputSizeMB = 32
15+
maxOutputSizeBytes = maxOutputSizeMB * 1024 * 1024
16+
)
1417

1518
// RecordOutput records output JSON from a job. The "output" can be any
1619
// JSON-encodable value and will be stored in the database on the job row after
@@ -55,8 +58,8 @@ func RecordOutput(ctx context.Context, output any) error {
5558

5659
// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
5760
// anywhere close to that limit here.
58-
if len(metadataUpdatesBytes) > maxOutputSize {
59-
return fmt.Errorf("output is too large: %d bytes (max 32 MB)", len(metadataUpdatesBytes))
61+
if len(metadataUpdatesBytes) > maxOutputSizeBytes {
62+
return fmt.Errorf("output is too large: %d bytes (max %d MB)", len(metadataUpdatesBytes), maxOutputSizeMB)
6063
}
6164

6265
metadataUpdates[rivertype.MetadataKeyOutput] = json.RawMessage(metadataUpdatesBytes)

0 commit comments

Comments
 (0)