Skip to content

Commit e76361a

Browse files
authored
Experimental postgres query (PR 4/4): cancellation, timeout, TUI (#5143)
## PR Stack 1. [#5135](#5135) — PR 1: scaffold + autoscaling targeting + text output 2. [#5136](#5136) — PR 2: provisioned + JSON/CSV streaming + types 3. [#5138](#5138) — PR 3: multi-input + multi-statement rejection + error formatting 4. **PR 4 (this PR)** — [#5143](#5143) — cancellation + timeout + TUI for >30 rows Stacked on PR 3. ## Why PR 3 finished the input ergonomics. The remaining commitments before this command earns the "experimental" label: - A long SELECT shouldn't survive Ctrl+C. Today the pgx default closes the TCP socket but leaves the server-side query running. - CI scripts want `--timeout` so a single statement can't pin a runner. - Interactive users with >30 rows benefit from a scrollable browser instead of a wall of text. ## Changes **Before:** Ctrl+C tears down TCP but the query runs to completion server-side. `--timeout` doesn't exist. >30 rows scroll past in the terminal. **Now:** Ctrl+C cancels the in-flight query at the server. `--timeout 30s` enforces a per-statement deadline. >30 rows on a TTY open the libs/tableview viewer. Specifically: - **Cancellation watcher.** `buildPgxConfig` now installs `CancelRequestContextWatcherHandler` with `CancelRequestDelay=0, DeadlineDelay=5s`. Zero `DeadlineDelay` would race the cancel-request and could leave the connection unusable; 5s gives the cancel round-trip time to land. - **Signal handler.** Per-invocation goroutine watches SIGINT and SIGTERM. Calls cancel; defer'd stop drains the channel. - **--timeout flag.** Per-statement `context.WithTimeout` child of the signal-scoped ctx. `reportCancellation` distinguishes Ctrl+C ("Query cancelled."), timeout ("Query timed out after Xs."), and plain query errors. Returns `(msg, invocationScoped)` so the multi-input loop can drop the source prefix on user-cancel. - **TUI for >30 rows.** `textSink` now has an `interactive` mode; `runQuery` enables it when stdout is a prompt-capable TTY. Static tabwriter table for small results and pipes; libs/tableview for big interactive ones. If `tableview.Run` fails (TUI startup, terminal resize race) the sink falls through to the static tabwriter path so the user still sees the rows. Integration tests aren't included: aitools (the other experimental command) doesn't have them either. Acceptance + unit tests cover argument validation, targeting resolution (SDK-mocked), and output shapes; cancellation/timeout are unit-tested via the seam in `cancel_test.go`. Real-wire integration tests are the right addition when this command graduates from experimental. ## Test plan - [x] `go test ./experimental/postgres/...` (cancel/timeout/signal helpers, race-precedence pinning) - [x] `go test ./acceptance -run TestAccept/cmd/(psql|experimental/postgres)` (no regressions) - [x] `go tool ... golangci-lint run ./experimental/postgres/...` (0 issues) - [x] Manual: Ctrl+C during `SELECT pg_sleep(60)` cancels the server-side query within ~5s. (Smoked on `chatbot-lakebase-dev-simon-faltum` (e2-dogfood): SIGINT to `SELECT pg_sleep(60)` exited the client in 0.53s with `Error: Query cancelled.`; subsequent `pg_stat_activity` query returned zero rows.)
1 parent 67d4a5e commit e76361a

7 files changed

Lines changed: 325 additions & 32 deletions

File tree

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package postgrescmd
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestWithStatementTimeout_ZeroIsPassthrough(t *testing.T) {
13+
parent := t.Context()
14+
got, cancel := withStatementTimeout(parent, 0)
15+
defer cancel()
16+
// Parent and got should compare equal: zero timeout returns the parent
17+
// unchanged (and a no-op cancel).
18+
deadline, ok := got.Deadline()
19+
assert.False(t, ok, "deadline should not be set when timeout is 0")
20+
assert.True(t, deadline.IsZero())
21+
}
22+
23+
func TestWithStatementTimeout_AppliesDeadline(t *testing.T) {
24+
parent := t.Context()
25+
got, cancel := withStatementTimeout(parent, time.Second)
26+
defer cancel()
27+
deadline, ok := got.Deadline()
28+
assert.True(t, ok)
29+
assert.False(t, deadline.IsZero())
30+
}
31+
32+
func TestReportCancellation_SignalCanceled(t *testing.T) {
33+
signalCtx, signalCancel := context.WithCancel(t.Context())
34+
signalCancel()
35+
stmtCtx := signalCtx
36+
msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), 0)
37+
assert.Equal(t, "Query cancelled.", msg)
38+
assert.True(t, invocationScoped)
39+
}
40+
41+
func TestReportCancellation_TimeoutFired(t *testing.T) {
42+
signalCtx := t.Context()
43+
stmtCtx, stmtCancel := context.WithDeadline(signalCtx, time.Now().Add(-time.Second))
44+
defer stmtCancel()
45+
<-stmtCtx.Done()
46+
msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("query failed"), 5*time.Second)
47+
assert.Equal(t, "Query timed out after 5s.", msg)
48+
assert.True(t, invocationScoped)
49+
}
50+
51+
func TestReportCancellation_GenericError(t *testing.T) {
52+
signalCtx := t.Context()
53+
stmtCtx := signalCtx
54+
msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("syntax error"), 0)
55+
assert.Equal(t, "syntax error", msg)
56+
assert.False(t, invocationScoped)
57+
}
58+
59+
func TestReportCancellation_BothFire_CancelWinsRace(t *testing.T) {
60+
// User cancel and deadline both already done. Precedence: cancel wins
61+
// (the user's intent dominates a coincidental deadline). A future
62+
// reordering of the switch would silently flip this; the test pins it.
63+
signalCtx, signalCancel := context.WithCancel(t.Context())
64+
signalCancel()
65+
stmtCtx, stmtCancel := context.WithDeadline(signalCtx, time.Now().Add(-time.Second))
66+
defer stmtCancel()
67+
<-stmtCtx.Done()
68+
msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), time.Second)
69+
assert.Equal(t, "Query cancelled.", msg)
70+
assert.True(t, invocationScoped)
71+
}
72+
73+
func TestWatchInterruptSignals_CancelOnStop(t *testing.T) {
74+
// stop should cancel the parent context as a side-effect so the goroutine
75+
// terminates promptly. We don't actually send a SIGINT here (it would
76+
// also kill the test runner); we just verify stop cleans up.
77+
parent, parentCancel := context.WithCancel(t.Context())
78+
defer parentCancel()
79+
80+
cancelled := false
81+
cancel := func() {
82+
cancelled = true
83+
parentCancel()
84+
}
85+
86+
stop := watchInterruptSignals(parent, cancel)
87+
stop()
88+
assert.True(t, cancelled, "stop should call cancel to wake the goroutine")
89+
}

experimental/postgres/cmd/connect.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/databricks/cli/libs/log"
1414
"github.com/jackc/pgx/v5"
1515
"github.com/jackc/pgx/v5/pgconn"
16+
"github.com/jackc/pgx/v5/pgconn/ctxwatch"
1617
)
1718

1819
// defaultConnectTimeout is the dial timeout for a single connect attempt.
@@ -59,6 +60,19 @@ type connectFunc func(ctx context.Context, cfg *pgx.ConnConfig) (*pgx.Conn, erro
5960
// "Invalid protocol version: 196608". User, password, and connect timeout are
6061
// patched as fields because tokens can contain characters that would need
6162
// URL-escaping in userinfo.
63+
//
64+
// The context-watcher handler is overridden so context cancellation issues
65+
// a Postgres CancelRequest on the side-channel rather than only closing the
66+
// underlying TCP connection. Without this override, a Ctrl+C during a long
67+
// SELECT would tear down the TCP socket but leave the server-side query
68+
// running until it noticed the broken connection on its next write.
69+
//
70+
// CancelRequestDelay = 0: send the cancel-request immediately on ctx cancel.
71+
// The user just hit Ctrl+C; we want the server to learn now.
72+
// DeadlineDelay = 5s: if the cancel-request has not gotten the server to
73+
// terminate the query within 5s, fall back to deadlining the connection.
74+
// Zero DeadlineDelay would race the cancel-request and could leave the
75+
// connection unusable.
6276
func buildPgxConfig(c connectConfig) (*pgx.ConnConfig, error) {
6377
dsn := fmt.Sprintf("postgresql://%s/%s?sslmode=require",
6478
net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
@@ -70,6 +84,14 @@ func buildPgxConfig(c connectConfig) (*pgx.ConnConfig, error) {
7084
cfg.User = c.Username
7185
cfg.Password = c.Password
7286
cfg.ConnectTimeout = c.ConnectTimeout
87+
88+
cfg.BuildContextWatcherHandler = func(pgc *pgconn.PgConn) ctxwatch.Handler {
89+
return &pgconn.CancelRequestContextWatcherHandler{
90+
Conn: pgc,
91+
CancelRequestDelay: 0,
92+
DeadlineDelay: 5 * time.Second,
93+
}
94+
}
7395
return cfg, nil
7496
}
7597

experimental/postgres/cmd/query.go

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type queryFlags struct {
2727
connectTimeout time.Duration
2828
maxRetries int
2929
files []string
30+
timeout time.Duration
3031

3132
// outputFormat is the raw flag value. resolveOutputFormat turns it into
3233
// the effective format (which may differ when stdout is piped).
@@ -96,6 +97,7 @@ Limitations (this release):
9697
cmd.Flags().StringVarP(&f.database, "database", "d", defaultDatabase, "Database name")
9798
cmd.Flags().DurationVar(&f.connectTimeout, "connect-timeout", defaultConnectTimeout, "Connect timeout")
9899
cmd.Flags().IntVar(&f.maxRetries, "max-retries", 3, "Total connect attempts on idle/waking endpoint (must be >= 1; 1 disables retry)")
100+
cmd.Flags().DurationVar(&f.timeout, "timeout", 0, "Per-statement timeout (0 disables)")
99101
cmd.Flags().StringArrayVarP(&f.files, "file", "f", nil, "SQL file path (repeatable)")
100102
cmd.Flags().StringVarP(&f.outputFormat, "output", "o", string(sqlcli.OutputText), "Output format: text, json, or csv")
101103
cmd.RegisterFlagCompletionFunc("output", func(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) {
@@ -176,16 +178,27 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla
176178
MaxDelay: 10 * time.Second,
177179
}
178180

181+
// Invocation-scoped context: cancelled by Ctrl+C/SIGTERM. Owns the
182+
// connection lifecycle. Per-statement timeouts are children of this so
183+
// a cancelled invocation also cancels the in-flight statement.
184+
signalCtx, signalCancel := context.WithCancel(ctx)
185+
defer signalCancel()
186+
187+
stopSignals := watchInterruptSignals(signalCtx, signalCancel)
188+
defer stopSignals()
189+
179190
// Spinner clears its line on Close, so the "Connecting to ..." status
180191
// disappears once the connection is up. cmdio.NewSpinner already writes
181192
// to stderr and degrades to a no-op in non-interactive terminals.
182-
sp := cmdio.NewSpinner(ctx)
193+
sp := cmdio.NewSpinner(signalCtx)
183194
sp.Update("Connecting to " + resolved.DisplayName)
184-
conn, err := connectWithRetry(ctx, pgxCfg, rc, pgx.ConnectConfig)
195+
conn, err := connectWithRetry(signalCtx, pgxCfg, rc, pgx.ConnectConfig)
185196
sp.Close()
186197
if err != nil {
187198
return err
188199
}
200+
// Close on a background ctx so a cancelled signalCtx does not abort a
201+
// clean teardown handshake.
189202
defer conn.Close(context.WithoutCancel(ctx))
190203

191204
out := cmd.OutOrStdout()
@@ -196,9 +209,16 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla
196209
// Avoids buffering rows for large exports and matches the v1 single-
197210
// input behaviour PR 2 shipped. Wrap the error so DETAIL / HINT
198211
// from a *pgconn.PgError surface even on the single-input path.
199-
sink := newSink(format, out, stderr)
200-
if err := executeOne(ctx, conn, units[0].SQL, sink); err != nil {
201-
return errors.New(formatPgError(err))
212+
// Promote-to-interactive only when stdout is a prompt-capable TTY so
213+
// a pipe falls back to the static table rather than launching a TUI
214+
// into a dead writer.
215+
sink := newSinkInteractive(format, out, stderr, stdoutTTY && cmdio.IsPromptSupported(ctx))
216+
stmtCtx, stmtCancel := withStatementTimeout(signalCtx, f.timeout)
217+
err := executeOne(stmtCtx, conn, units[0].SQL, sink)
218+
stmtCancel()
219+
if err != nil {
220+
msg, _ := reportCancellation(signalCtx, stmtCtx, err, f.timeout)
221+
return errors.New(msg)
202222
}
203223
return nil
204224
}
@@ -209,7 +229,9 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla
209229
// temp tables) carries across units because we hold the same connection.
210230
results := make([]*unitResult, 0, len(units))
211231
for _, u := range units {
212-
r, err := runUnitBuffered(ctx, conn, u)
232+
stmtCtx, stmtCancel := withStatementTimeout(signalCtx, f.timeout)
233+
r, err := runUnitBuffered(stmtCtx, conn, u)
234+
stmtCancel()
213235
if err != nil {
214236
// Render the successful prefix, then surface the error with
215237
// rich pgError formatting if applicable.
@@ -218,7 +240,14 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla
218240
// error to the user, the renderer error to debug logs.
219241
fmt.Fprintln(stderr, "warning: failed to render partial result:", rerr)
220242
}
221-
return formatExecutionError(u.Source, err)
243+
msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, err, f.timeout)
244+
if invocationScoped {
245+
// User cancel / timeout is invocation-scoped; the source
246+
// prefix is redundant ("--file foo.sql: Query cancelled."
247+
// reads worse than just "Query cancelled.").
248+
return errors.New(msg)
249+
}
250+
return errors.New(u.Source + ": " + msg)
222251
}
223252
results = append(results, r)
224253
}
@@ -231,15 +260,51 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla
231260
}
232261
}
233262

234-
// newSink returns the rowSink for the chosen output format. Kept separate
235-
// from runQuery so tests can build sinks without going through pgx.
236-
func newSink(format sqlcli.Format, out, stderr io.Writer) rowSink {
263+
// withStatementTimeout returns ctx unchanged (and a no-op cancel) when
264+
// timeout is zero, otherwise a child context with the timeout applied. Each
265+
// statement gets its own deadline so cancellation is scoped to one
266+
// statement at a time.
267+
func withStatementTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
268+
if timeout <= 0 {
269+
return parent, func() {}
270+
}
271+
return context.WithTimeout(parent, timeout)
272+
}
273+
274+
// reportCancellation distinguishes the three error cases that look the same
275+
// from `executeOne`'s POV (a wrapped pgconn / network error): user cancelled
276+
// via Ctrl+C, --timeout fired, or the statement just plain errored. Returns
277+
// the human-readable message and whether the cause is invocation-scoped
278+
// (cancel/timeout) rather than statement-scoped.
279+
//
280+
// Precedence: user cancel beats deadline. If both contexts fire near-
281+
// simultaneously (race), we report "cancelled" because the user's intent
282+
// dominates a coincidental timeout.
283+
func reportCancellation(signalCtx, stmtCtx context.Context, err error, timeout time.Duration) (msg string, invocationScoped bool) {
284+
switch {
285+
case errors.Is(signalCtx.Err(), context.Canceled):
286+
return "Query cancelled.", true
287+
case timeout > 0 && errors.Is(stmtCtx.Err(), context.DeadlineExceeded):
288+
return fmt.Sprintf("Query timed out after %s.", timeout), true
289+
default:
290+
return formatPgError(err), false
291+
}
292+
}
293+
294+
// newSinkInteractive returns the rowSink for the chosen output format. When
295+
// interactive is true the text sink may launch the libs/tableview viewer for
296+
// results larger than staticTableThreshold; when false it uses the static
297+
// tabwriter table.
298+
func newSinkInteractive(format sqlcli.Format, out, stderr io.Writer, interactive bool) rowSink {
237299
switch format {
238300
case sqlcli.OutputJSON:
239301
return newJSONSink(out, stderr)
240302
case sqlcli.OutputCSV:
241303
return newCSVSink(out, stderr)
242304
default:
305+
if interactive {
306+
return newInteractiveTextSink(out)
307+
}
243308
return newTextSink(out)
244309
}
245310
}
@@ -258,13 +323,6 @@ func renderPartial(out, stderr io.Writer, format sqlcli.Format, results []*unitR
258323
}
259324
}
260325

261-
// formatExecutionError produces the error returned to cobra when an input
262-
// unit failed. The message includes the source label so the user knows
263-
// which of N inputs blew up.
264-
func formatExecutionError(source string, err error) error {
265-
return fmt.Errorf("%s: %s", source, formatPgError(err))
266-
}
267-
268326
// multiStatementHint is appended to errMultipleStatements so users see the
269327
// recovery path inline.
270328
const multiStatementHint = "\nThis command runs one statement per input. To run multiple statements:\n" +

experimental/postgres/cmd/render.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,44 @@ import (
66
"strings"
77
"text/tabwriter"
88

9+
"github.com/databricks/cli/libs/tableview"
910
"github.com/jackc/pgx/v5/pgconn"
1011
)
1112

13+
// staticTableThreshold is the row count above which we hand off to
14+
// libs/tableview's interactive viewer (when stdout is interactive). Smaller
15+
// results stay in the static tabwriter path so they stream to a pipe
16+
// unchanged. Matches the threshold aitools query uses.
17+
const staticTableThreshold = 30
18+
1219
// textSink renders results as plain text: a tabwriter-aligned table for
1320
// rows-producing statements, the command tag for command-only ones.
1421
//
1522
// Text output buffers all rows because tabwriter needs the widest cell in each
1623
// column before it can align. Streaming output is provided by the JSON and CSV
1724
// sinks; users with huge result sets should pick those.
25+
//
26+
// When interactive is true and the result has more than staticTableThreshold
27+
// rows, End hands off to libs/tableview's scrollable viewer instead of
28+
// emitting the static table. The interactive path requires a real TTY and a
29+
// prompt-capable terminal; the caller decides.
1830
type textSink struct {
19-
out io.Writer
20-
columns []string
21-
rows [][]string
31+
out io.Writer
32+
interactive bool
33+
columns []string
34+
rows [][]string
2235
}
2336

2437
func newTextSink(out io.Writer) *textSink {
2538
return &textSink{out: out}
2639
}
2740

41+
// newInteractiveTextSink returns a text sink that uses the interactive table
42+
// viewer for results larger than staticTableThreshold.
43+
func newInteractiveTextSink(out io.Writer) *textSink {
44+
return &textSink{out: out, interactive: true}
45+
}
46+
2847
func (s *textSink) Begin(fields []pgconn.FieldDescription) error {
2948
s.columns = make([]string, len(fields))
3049
for i, f := range fields {
@@ -61,6 +80,16 @@ func (s *textSink) End(commandTag string) error {
6180
return err
6281
}
6382

83+
if s.interactive && len(s.rows) > staticTableThreshold {
84+
// Try the interactive viewer; on failure (TUI startup, terminal
85+
// resize race, etc.) fall through to the static path so the user
86+
// still sees the rows their query returned. Without this fallback
87+
// a successful query would surface as "viewer failed" with no data.
88+
if err := tableview.Run(s.out, s.columns, s.rows); err == nil {
89+
return nil
90+
}
91+
}
92+
6493
tw := tabwriter.NewWriter(s.out, 0, 0, 2, ' ', 0)
6594
fmt.Fprintln(tw, strings.Join(s.columns, "\t"))
6695
fmt.Fprintln(tw, strings.Join(headerSeparator(s.columns), "\t"))

0 commit comments

Comments
 (0)