Commit f106f9b

[SEA-NodeJS] Accept-and-ignore unsupported per-statement options; eager-execute for fire-and-forget DDL/DML
The SEA backend hard-failed on four per-statement options the kernel does not expose at statement granularity, and it executed statements lazily (only at first fetch), so a side-effecting statement run via `executeStatement(...)` then `close()` with no fetch was silently dropped. Both diverged from the Thrift backend and from the Python connector's kernel backend.

Options (SeaSessionBackend): `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are now accepted and ignored (no-op) instead of throwing, matching `KernelDatabricksClient.execute_command`, which takes the same flags and never reads them. The kernel governs CloudFetch via the session-level `ResultConfig`, decodes compression from the result manifest, and has no Volume API yet. `queryTimeout` is now a no-op (with a TODO): it must not be mapped onto the SEA `wait_timeout` wire field (valid range {0} ∪ [5,50]s, a different concept from a statement timeout; out-of-range values fail with HTTP 400); the correct mechanism is the `STATEMENT_TIMEOUT` session config.

Eager execute (SeaOperationBackend): the sync path now kicks off `result()` at construction so the statement runs server-side immediately, `close()` drives an un-fetched execution to completion (instead of cancelling an un-run statement) unless the caller explicitly cancelled, `waitUntilReadyCancellable` always awaits the memoised handle, and `status()` reports Succeeded off the resolved statement rather than the (now eagerly-set) fetch-handle promise. This restores Thrift / Python `use_kernel` eager-execute semantics while preserving mid-run cancellation. No extra round-trips: `result()` is still called exactly once.

Verified: 248 SEA unit tests pass; e2e on a SQL warehouse confirms fire-and-forget DDL/DML now persists, cancel() still interrupts a running query, and the common execute/fetch/close path is unchanged (parity with Thrift).

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
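The accept-and-ignore behaviour described in the commit message can be illustrated with a small standalone sketch. This is not the code in `SeaSessionBackend`: the `StatementOptions` shape, the `IGNORED_KEYS` list, and the `splitStatementOptions` helper are hypothetical names introduced only for illustration, and the sketch assumes `rowLimit` is the only forwarded field worth showing.

```typescript
// Hypothetical, trimmed-down option shape: only fields named in the commit message.
interface StatementOptions {
  useCloudFetch?: boolean;
  useLZ4Compression?: boolean;
  stagingAllowedLocalPath?: string | string[];
  queryTimeout?: number;
  rowLimit?: number;
}

// Options that are accepted but have no per-statement effect (assumed list,
// taken from the commit message).
const IGNORED_KEYS = ['useCloudFetch', 'useLZ4Compression', 'stagingAllowedLocalPath', 'queryTimeout'] as const;

interface SplitResult {
  forwarded: { rowLimit?: number };
  ignored: string[];
}

// Instead of throwing when an unsupported option is set, record it as ignored
// and forward only what the execution layer understands.
function splitStatementOptions(options: StatementOptions): SplitResult {
  const ignored = IGNORED_KEYS.filter((key) => options[key] !== undefined);
  const forwarded: { rowLimit?: number } = {};
  if (options.rowLimit !== undefined) {
    forwarded.rowLimit = Number(options.rowLimit);
  }
  return { forwarded, ignored };
}
```

A caller that sets `useCloudFetch` globally would then keep working: the flag shows up in `ignored` and the statement still runs.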
1 parent 4b9e16e commit f106f9b

3 files changed

Lines changed: 157 additions & 124 deletions

lib/sea/SeaOperationBackend.ts

Lines changed: 42 additions & 9 deletions
@@ -226,13 +226,43 @@ export default class SeaOperationBackend implements IOperationBackend {
     this.lifecycleHandle = cancellableExecution
       ? {
           cancel: () => cancellableExecution.cancel(),
-          close: () => (this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel()),
+          close: async () => {
+            // Eager-execute parity with Thrift and Python `use_kernel`: a
+            // statement run via `executeStatement(...).close()` with NO
+            // intervening fetch must still take effect server-side (DDL/DML side
+            // effects). If the execution was never driven to a result and the
+            // caller did NOT explicitly cancel, drive it to completion here
+            // rather than cancelling an un-run statement. Errors are swallowed —
+            // `close()` is best-effort cleanup and must not throw; the read path
+            // (fetch / finished) still surfaces execution errors.
+            if (!this.lifecycle.isCancelled && this.blockingStatement === undefined) {
+              await this.getFetchHandle().catch(() => {});
+            }
+            return this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel();
+          },
         }
       : ((asyncStatement ?? statement) as SeaStatementHandle);
     this.context = context;
     this._id =
       id ?? asyncStatement?.statementId ?? statement?.statementId ?? cancellableExecution?.statementId ?? uuidv4();
     this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined;
+
+    // Eager-execute parity (Thrift / Python `use_kernel`): on the sync
+    // (`runAsync: false`) path, start driving the blocking `result()` now so the
+    // statement runs server-side immediately instead of lazily at first fetch —
+    // otherwise an `executeStatement(...).close()` with no fetch would never run
+    // the statement. `getFetchHandle()` memoises the in-flight execution, so
+    // fetch / waitUntilReady / close all await this same drive and never
+    // dispatch a second `result()`. The detached `.catch` only suppresses an
+    // unhandled-rejection warning for the fire-and-forget case; genuine awaiters
+    // still observe the rejection through the memoised promise.
+    if (this.cancellableExecution !== undefined) {
+      // Fire-and-forget: the `.catch` swallows the rejection ONLY on this
+      // detached continuation to avoid an unhandled-rejection warning when
+      // nobody fetches; the memoised `fetchHandlePromise` stays rejected so real
+      // awaiters (fetch / waitUntilReady / close) still observe the error.
+      this.getFetchHandle().catch(() => {});
+    }
   }

   public get id(): string {
@@ -384,9 +414,12 @@ export default class SeaOperationBackend implements IOperationBackend {
     if (this.cancellableExecution) {
       // Sync (`runAsync: false`) path: the kernel `execute()` blocks and polls
       // server-side; there is no per-status RPC to query while it runs. Report
-      // Running until `result()` has materialised the terminal statement, then
+      // Running until `result()` has RESOLVED the terminal statement, then
       // Succeeded — mirroring the kernel's blocking-then-terminal lifecycle.
-      const state = this.fetchHandlePromise ? OperationState.Succeeded : OperationState.Running;
+      // NOTE: key this off `blockingStatement` (set only once `result()`
+      // resolves), NOT `fetchHandlePromise` — the constructor's eager kick-off
+      // sets `fetchHandlePromise` immediately, so it no longer signals terminal.
+      const state = this.blockingStatement ? OperationState.Succeeded : OperationState.Running;
       return { state, hasResultSet: true };
     }
     // Metadata path: the kernel statement is already terminal.
@@ -537,15 +570,15 @@ export default class SeaOperationBackend implements IOperationBackend {
    * metadata path's single completion tick.
    */
   private async waitUntilReadyCancellable(options?: IOperationBackendWaitOptions): Promise<void> {
-    // Already materialised → terminal-and-ready, nothing to wait for.
-    if (this.fetchHandlePromise) {
-      return;
-    }
     // A JS-initiated cancel/close before we start short-circuits to the typed
     // state error rather than dispatching the blocking execute.
     failIfNotActive(this.lifecycle);
-    // `getFetchHandle()` drives `result()` and memoises the resolved Statement
-    // (also stored on `blockingStatement` so `close()` can reach it).
+    // Drive (or, after the constructor's eager kick-off, await the in-flight)
+    // `result()` to the terminal `Statement`. `getFetchHandle()` is memoised, so
+    // this awaits the same execution and never dispatches a second `result()`.
+    // NOTE: we must NOT early-return merely because `fetchHandlePromise` is set —
+    // with eager execution it is set immediately but not yet resolved, so a bare
+    // return would report "ready" before the statement actually finished.
     await this.getFetchHandle();
     // Single completion tick, matching the metadata path.
     if (options?.callback) {

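The eager kick-off, memoised handle, and status check in the hunks above can be sketched in isolation. The class below is a simplified stand-in, not `SeaOperationBackend`: `EagerOperation`, `getHandle`, and the `done` flag are hypothetical names, and the real backend tracks a resolved `blockingStatement` and a lifecycle object rather than these fields.

```typescript
type State = 'Running' | 'Succeeded';

// Hypothetical stand-in for an operation backend that executes eagerly.
class EagerOperation<T> {
  private handlePromise?: Promise<T>;
  private done = false;
  private cancelled = false;

  constructor(private readonly run: () => Promise<T>) {
    // Eager kick-off. The detached catch only silences the unhandled-rejection
    // warning; the memoised promise itself stays rejected for real awaiters.
    this.getHandle().catch(() => {});
  }

  // Memoised: every caller awaits the same in-flight execution, so `run`
  // is invoked exactly once.
  getHandle(): Promise<T> {
    if (this.handlePromise === undefined) {
      this.handlePromise = this.run().then((value) => {
        this.done = true;
        return value;
      });
    }
    return this.handlePromise;
  }

  // Keyed off the resolved result, not off the existence of the promise,
  // which is set as soon as the constructor runs.
  status(): State {
    return this.done ? 'Succeeded' : 'Running';
  }

  cancel(): void {
    this.cancelled = true;
  }

  // Best-effort: drive an un-fetched execution to completion unless the
  // caller cancelled; errors are swallowed here and surface on the read path.
  async close(): Promise<void> {
    if (!this.cancelled && !this.done) {
      await this.getHandle().catch(() => {});
    }
  }
}
```

With this shape, constructing the operation and calling only `close()` still runs the work once, and `status()` stays `Running` until the underlying promise has actually resolved.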
lib/sea/SeaSessionBackend.ts

Lines changed: 43 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import ParameterError from '../errors/ParameterError';
 import { LogLevel } from '../contracts/IDBSQLLogger';
 import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader';
 import { decodeNapiKernelError } from './SeaErrorMapping';
-import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend';
 import SeaOperationBackend from './SeaOperationBackend';
 import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams';
 import { seaServerInfoValue } from './SeaServerInfo';
@@ -122,38 +121,41 @@ export default class SeaSessionBackend implements ISessionBackend {
    * Per-statement options forwarded to the kernel `ExecuteOptions`:
    * - `ordinalParameters` / `namedParameters` → bound params (mutually
    *   exclusive — the kernel binds one placeholder style per statement);
-   * - `queryTimeout` → enforced client-side by the operation backend's poll
-   *   deadline (the kernel ignores `queryTimeoutSecs` on the async submit
-   *   path), NOT forwarded to the napi options;
    * - `rowLimit` → `rowLimit` (SEA-only server-side row cap);
    * - `queryTags` → serialised into the conf overlay's reserved
    *   `query_tags` key (the same wire shape Thrift's `serializeQueryTags`
    *   produces), merged with any explicit `statementConf`.
    *
-   * Still rejected (genuinely unsupported on SEA, rather than silently
-   * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a
-   * per-statement knob), `useLZ4Compression` (kernel owns result compression),
-   * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by
-   * the facade at fetch time, so it is intentionally not handled here.
+   * Accepted but IGNORED (no-op — the kernel exposes no per-statement knob, so
+   * we drop rather than reject; see the body for details and TODOs):
+   * `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`, and
+   * `queryTimeout`. `maxRows` is applied by the facade at fetch time, so it is
+   * intentionally not handled here.
    */
   public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise<IOperationBackend> {
     this.failIfClosed();

-    if (options.useCloudFetch !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
-      );
-    }
-    if (options.useLZ4Compression !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)',
-      );
-    }
-    if (options.stagingAllowedLocalPath !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA',
-      );
-    }
+    // `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are
+    // accepted and IGNORED (no-op) on the kernel-backed SEA path rather than
+    // rejected — the kernel exposes no per-statement knob for any of them, so a
+    // hard failure would break callers that set these options globally. This
+    // mirrors the Python connector's kernel backend
+    // (`KernelDatabricksClient.execute_command`), which takes the same flags and
+    // never reads them.
+    //
+    // - `useCloudFetch`: result transport is governed by the session-level
+    //   kernel `ResultConfig.cloudfetch_enabled` (default: CloudFetch on);
+    //   there is no per-statement override on the napi surface.
+    // - `useLZ4Compression`: the kernel transparently decodes whatever
+    //   compression the server returns (`manifest.result_compression`) and
+    //   exposes no compression-request knob.
+    // - `stagingAllowedLocalPath`: the kernel has no Volume (PUT/GET/REMOVE)
+    //   API yet, so `SeaOperationBackend` always reports
+    //   `isStagingOperation: false` and `DBSQLSession` treats such statements
+    //   as ordinary queries. Non-staging queries that set the option run
+    //   normally (parity with Thrift).
+    //   TODO(SEA): wire real volume operations once the kernel exposes a
+    //   Volume API + napi `is_volume_operation`.

     // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel-
     // specific use of the option — the Thrift backend hardcodes `runAsync: true`
@@ -166,26 +168,27 @@ export default class SeaSessionBackend implements ISessionBackend {
     // - DEFAULT (`runAsync` false/undefined) — SYNC. Route through
     //   `executeStatementCancellable`: the kernel blocks on `execute()`
     //   (server-side direct-results / poll-to-terminal), which is faster and,
-    //   with the napi sync canceller, fully cancellable mid-COMPUTE. The
-    //   blocking drive runs in the operation backend's `result()` (inside
-    //   `waitUntilReady`, which the facade invokes lazily at first fetch).
-    //   `queryTimeoutSecs` IS honoured on this path (forwarded to the napi
-    //   options below) since the kernel `execute()` consults it.
+    //   with the napi sync canceller, fully cancellable mid-COMPUTE.
     //
     // - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server
     //   returns a pending `AsyncStatement` immediately while the query runs;
     //   the backend polls `status()` to terminal in `waitUntilReady()` and
-    //   materialises results via `awaitResult()`. `queryTimeoutSecs` is
-    //   ignored by the kernel on submit, so it is enforced client-side by the
-    //   operation backend's poll-loop deadline instead.
+    //   materialises results via `awaitResult()`.
+    //
+    // TODO(SEA): `queryTimeout` is intentionally a NO-OP here. It must NOT be
+    // mapped to the SEA `wait_timeout` wire field: `wait_timeout` is the
+    // inline-result wait knob (valid range {0} ∪ [5,50]s, paired with
+    // `on_wait_timeout`), a different concept from a server statement-execution
+    // timeout, and out-of-range values fail with HTTP 400. The correct SEA
+    // mechanism is the `STATEMENT_TIMEOUT` session configuration (seconds); the
+    // Python connector forwards no per-statement timeout at all. Wiring this
+    // properly (STATEMENT_TIMEOUT and/or a client-side poll deadline) is
+    // deferred — until then the option is accepted and ignored.
     const runAsync = options.runAsync ?? false;
-    const queryTimeoutSecs =
-      options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined;
+
+    const execOptions = this.buildExecuteOptions(options);

     if (!runAsync) {
-      // Sync path: forward `queryTimeoutSecs` to the napi options — the kernel
-      // `execute()` honours it (server statement timeout).
-      const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs);
       let cancellableExecution;
       try {
         cancellableExecution =
@@ -198,16 +201,9 @@ export default class SeaSessionBackend implements ISessionBackend {
       return new SeaOperationBackend({
         cancellableExecution: cancellableExecution!,
         context: this.context,
-        // The kernel honours `queryTimeoutSecs` on the sync `execute` path, so
-        // it is forwarded via the napi options (see `buildExecuteOptions`); the
-        // backend also keeps it as a deadline guard for parity with async.
-        queryTimeoutSecs,
       });
     }

-    // Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on
-    // submit — `wait_timeout=0s`); it is enforced client-side by the poll loop.
-    const execOptions = this.buildExecuteOptions(options);
     let asyncStatement;
     try {
       asyncStatement =
@@ -217,16 +213,9 @@ export default class SeaSessionBackend implements ISessionBackend {
     } catch (err) {
       throw this.logAndMapError('executeStatement', err);
     }
-    // `queryTimeout` is enforced client-side by the operation backend's poll
-    // loop: the kernel ignores `queryTimeoutSecs` on the async submit path
-    // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward
-    // it to the napi options — passing it there would be a silent no-op.
     return new SeaOperationBackend({
       asyncStatement: asyncStatement!,
       context: this.context,
-      // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()`
-      // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf).
-      queryTimeoutSecs,
     });
   }

@@ -235,10 +224,7 @@ export default class SeaSessionBackend implements ISessionBackend {
    * `ExecuteOptions`, returning `undefined` when nothing is set so the
    * no-options call shape (`executeStatement(sql)`) is preserved.
    */
-  private buildExecuteOptions(
-    options: ExecuteStatementOptions,
-    queryTimeoutSecs?: number,
-  ): SeaNativeExecuteOptions | undefined {
+  private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined {
     // Positional (`?`) and named (`:name`) parameters are mutually exclusive —
     // the kernel binds one placeholder style per statement. Use the SAME error
     // type and message as the Thrift backend (`ThriftSessionBackend`) so a
@@ -256,14 +242,8 @@ export default class SeaSessionBackend implements ISessionBackend {
     if (namedParams !== undefined) {
       execOptions.namedParams = namedParams;
     }
-    // `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes
-    // it in): the kernel `execute()` consults it as the server statement
-    // timeout. On the async submit path the caller omits it (the kernel ignores
-    // it under `wait_timeout=0s`), so it is enforced client-side by the
-    // operation backend's poll-loop deadline instead (see executeStatement).
-    if (queryTimeoutSecs !== undefined) {
-      execOptions.queryTimeoutSecs = queryTimeoutSecs;
-    }
+    // `queryTimeout` is intentionally NOT forwarded — it is a no-op on SEA (see
+    // the TODO in executeStatement). It must not become the SEA `wait_timeout`.
     if (options.rowLimit !== undefined) {
       execOptions.rowLimit = Number(options.rowLimit);
     }

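The TODO in `executeStatement` explains why `queryTimeout` must not be mapped onto `wait_timeout`: the field only accepts 0 or a value in [5, 50] seconds. A minimal sketch of that range check follows, assuming the bounds quoted in the commit; `isValidWaitTimeout` is a hypothetical helper that exists nowhere in the driver and is shown only to make the mismatch concrete.

```typescript
// Returns true when `seconds` falls in the range the commit quotes for the
// SEA `wait_timeout` field: exactly 0, or between 5 and 50 inclusive.
function isValidWaitTimeout(seconds: number): boolean {
  if (!Number.isFinite(seconds)) {
    return false;
  }
  return seconds === 0 || (seconds >= 5 && seconds <= 50);
}

// Typical statement timeouts are often outside that range, which is why a
// direct mapping would be rejected by the server.
const typicalQueryTimeouts = [3, 30, 300];
const accepted = typicalQueryTimeouts.filter(isValidWaitTimeout);
```

Under these assumptions only the 30-second value would pass, so a caller asking for a 3-second or 5-minute statement timeout would hit the HTTP 400 the TODO describes.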