Commit c52de36

Add Config.SoftStopTimeout for a cleaner way to shut a client down
A part of River code that's always bothered me is the graceful shutdown example [1]. It's really, really complicated, and I think that complicated orchestration is very likely a sign that our code isn't quite right.

I initially tried putting the graceful shutdown code into a helper like `river.GracefulShutdown`, but I found that also problematic because it's not super clean to call, and even minor customizations would require a user to copy out the entire function.

Playing with an LLM a bit, I came up with this alternative: build an understanding of soft/hard stop into the client's default `Stop` function by adding `Config.SoftStopTimeout`.

By default, leaving this value unconfigured keeps the existing behavior of River today where a call to `Stop` waits unbounded time for jobs to finish working. Adding a `SoftStopTimeout` value brings in functionality similar to the graceful shutdown example: stopping proceeds normally waiting for jobs to finish, but in case they don't inside of `SoftStopTimeout`, they're cancelled as if `StopAndCancel` had been called.

The soft timeout is also respected for a cancelled `Start` context, which makes the basic use of River for soft/hard stop very nice (you don't even need to call `Stop` anymore):

    // Use signal.NotifyContext to cancel the start context on SIGINT/SIGTERM.
    // When the signal fires, the client initiates a soft stop. If running jobs
    // don't finish within SoftStopTimeout, their contexts are automatically
    // cancelled.
    ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    if err := riverClient.Start(ctx); err != nil {
        panic(err)
    }

[1] https://github.com/riverqueue/river/blob/master/example_graceful_shutdown_test.go
1 parent c6d65e5 commit c52de36

3 files changed

Lines changed: 204 additions & 90 deletions

client.go

Lines changed: 62 additions & 4 deletions
@@ -328,6 +328,32 @@ type Config struct {
328328
// setting of Postgres `search_path`.
329329
Schema string
330330

331+
// SoftStopTimeout is the maximum amount of time that the client will wait
332+
// for running jobs to finish during a stop before their contexts are
333+
// cancelled. After the timeout elapses, the client escalates to a hard stop
334+
// by cancelling the context of all running jobs. This applies regardless of
335+
// how stop is initiated, whether by calling Stop or by cancelling the
336+
// context passed to Start.
337+
//
338+
// In combination with signal.NotifyContext on the context passed to Start,
339+
// this can simplify graceful stop to:
340+
//
341+
// ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
342+
// defer stop()
343+
//
344+
// if err := client.Start(ctx); err != nil { ... }
345+
// <-client.Stopped()
346+
//
347+
// The signal cancels the Start context, which initiates a soft stop. If
348+
// running jobs haven't finished after SoftStopTimeout, their contexts are
349+
// automatically cancelled to trigger a hard stop.
350+
//
351+
// StopAndCancel bypasses the timeout entirely and cancels job contexts
352+
// immediately.
353+
//
354+
// Defaults to no timeout (wait indefinitely for jobs to finish).
355+
SoftStopTimeout time.Duration
356+
331357
// SkipJobKindValidation causes the job kind format validation check to be
332358
// skipped. This is available as an interim stopgap for users that have
333359
// invalid job kind names, but would rather disable the check rather than
@@ -457,6 +483,7 @@ func (c *Config) WithDefaults() *Config {
457483
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
458484
RetryPolicy: retryPolicy,
459485
Schema: c.Schema,
486+
SoftStopTimeout: c.SoftStopTimeout,
460487
SkipJobKindValidation: c.SkipJobKindValidation,
461488
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
462489
Test: c.Test,
@@ -1081,10 +1108,19 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
10811108
return err
10821109
}
10831110

1084-
// We use separate contexts for fetching and working to allow for a graceful
1085-
// stop. Both inherit from the provided context, so if it's cancelled, a
1086-
// more aggressive stop will be initiated.
1087-
workCtx, workCancel := context.WithCancelCause(ctx)
1111+
// We use separate contexts for fetching and working to allow for a
1112+
// graceful stop. When SoftStopTimeout is configured, the work context
1113+
// is detached from the start context so that cancelling the start
1114+
// context initiates a soft stop (with timeout escalation) rather than
1115+
// an immediate hard stop. When SoftStopTimeout is not configured, the
1116+
// work context inherits from the start context to preserve the
1117+
// existing behavior where cancelling the start context is equivalent
1118+
// to StopAndCancel.
1119+
workParentCtx := ctx
1120+
if c.config.SoftStopTimeout > 0 {
1121+
workParentCtx = context.WithoutCancel(ctx)
1122+
}
1123+
workCtx, workCancel := context.WithCancelCause(workParentCtx)
10881124

10891125
// Client available to executors and to various service hooks.
10901126
fetchCtx := withClient(fetchCtx, c)
@@ -1148,6 +1184,18 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
11481184
c.queues.startStopMu.Lock()
11491185
defer c.queues.startStopMu.Unlock()
11501186

1187+
// If SoftStopTimeout is configured, start a timer that will cancel
1188+
// the work context (escalating to a hard stop) if producers don't
1189+
// finish in time. StopAndCancel also calls workCancel, in which case
1190+
// this timer is a harmless no-op because the context is already done.
1191+
if c.config.SoftStopTimeout > 0 {
1192+
softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() {
1193+
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout))
1194+
c.workCancel(rivercommon.ErrStop)
1195+
})
1196+
defer softStopTimer.Stop()
1197+
}
1198+
11511199
// On stop, have the producers stop fetching first of all.
11521200
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
11531201
startstop.StopAllParallel(producersAsServices()...)
@@ -1180,6 +1228,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
11801228
// complete before exiting. If the provided context is done before shutdown has
11811229
// completed, Stop will return immediately with the context's error.
11821230
//
1231+
// If SoftStopTimeout is configured, running job contexts will be automatically
1232+
// cancelled after the timeout elapses, escalating to a hard stop. This also
1233+
// applies when stop is initiated by cancelling the context passed to Start.
1234+
//
11831235
// There's no need to call this method if a hard stop has already been initiated
11841236
// by cancelling the context passed to Start or by calling StopAndCancel.
11851237
func (c *Client[TTx]) Stop(ctx context.Context) error {
@@ -1208,6 +1260,12 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
12081260
// This can also be initiated by cancelling the context passed to Start. There is
12091261
// no need to call this method if the context passed to Start is cancelled
12101262
// instead.
1263+
//
1264+
// In most cases, using Stop with SoftStopTimeout configured is preferable to
1265+
// calling StopAndCancel directly. SoftStopTimeout gives running jobs a chance
1266+
// to finish before automatically escalating to context cancellation, providing
1267+
// graceful stop semantics without requiring manual orchestration of Stop and
1268+
// StopAndCancel.
12111269
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
12121270
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
12131271
c.workCancel(rivercommon.ErrStop)

client_test.go

Lines changed: 116 additions & 0 deletions
@@ -1933,6 +1933,122 @@ func Test_Client_StopAndCancel(t *testing.T) {
19331933
})
19341934
}
19351935

1936+
func Test_Client_SoftStopTimeout(t *testing.T) {
1937+
t.Parallel()
1938+
1939+
ctx := context.Background()
1940+
1941+
type JobArgs struct {
1942+
testutil.JobArgsReflectKind[JobArgs]
1943+
}
1944+
1945+
t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) {
1946+
t.Parallel()
1947+
1948+
config := newTestConfig(t, "")
1949+
config.SoftStopTimeout = 100 * time.Millisecond
1950+
1951+
jobDoneChan := make(chan struct{})
1952+
jobStartedChan := make(chan struct{})
1953+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
1954+
close(jobStartedChan)
1955+
<-ctx.Done() // only finishes when context is cancelled
1956+
close(jobDoneChan)
1957+
return nil
1958+
}))
1959+
1960+
client := runNewTestClient(ctx, t, config)
1961+
1962+
_, err := client.Insert(ctx, JobArgs{}, nil)
1963+
require.NoError(t, err)
1964+
1965+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
1966+
1967+
// Stop initiates a soft stop. The job won't finish on its own, but
1968+
// SoftStopTimeout should escalate to a hard stop after 100ms.
1969+
require.NoError(t, client.Stop(ctx))
1970+
1971+
// Verify the job's context was indeed cancelled.
1972+
select {
1973+
case <-jobDoneChan:
1974+
default:
1975+
t.Fatal("expected job to have been cancelled by soft stop timeout")
1976+
}
1977+
})
1978+
1979+
t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) {
1980+
t.Parallel()
1981+
1982+
config := newTestConfig(t, "")
1983+
config.SoftStopTimeout = 5 * time.Second
1984+
1985+
jobStartedChan := make(chan struct{})
1986+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
1987+
close(jobStartedChan)
1988+
return nil // finishes immediately
1989+
}))
1990+
1991+
client := runNewTestClient(ctx, t, config)
1992+
1993+
_, err := client.Insert(ctx, JobArgs{}, nil)
1994+
require.NoError(t, err)
1995+
1996+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
1997+
1998+
// Stop should succeed quickly since the job finishes on its own.
1999+
// The 5s timeout should not fire.
2000+
require.NoError(t, client.Stop(ctx))
2001+
})
2002+
2003+
t.Run("ContextCancellationEscalatesAfterTimeout", func(t *testing.T) {
2004+
t.Parallel()
2005+
2006+
config := newTestConfig(t, "")
2007+
config.SoftStopTimeout = 100 * time.Millisecond
2008+
2009+
jobDoneChan := make(chan struct{})
2010+
jobStartedChan := make(chan struct{})
2011+
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
2012+
close(jobStartedChan)
2013+
<-ctx.Done()
2014+
close(jobDoneChan)
2015+
return nil
2016+
}))
2017+
2018+
var (
2019+
dbPool = riversharedtest.DBPool(ctx, t)
2020+
driver = riverpgxv5.New(dbPool)
2021+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
2022+
)
2023+
config.Schema = schema
2024+
2025+
client, err := NewClient(driver, config)
2026+
require.NoError(t, err)
2027+
2028+
startCtx, startCtxCancel := context.WithCancel(ctx)
2029+
defer startCtxCancel()
2030+
2031+
require.NoError(t, client.Start(startCtx))
2032+
2033+
_, err = client.Insert(ctx, JobArgs{}, nil)
2034+
require.NoError(t, err)
2035+
2036+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
2037+
2038+
// Cancel the start context. This should initiate a soft stop, then
2039+
// escalate to hard stop after SoftStopTimeout.
2040+
startCtxCancel()
2041+
2042+
riversharedtest.WaitOrTimeout(t, client.Stopped())
2043+
2044+
select {
2045+
case <-jobDoneChan:
2046+
default:
2047+
t.Fatal("expected job to have been cancelled by soft stop timeout")
2048+
}
2049+
})
2050+
}
2051+
19362052
type callbackWithCustomTimeoutArgs struct {
19372053
TimeoutValue time.Duration `json:"timeout"`
19382054
}

example_graceful_shutdown_test.go

Lines changed: 26 additions & 86 deletions
@@ -2,7 +2,6 @@ package river_test
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"log/slog"
87
"os"
@@ -44,13 +43,10 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait
4443
return ctx.Err()
4544
}
4645

47-
// Example_gracefulShutdown demonstrates a realistic-looking stop loop for
48-
// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C
49-
// locally or on a platform like Heroku to stop a process) and when received,
50-
// tries a soft stop that waits for work to finish. If it doesn't finish in
51-
// time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs
52-
// using context cancellation. A third will give up on the stop procedure and
53-
// exit uncleanly.
46+
// Example_gracefulShutdown demonstrates graceful stop using SoftStopTimeout.
47+
// When a SIGINT/SIGTERM arrives, the start context is cancelled, which
48+
// initiates a soft stop. If running jobs don't finish within the configured
49+
// SoftStopTimeout, their contexts are automatically cancelled (hard stop).
5450
func Example_gracefulShutdown() {
5551
ctx := context.Background()
5652

@@ -66,13 +62,14 @@ func Example_gracefulShutdown() {
6662
river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted})
6763

6864
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
69-
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTimeJobID})),
65+
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError, ReplaceAttr: slogutil.NoLevelTimeJobID})),
7066
Queues: map[string]river.QueueConfig{
7167
river.QueueDefault: {MaxWorkers: 100},
7268
},
73-
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
74-
TestOnly: true, // suitable only for use in tests; remove for live environments
75-
Workers: workers,
69+
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
70+
SoftStopTimeout: 100 * time.Millisecond,
71+
TestOnly: true, // suitable only for use in tests; remove for live environments
72+
Workers: workers,
7673
})
7774
if err != nil {
7875
panic(err)
@@ -83,89 +80,32 @@ func Example_gracefulShutdown() {
8380
panic(err)
8481
}
8582

83+
// Use signal.NotifyContext to cancel the start context on SIGINT/SIGTERM.
84+
// When the signal fires, the client initiates a soft stop. If running jobs
85+
// don't finish within SoftStopTimeout, their contexts are automatically
86+
// cancelled.
87+
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
88+
defer stop()
89+
8690
if err := riverClient.Start(ctx); err != nil {
8791
panic(err)
8892
}
8993

90-
sigintOrTerm := make(chan os.Signal, 1)
91-
signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)
92-
93-
// This is meant to be a realistic-looking stop goroutine that might go in a
94-
// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
95-
// gracefully by allowing a chance for jobs to finish. But if that isn't
96-
// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
97-
// it'll issue a hard stop that cancels the context of all active jobs. In
98-
// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
99-
// completely and exits uncleanly.
100-
go func() {
101-
<-sigintOrTerm
102-
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")
103-
104-
softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
105-
defer softStopCtxCancel()
106-
107-
go func() {
108-
select {
109-
case <-sigintOrTerm:
110-
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
111-
softStopCtxCancel()
112-
case <-softStopCtx.Done():
113-
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
114-
}
115-
}()
116-
117-
err := riverClient.Stop(softStopCtx)
118-
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
119-
panic(err)
120-
}
121-
if err == nil {
122-
fmt.Printf("Soft stop succeeded\n")
123-
return
124-
}
125-
126-
hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
127-
defer hardStopCtxCancel()
128-
129-
// As long as all jobs respect context cancellation, StopAndCancel will
130-
// always work. However, in the case of a bug where a job blocks despite
131-
// being cancelled, it may be necessary to either ignore River's stop
132-
// result (what's shown here) or have a supervisor kill the process.
133-
err = riverClient.StopAndCancel(hardStopCtx)
134-
if err != nil && errors.Is(err, context.DeadlineExceeded) {
135-
fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")
136-
} else if err != nil {
137-
panic(err)
138-
}
139-
140-
// hard stop succeeded
141-
}()
142-
14394
// Make sure our job starts being worked before doing anything else.
14495
<-jobStarted
14596

146-
// Cheat a little by sending a SIGTERM manually for the purpose of this
147-
// example (normally this will be sent by user or supervisory process). The
148-
// first SIGTERM tries a soft stop in which jobs are given a chance to
149-
// finish up.
150-
sigintOrTerm <- syscall.SIGTERM
151-
152-
// The soft stop will never work in this example because our job only
153-
// respects context cancellation, but wait a short amount of time to give it
154-
// a chance. After it elapses, send another SIGTERM to initiate a hard stop.
155-
select {
156-
case <-riverClient.Stopped():
157-
// Will never be reached in this example because our job will only ever
158-
// finish on context cancellation.
159-
fmt.Printf("Soft stop succeeded\n")
160-
161-
case <-time.After(100 * time.Millisecond):
162-
sigintOrTerm <- syscall.SIGTERM
163-
<-riverClient.Stopped()
164-
}
97+
// Cheat a little by sending ourselves a SIGTERM for the purpose of this
98+
// example (normally this is sent by user or supervisory process). The signal
99+
// cancels the start context, initiating a soft stop.
100+
selfProcess, _ := os.FindProcess(os.Getpid())
101+
selfProcess.Signal(syscall.SIGTERM)
102+
103+
// Wait for the client to stop. The job won't finish on its own (it only
104+
// responds to context cancellation), so after SoftStopTimeout elapses the
105+
// client will automatically cancel job contexts (hard stop).
106+
<-riverClient.Stopped()
165107

166108
// Output:
167109
// Working job that doesn't finish until cancelled
168-
// Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
169-
// Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
170110
// Job cancelled
171111
}
