Skip to content

Commit 790c67e

Browse files
authored
Add Config.SoftStopTimeout for a cleaner way to shut a client down (#1239)
A part of River code that's always bothered me is the graceful shutdown example [1]. It's really, really complicated, and I think that complicated orchestration is very likely a sign that our code isn't quite right. I initially tried putting the graceful shutdown code into a helper like `river.GracefulShutdown`, but I found that also problematic because it's not super clean to call, and even minor customizations would require a user to copy out the entire function. Playing with an LLM a bit, I came up with this alternative: build an understanding of soft/hard stop into the client's default `Stop` function by adding `Config.SoftStopTimeout`. By default, leaving this value unconfigured keeps the existing behavior of River today where a call to `Stop` waits unbounded time for jobs to finish working. Adding a `SoftStopTimeout` value brings in functionality similar to the graceful shutdown example: stopping proceeds normally waiting for jobs to finish, but in case they don't inside of `SoftStopTimeout`, they're cancelled as if `StopAndCancel` had been called. The soft timeout is also respected for a cancelled `Start` context, which makes the basic use of River for soft/hard stop very nice (you don't even need to call `Stop` anymore): // Use signal.NotifyContext to cancel the start context on SIGINT/SIGTERM. // When the signal fires, the client initiates a soft stop. If running jobs // don't finish within SoftStopTimeout, their contexts are automatically // cancelled. ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() if err := riverClient.Start(ctx); err != nil { panic(err) } [1] https://github.com/riverqueue/river/blob/master/example_graceful_shutdown_test.go
1 parent 6e9d05c commit 790c67e

5 files changed

Lines changed: 337 additions & 94 deletions

File tree

client.go

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,32 @@ type Config struct {
329329
// setting of Postgres `search_path`.
330330
Schema string
331331

332+
// SoftStopTimeout is the maximum amount of time that the client will wait
333+
// for running jobs to finish during a stop before their contexts are
334+
// cancelled. After the timeout elapses, the client escalates to a hard stop
335+
// by cancelling the context of all running jobs. This applies regardless of
336+
// how a soft stop is initiated — whether by calling Stop or by
337+
// cancelling the context passed to Start.
338+
//
339+
// In combination with signal.NotifyContext on the context passed to Start,
340+
// this can simplify graceful stop to:
341+
//
342+
// ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
343+
// defer stop()
344+
//
345+
// if err := client.Start(ctx); err != nil { ... }
346+
// <-client.Stopped()
347+
//
348+
// The signal cancels the Start context, which initiates a soft stop. If
349+
// running jobs haven't finished after SoftStopTimeout, their contexts are
350+
// automatically cancelled to trigger a hard stop.
351+
//
352+
// StopAndCancel bypasses the timeout entirely and cancels job contexts
353+
// immediately.
354+
//
355+
// Defaults to no timeout (wait indefinitely for jobs to finish).
356+
SoftStopTimeout time.Duration
357+
332358
// SkipJobKindValidation causes the job kind format validation check to be
333359
// skipped. This is available as an interim stopgap for users that have
334360
// invalid job kind names, but would rather disable the check rather than
@@ -458,6 +484,7 @@ func (c *Config) WithDefaults() *Config {
458484
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
459485
RetryPolicy: retryPolicy,
460486
Schema: c.Schema,
487+
SoftStopTimeout: c.SoftStopTimeout,
461488
SkipJobKindValidation: c.SkipJobKindValidation,
462489
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
463490
Test: c.Test,
@@ -1084,10 +1111,19 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
10841111
return err
10851112
}
10861113

1087-
// We use separate contexts for fetching and working to allow for a graceful
1088-
// stop. Both inherit from the provided context, so if it's cancelled, a
1089-
// more aggressive stop will be initiated.
1090-
workCtx, workCancel := context.WithCancelCause(ctx)
1114+
// We use separate contexts for fetching and working to allow for a
1115+
// graceful stop. When SoftStopTimeout is configured, the work context
1116+
// is detached from the start context so that cancelling the start
1117+
// context initiates a soft stop (with timeout escalation) rather than
1118+
// an immediate hard stop. When SoftStopTimeout is not configured, the
1119+
// work context inherits from the start context to preserve the
1120+
// existing behavior where cancelling the start context is equivalent
1121+
// to StopAndCancel.
1122+
workParentCtx := ctx
1123+
if c.config.SoftStopTimeout > 0 {
1124+
workParentCtx = context.WithoutCancel(ctx)
1125+
}
1126+
workCtx, workCancel := context.WithCancelCause(workParentCtx)
10911127

10921128
// Client available to executors and to various service hooks.
10931129
fetchCtx := withClient(fetchCtx, c)
@@ -1151,6 +1187,18 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
11511187
c.queues.startStopMu.Lock()
11521188
defer c.queues.startStopMu.Unlock()
11531189

1190+
// If SoftStopTimeout is configured, start a timer that will cancel
1191+
// the work context (escalating to a hard stop) if producers don't
1192+
// finish in time. StopAndCancel also calls workCancel, in which case
1193+
// this timer is a harmless no-op because the context is already done.
1194+
if c.config.SoftStopTimeout > 0 {
1195+
softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() {
1196+
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout))
1197+
c.workCancel(rivercommon.ErrStop)
1198+
})
1199+
defer softStopTimer.Stop()
1200+
}
1201+
11541202
// On stop, have the producers stop fetching first of all.
11551203
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
11561204
startstop.StopAllParallel(producersAsServices()...)
@@ -1183,6 +1231,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
11831231
// complete before exiting. If the provided context is done before shutdown has
11841232
// completed, Stop will return immediately with the context's error.
11851233
//
1234+
// If SoftStopTimeout is configured, running job contexts will be automatically
1235+
// cancelled after the timeout elapses, escalating to a hard stop. This also
1236+
// applies when stop is initiated by cancelling the context passed to Start.
1237+
//
11861238
// There's no need to call this method if a hard stop has already been initiated
11871239
// by cancelling the context passed to Start or by calling StopAndCancel.
11881240
func (c *Client[TTx]) Stop(ctx context.Context) error {
@@ -1211,6 +1263,12 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
12111263
// This can also be initiated by cancelling the context passed to Start. There is
12121264
// no need to call this method if the context passed to Start is cancelled
12131265
// instead.
1266+
//
1267+
// In most cases, using Stop with SoftStopTimeout configured is preferable to
1268+
// calling StopAndCancel directly. SoftStopTimeout gives running jobs a chance
1269+
// to finish before automatically escalating to context cancellation, providing
1270+
// graceful stop semantics without requiring manual orchestration of Stop and
1271+
// StopAndCancel.
12141272
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
12151273
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
12161274
c.workCancel(rivercommon.ErrStop)

client_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2361,6 +2361,122 @@ func Test_Client_StopAndCancel(t *testing.T) {
23612361
})
23622362
}
23632363

2364+
func Test_Client_SoftStopTimeout(t *testing.T) {
2365+
t.Parallel()
2366+
2367+
ctx := context.Background()
2368+
2369+
type JobArgs struct {
2370+
testutil.JobArgsReflectKind[JobArgs]
2371+
}
2372+
2373+
t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) {
2374+
t.Parallel()
2375+
2376+
config := newTestConfig(t, "")
2377+
config.SoftStopTimeout = 100 * time.Millisecond
2378+
2379+
jobDoneChan := make(chan struct{})
2380+
jobStartedChan := make(chan struct{})
2381+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
2382+
close(jobStartedChan)
2383+
<-ctx.Done() // only finishes when context is cancelled
2384+
close(jobDoneChan)
2385+
return nil
2386+
}))
2387+
2388+
client := runNewTestClient(ctx, t, config)
2389+
2390+
_, err := client.Insert(ctx, JobArgs{}, nil)
2391+
require.NoError(t, err)
2392+
2393+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
2394+
2395+
// Stop initiates a soft stop. The job won't finish on its own, but
2396+
// SoftStopTimeout should escalate to a hard stop after 100ms.
2397+
require.NoError(t, client.Stop(ctx))
2398+
2399+
// Verify the job's context was indeed cancelled.
2400+
select {
2401+
case <-jobDoneChan:
2402+
default:
2403+
t.Fatal("expected job to have been cancelled by soft stop timeout")
2404+
}
2405+
})
2406+
2407+
t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) {
2408+
t.Parallel()
2409+
2410+
config := newTestConfig(t, "")
2411+
config.SoftStopTimeout = 5 * time.Second
2412+
2413+
jobStartedChan := make(chan struct{})
2414+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
2415+
close(jobStartedChan)
2416+
return nil // finishes immediately
2417+
}))
2418+
2419+
client := runNewTestClient(ctx, t, config)
2420+
2421+
_, err := client.Insert(ctx, JobArgs{}, nil)
2422+
require.NoError(t, err)
2423+
2424+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
2425+
2426+
// Stop should succeed quickly since the job finishes on its own.
2427+
// The 5s timeout should not fire.
2428+
require.NoError(t, client.Stop(ctx))
2429+
})
2430+
2431+
t.Run("ContextCancellationEscalatesAfterTimeout", func(t *testing.T) {
2432+
t.Parallel()
2433+
2434+
config := newTestConfig(t, "")
2435+
config.SoftStopTimeout = 100 * time.Millisecond
2436+
2437+
jobDoneChan := make(chan struct{})
2438+
jobStartedChan := make(chan struct{})
2439+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
2440+
close(jobStartedChan)
2441+
<-ctx.Done()
2442+
close(jobDoneChan)
2443+
return nil
2444+
}))
2445+
2446+
var (
2447+
dbPool = riversharedtest.DBPool(ctx, t)
2448+
driver = riverpgxv5.New(dbPool)
2449+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
2450+
)
2451+
config.Schema = schema
2452+
2453+
client, err := NewClient(driver, config)
2454+
require.NoError(t, err)
2455+
2456+
startCtx, startCtxCancel := context.WithCancel(ctx)
2457+
defer startCtxCancel()
2458+
2459+
require.NoError(t, client.Start(startCtx))
2460+
2461+
_, err = client.Insert(ctx, JobArgs{}, nil)
2462+
require.NoError(t, err)
2463+
2464+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
2465+
2466+
// Cancel the start context. This should initiate a soft stop, then
2467+
// escalate to hard stop after SoftStopTimeout.
2468+
startCtxCancel()
2469+
2470+
riversharedtest.WaitOrTimeout(t, client.Stopped())
2471+
2472+
select {
2473+
case <-jobDoneChan:
2474+
default:
2475+
t.Fatal("expected job to have been cancelled by soft stop timeout")
2476+
}
2477+
})
2478+
}
2479+
23642480
type callbackWithCustomTimeoutArgs struct {
23652481
TimeoutValue time.Duration `json:"timeout"`
23662482
}

docs/README.md

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,39 @@ that inserted job kinds have a worker that can run them.
8787

8888
### Stopping
8989

90-
The client should also be stopped on program shutdown:
90+
The client should also be stopped on program shutdown. There are a number of ways
91+
to go about this (see [graceful shutdown]), but the shortest is to cancel the
92+
context sent to `Start` when the program's ready to stop. For example, to stop
93+
on `SIGINT`/`SIGTERM`:
9194

9295
```go
93-
// Stop fetching new work and wait for active jobs to finish.
94-
if err := riverClient.Stop(ctx); err != nil {
96+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
97+
SoftStopTimeout: 10 * time.Second,
98+
...
99+
})
100+
if err != nil {
101+
panic(err)
102+
}
103+
104+
signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
105+
defer stop()
106+
107+
// Stop fetching new work and wait for active jobs to finish. Cancel jobs after
108+
// SoftStopTimeout elapses.
109+
if err := riverClient.Start(signalCtx); err != nil {
95110
panic(err)
96111
}
112+
113+
<-riverClient.Stopped()
97114
```
98115

99-
There are some complexities around ensuring clients stop cleanly, but also in a
100-
timely manner. See [graceful shutdown] for more details on River's stop modes.
116+
Alternatively, use an explicit call to `Stop`:
117+
118+
```go
119+
if err := riverClient.Stop(ctx); err != nil {
120+
panic(err)
121+
}
122+
```
101123

102124
[Insert-only clients](/docs/insert-only-clients) will insert jobs, but not work
103125
them, and don't need to be started or stopped.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package river_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log/slog"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
"time"
12+
13+
"github.com/jackc/pgx/v5/pgxpool"
14+
15+
"github.com/riverqueue/river"
16+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
17+
"github.com/riverqueue/river/rivershared/riversharedtest"
18+
"github.com/riverqueue/river/rivershared/util/slogutil"
19+
)
20+
21+
// Example_gracefulShutdownStopAndCancel demonstrates graceful stop with explicit
22+
// fallback to StopAndCancel. When a SIGINT/SIGTERM arrives, Stop initiates a
23+
// soft stop. If running jobs don't finish before the soft stop context expires,
24+
// StopAndCancel cancels their contexts (hard stop). This example is intended to
25+
// demonstrate advanced use of StopAndCancel. Generally, prefer the method shown
26+
// in Example_gracefulShutdown over the one here.
27+
func Example_gracefulShutdownStopAndCancel() {
28+
ctx := context.Background()
29+
30+
jobStarted := make(chan struct{})
31+
32+
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
33+
if err != nil {
34+
panic(err)
35+
}
36+
defer dbPool.Close()
37+
38+
workers := river.NewWorkers()
39+
river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted})
40+
41+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
42+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError, ReplaceAttr: slogutil.NoLevelTimeJobID})),
43+
Queues: map[string]river.QueueConfig{
44+
river.QueueDefault: {MaxWorkers: 100},
45+
},
46+
Workers: workers,
47+
}))
48+
if err != nil {
49+
panic(err)
50+
}
51+
52+
_, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil)
53+
if err != nil {
54+
panic(err)
55+
}
56+
57+
if err := riverClient.Start(ctx); err != nil {
58+
panic(err)
59+
}
60+
61+
// Use signal.NotifyContext to detect SIGINT/SIGTERM, but don't pass the
62+
// signal context to Start. Cancelling the Start context cancels running job
63+
// contexts immediately when SoftStopTimeout is unset, as StopAndCancel does.
64+
signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
65+
defer stop()
66+
67+
// Make sure our job starts being worked before doing anything else.
68+
<-jobStarted
69+
70+
// Cheat by sending ourselves a SIGTERM for the purpose of this example
71+
// (normally this is sent by user or supervisory process).
72+
selfProcess, _ := os.FindProcess(os.Getpid())
73+
_ = selfProcess.Signal(syscall.SIGTERM)
74+
75+
// Wait for the first signal handler to consume the SIGTERM and cancel the
76+
// signal context before tearing it down.
77+
<-signalCtx.Done()
78+
stop()
79+
80+
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (waiting for jobs to finish)\n")
81+
82+
softStopCtx, softStopCancel := context.WithTimeout(ctx, 100*time.Millisecond)
83+
defer softStopCancel()
84+
85+
if err := riverClient.Stop(softStopCtx); err != nil {
86+
if !errors.Is(err, context.DeadlineExceeded) {
87+
panic(err)
88+
}
89+
90+
fmt.Printf("Soft stop timeout; cancelling remaining jobs\n")
91+
92+
if err := riverClient.StopAndCancel(ctx); err != nil {
93+
panic(err)
94+
}
95+
}
96+
97+
// Output:
98+
// Working job that doesn't finish until cancelled
99+
// Received SIGINT/SIGTERM; initiating soft stop (waiting for jobs to finish)
100+
// Soft stop timeout; cancelling remaining jobs
101+
// Job cancelled
102+
}

0 commit comments

Comments
 (0)