Skip to content

Commit bff796e

Browse files
committed
Add CLI statement-timeout flag
River CLI Postgres commands had fixed session defaults and no explicit flag for statement timeout. For long-running migrations, callers either accepted the default or had to encode `statement_timeout` into the connection string. This could be difficult in deployed environments where i.e. a database connection string was shared with an application and the user did not want to accidentally affect timeouts for other processes using a shared connection string. This commit adds a root `--statement-timeout` flag and thread it through command bootstrapping into Postgres pool configuration. Statement timeout now uses explicit precedence: flag value first, then connection-string query parameter, then built-in default. `idle_in_transaction_session_timeout` keeps its existing behavior and remains URL/default driven.
1 parent 686f5b6 commit bff796e

5 files changed

Lines changed: 167 additions & 26 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added root River CLI flag `--statement-timeout` so Postgres session statement timeout can be set explicitly for commands like migrations. Explicit flag values take priority over database URL query params, and query params still take priority over built-in defaults. [PR #XXXX](https://github.com/riverqueue/river/pull/XXXX).
13+
1014
### Fixed
1115

1216
- `JobCountByQueueAndState` now returns consistent results across drivers, including requested queues with zero jobs, and deduplicates repeated queue names in input. This resolves an issue with the sqlite driver in River UI reported in [riverqueue/riverui#496](https://github.com/riverqueue/riverui#496). [PR #1140](https://github.com/riverqueue/river/pull/1140).

cmd/river/rivercli/command.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rivercli
22

33
import (
4+
"cmp"
45
"context"
56
"database/sql"
67
"fmt"
@@ -46,11 +47,12 @@ type CommandOpts interface {
4647

4748
// RunCommandBundle is a bundle of utilities for RunCommand.
4849
type RunCommandBundle struct {
49-
DatabaseURL *string
50-
DriverProcurer DriverProcurer
51-
Logger *slog.Logger
52-
OutStd io.Writer
53-
Schema string
50+
DatabaseURL *string
51+
DriverProcurer DriverProcurer
52+
Logger *slog.Logger
53+
OutStd io.Writer
54+
Schema string
55+
StatementTimeout *time.Duration
5456
}
5557

5658
// RunCommand bootstraps and runs a River CLI subcommand.
@@ -81,7 +83,7 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle
8183
if databaseURL != nil {
8284
switch protocol {
8385
case "postgres", "postgresql":
84-
dbPool, err := openPgxV5DBPool(ctx, *databaseURL)
86+
dbPool, err := openPgxV5DBPool(ctx, *databaseURL, bundle.StatementTimeout)
8587
if err != nil {
8688
return false, err
8789
}
@@ -128,7 +130,7 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle
128130
return nil
129131
}
130132

131-
func openPgxV5DBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
133+
func openPgxV5DBPool(ctx context.Context, databaseURL string, statementTimeout *time.Duration) (*pgxpool.Pool, error) {
132134
const (
133135
defaultIdleInTransactionSessionTimeout = 11 * time.Second // should be greater than statement timeout because statements count towards idle-in-transaction
134136
defaultStatementTimeout = 10 * time.Second
@@ -149,9 +151,27 @@ func openPgxV5DBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, er
149151
runtimeParams[name] = val
150152
}
151153

152-
setParamIfUnset(pgxConfig.ConnConfig.RuntimeParams, "application_name", "river CLI")
153-
setParamIfUnset(pgxConfig.ConnConfig.RuntimeParams, "idle_in_transaction_session_timeout", strconv.Itoa(int(defaultIdleInTransactionSessionTimeout.Milliseconds())))
154-
setParamIfUnset(pgxConfig.ConnConfig.RuntimeParams, "statement_timeout", strconv.Itoa(int(defaultStatementTimeout.Milliseconds())))
154+
millisecondsStringFromDurationPtr := func(duration *time.Duration) string {
155+
if duration == nil {
156+
return ""
157+
}
158+
159+
return strconv.Itoa(int(duration.Milliseconds()))
160+
}
161+
162+
runtimeParams := pgxConfig.ConnConfig.RuntimeParams
163+
if runtimeParams == nil {
164+
runtimeParams = make(map[string]string)
165+
pgxConfig.ConnConfig.RuntimeParams = runtimeParams
166+
}
167+
168+
setParamIfUnset(runtimeParams, "application_name", "river CLI")
169+
setParamIfUnset(runtimeParams, "idle_in_transaction_session_timeout", strconv.Itoa(int(defaultIdleInTransactionSessionTimeout.Milliseconds())))
170+
runtimeParams["statement_timeout"] = cmp.Or(
171+
millisecondsStringFromDurationPtr(statementTimeout),
172+
runtimeParams["statement_timeout"],
173+
strconv.Itoa(int(defaultStatementTimeout.Milliseconds())),
174+
)
155175

156176
dbPool, err := pgxpool.NewWithConfig(ctx, pgxConfig)
157177
if err != nil {

cmd/river/rivercli/river_cli.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ func (c *CLI) BaseCommandSet() *cobra.Command {
6060
ctx := context.Background()
6161

6262
var globalOpts struct {
63-
Debug bool
64-
Verbose bool
63+
Debug bool
64+
StatementTimeout time.Duration
65+
Verbose bool
6566
}
67+
var rootCmd *cobra.Command
6668

6769
makeLogger := func() *slog.Logger {
6870
switch {
@@ -77,16 +79,21 @@ func (c *CLI) BaseCommandSet() *cobra.Command {
7779

7880
// Make a bundle for RunCommand. Takes a database URL pointer because not every command is required to take a database URL.
7981
makeCommandBundle := func(databaseURL *string, schema string) *RunCommandBundle {
82+
var statementTimeout *time.Duration
83+
if rootCmd.PersistentFlags().Changed("statement-timeout") {
84+
statementTimeout = &globalOpts.StatementTimeout
85+
}
86+
8087
return &RunCommandBundle{
81-
DatabaseURL: databaseURL,
82-
DriverProcurer: c.driverProcurer,
83-
Logger: makeLogger(),
84-
OutStd: c.out,
85-
Schema: schema,
88+
DatabaseURL: databaseURL,
89+
DriverProcurer: c.driverProcurer,
90+
Logger: makeLogger(),
91+
OutStd: c.out,
92+
Schema: schema,
93+
StatementTimeout: statementTimeout,
8694
}
8795
}
8896

89-
var rootCmd *cobra.Command
9097
{
9198
var rootOpts struct {
9299
Version bool
@@ -116,6 +123,7 @@ PG* vars if it's been specified.
116123
rootCmd.SetOut(c.out)
117124

118125
rootCmd.PersistentFlags().BoolVar(&globalOpts.Debug, "debug", false, "output maximum logging verbosity (debug level)")
126+
rootCmd.PersistentFlags().DurationVar(&globalOpts.StatementTimeout, "statement-timeout", 0, "override Postgres statement_timeout (Go duration, e.g. 10s, 1m)")
119127
rootCmd.PersistentFlags().BoolVarP(&globalOpts.Verbose, "verbose", "v", false, "output additional logging verbosity (info level)")
120128
rootCmd.MarkFlagsMutuallyExclusive("debug", "verbose")
121129

@@ -151,7 +159,7 @@ before starting the client, and works until all jobs are finished.
151159
152160
The database in --database-url will have its jobs table truncated, so make sure
153161
to use a development database only.
154-
`),
162+
`),
155163
RunE: func(cmd *cobra.Command, args []string) error {
156164
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &bench{}, &opts)
157165
},
@@ -192,7 +200,7 @@ Defaults to running a single down migration. This behavior can be changed with
192200
SQL being run can be output using --show-sql, and executing real database
193201
operations can be prevented with --dry-run. Combine --show-sql and --dry-run to
194202
dump prospective migrations that would be applied to stdout.
195-
`),
203+
`),
196204
RunE: func(cmd *cobra.Command, args []string) error {
197205
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &migrateDown{}, &opts)
198206
},
@@ -235,7 +243,7 @@ Returns migrations for Postgres by default. Use --database-url to hint a
235243
non-Postgres alternative:
236244
237245
river migrate-get --version 6 --up --database-url sqlite:// > river_6.up.sql
238-
`),
246+
`),
239247
RunE: func(cmd *cobra.Command, args []string) error {
240248
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, ""), &migrateGet{}, &opts)
241249
},
@@ -265,7 +273,7 @@ non-Postgres alternative:
265273
Long: strings.TrimSpace(`
266274
List available migrations for the given line, showing the currently applied
267275
migration as determined by the database in --database-url.
268-
`),
276+
`),
269277
RunE: func(cmd *cobra.Command, args []string) error {
270278
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &migrateList{}, &opts)
271279
},
@@ -292,7 +300,7 @@ restricted with --max-steps or --target-version.
292300
SQL being run can be output using --show-sql, and executing real database
293301
operations can be prevented with --dry-run. Combine --show-sql and --dry-run to
294302
dump prospective migrations that would be applied to stdout.
295-
`),
303+
`),
296304
RunE: func(cmd *cobra.Command, args []string) error {
297305
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &migrateUp{}, &opts)
298306
},
@@ -314,7 +322,7 @@ are outstanding migrations that still need to be run.
314322
315323
Can be paired with river migrate-up --dry-run --show-sql to dump information on
316324
migrations that need to be run, but without running them.
317-
`),
325+
`),
318326
RunE: func(cmd *cobra.Command, args []string) error {
319327
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &validate{}, &opts)
320328
},
@@ -332,7 +340,7 @@ migrations that need to be run, but without running them.
332340
Short: "Print version information",
333341
Long: strings.TrimSpace(`
334342
Print River and Go version information.
335-
`),
343+
`),
336344
RunE: func(cmd *cobra.Command, args []string) error {
337345
return RunCommand(ctx, makeCommandBundle(nil, ""), &version{}, &versionOpts{Name: c.name})
338346
},

cmd/river/rivercli/river_cli_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"cmp"
66
"context"
77
"fmt"
8+
"maps"
89
"net/url"
910
"runtime/debug"
1011
"strings"
@@ -261,6 +262,109 @@ func TestBaseCommandSetNonParallel(t *testing.T) {
261262
cmd.SetArgs([]string{"migrate-up", "--schema", schema})
262263
require.NoError(t, cmd.Execute())
263264
})
265+
266+
t.Run("PostgresTimeoutPrecedence", func(t *testing.T) {
267+
type testCase struct {
268+
args []string
269+
expectedStatementTimeoutMS string
270+
name string
271+
}
272+
273+
makeCommandAndParams := func(t *testing.T) (*cobra.Command, func() map[string]string) {
274+
t.Helper()
275+
276+
var capturedRuntimeParams map[string]string
277+
278+
migratorStub := &MigratorStub{}
279+
migratorStub.allVersionsStub = func() []rivermigrate.Migration { return []rivermigrate.Migration{testMigration01} }
280+
migratorStub.getVersionStub = func(version int) (rivermigrate.Migration, error) {
281+
if version == 1 {
282+
return testMigration01, nil
283+
}
284+
285+
return rivermigrate.Migration{}, fmt.Errorf("unknown version: %d", version)
286+
}
287+
migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil }
288+
289+
cli := NewCLI(&Config{
290+
DriverProcurer: &DriverProcurerStub{
291+
getMigratorStub: func(config *rivermigrate.Config) (MigratorInterface, error) {
292+
return migratorStub, nil
293+
},
294+
initPgxV5Stub: func(pool *pgxpool.Pool) {
295+
capturedRuntimeParams = maps.Clone(pool.Config().ConnConfig.RuntimeParams)
296+
},
297+
},
298+
Name: "River",
299+
})
300+
301+
var out bytes.Buffer
302+
cli.SetOut(&out)
303+
304+
return cli.BaseCommandSet(), func() map[string]string {
305+
return capturedRuntimeParams
306+
}
307+
}
308+
309+
makeBaseDatabaseURL := func(t *testing.T) *url.URL {
310+
t.Helper()
311+
312+
testDatabaseURL := riversharedtest.TestDatabaseURL()
313+
parsedDatabaseURL, err := url.Parse(testDatabaseURL)
314+
require.NoError(t, err)
315+
316+
return parsedDatabaseURL
317+
}
318+
319+
testCases := []testCase{
320+
{
321+
name: "DefaultsAppliedWhenNothingSpecified",
322+
args: []string{},
323+
expectedStatementTimeoutMS: "10000",
324+
},
325+
{
326+
name: "DatabaseURLQueryParamsOverrideDefaults",
327+
args: []string{},
328+
expectedStatementTimeoutMS: "11234",
329+
},
330+
{
331+
name: "ExplicitFlagsOverrideDatabaseURLQueryParams",
332+
args: []string{
333+
"--statement-timeout", "1m3.123s",
334+
},
335+
expectedStatementTimeoutMS: "63123",
336+
},
337+
}
338+
339+
for _, tc := range testCases {
340+
t.Run(tc.name, func(t *testing.T) {
341+
cmd, getRuntimeParams := makeCommandAndParams(t)
342+
343+
databaseURL := makeBaseDatabaseURL(t)
344+
switch tc.name {
345+
case "DatabaseURLQueryParamsOverrideDefaults":
346+
queryValues := databaseURL.Query()
347+
queryValues.Set("statement_timeout", "11234")
348+
databaseURL.RawQuery = queryValues.Encode()
349+
case "ExplicitFlagsOverrideDatabaseURLQueryParams":
350+
queryValues := databaseURL.Query()
351+
queryValues.Set("statement_timeout", "12345")
352+
databaseURL.RawQuery = queryValues.Encode()
353+
}
354+
355+
args := append([]string{
356+
"migrate-get", "--up", "--version", "1", "--database-url", databaseURL.String(),
357+
}, tc.args...)
358+
cmd.SetArgs(args)
359+
require.NoError(t, cmd.Execute())
360+
361+
runtimeParams := getRuntimeParams()
362+
require.NotNil(t, runtimeParams)
363+
364+
require.Equal(t, tc.expectedStatementTimeoutMS, runtimeParams["statement_timeout"])
365+
})
366+
}
367+
})
264368
}
265369

266370
func TestBaseCommandSetDriverProcurerPgxV5(t *testing.T) {

docs/development.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ To run programs locally outside of tests, create and raise a development databas
3030
createdb river_dev
3131
go run ./cmd/river migrate-up --database-url postgres:///river_dev --line main
3232

33+
If needed, override Postgres timeouts for long-running migrations with root CLI
34+
flags:
35+
36+
go run ./cmd/river --statement-timeout 2m migrate-up --database-url postgres:///river_dev --line main
37+
3338
## Releasing a new version
3439

3540
1. Fetch changes to the repo and any new tags. Export `VERSION` by incrementing the last tag. Execute `update-mod-version` to add it the project's `go.mod` files:
@@ -71,4 +76,4 @@ Modify `go.work` so it contains the new desired version in `go` and/or `toolchai
7176

7277
```shell
7378
make update-mod-go
74-
```
79+
```

0 commit comments

Comments
 (0)