Commit 73a8918

[SEA-NodeJS] Sync execute: eager cancellable handle + close-drives commit; accept-and-ignore unsupported options
The default sync path (runAsync:false) now returns the operation handle immediately and kicks off the inline-materialise result() in the background, instead of blocking executeStatement until the statement is terminal. This restores mid-run cancellation on the sync path WITHOUT the runAsync submit/poll/refetch latency tax: the kernel execute() publishes the statement id mid-flight, so a concurrent op.cancel() interrupts the running execute via the StatementCanceller.

Fire-and-forget DDL/DML (execute then close without a fetch) and dependent-statement ordering still "just work": the operation backend's close() drives the same memoised result() to terminal before releasing (committing the statement), UNLESS the op was cancelled — in which case it releases without committing.

Early-window cancel correctness (cancel issued before the server id is published) depends on the kernel StatementCanceller intent-hold (databricks/databricks-sql-kernel#133): the canceller holds the cancel intent until the id appears, then dispatches the real CancelStatement, so there is no orphaned server statement. No napi surface change.

Also retains the option no-ops: accept-and-ignore for unsupported per-statement options (useCloudFetch / useLZ4Compression / stagingAllowedLocalPath) and queryTimeout (no-op + TODO), matching the Python kernel client rather than hard-failing the connection.

Validated e2e (pecotesting): CREATE/INSERT fire-and-forget commit, dependent ordering, 100k read, mid-run cancel (late + early intent-hold window).
Unit: SEA suite 250 passing.
Latency vs Thrift: SELECT 1 ~parity (-2%..+8%), 100k rows -16%..-20% (faster).
Concurrency: SELECT 1 conc=40 -19%, 100k conc=10 -7% (faster).

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
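
The early-window cancel described in the commit message can be pictured with a small model. This is only a sketch of the "hold the intent until the id is published" idea; the real StatementCanceller lives in the kernel (databricks/databricks-sql-kernel#133) and is not shown in this commit. The class name `IntentHoldCanceller`, its methods, and the `dispatch` callback are hypothetical names chosen for illustration.

```typescript
// Hypothetical model of an intent-holding canceller. None of these names come
// from the kernel; they only illustrate the ordering guarantee.
class IntentHoldCanceller {
  private readonly dispatch: (statementId: string) => void;
  private statementId: string | undefined;
  private cancelRequested = false;
  private dispatched = false;

  constructor(dispatch: (statementId: string) => void) {
    this.dispatch = dispatch;
  }

  // Called by the executing side once the server has assigned an id.
  publishId(statementId: string): void {
    this.statementId = statementId;
    if (this.cancelRequested) {
      this.fire();
    }
  }

  // Called by the user at any time, possibly before the id exists.
  cancel(): void {
    this.cancelRequested = true;
    if (this.statementId !== undefined) {
      this.fire();
    }
  }

  // Sends the cancel at most once, and only when an id is known.
  private fire(): void {
    if (this.dispatched || this.statementId === undefined) {
      return;
    }
    this.dispatched = true;
    this.dispatch(this.statementId);
  }
}
```

In this model a `cancel()` issued before `publishId()` sends nothing immediately; the cancel request goes out as soon as the id arrives, which is the property that avoids an orphaned server statement.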
1 parent 4b9e16e commit 73a8918

3 files changed

Lines changed: 155 additions & 118 deletions

lib/sea/SeaOperationBackend.ts

Lines changed: 16 additions & 6 deletions
@@ -218,15 +218,25 @@ export default class SeaOperationBackend implements IOperationBackend {
     // Lifecycle surface. The async/metadata handles expose both cancel/close.
     // The sync-execute path uses a composite: `cancel()` always routes to the
     // cancellable execution (lock-free, interrupts a running `result()`
-    // mid-compute and is a no-op once terminal); `close()` closes the resolved
-    // terminal statement once `result()` produced it, OR — if `result()` is
-    // still in flight — proactively cancels the running execution so the server
-    // stops computing immediately rather than running on until the kernel's
-    // drop-guard fires whenever this handle is eventually GC'd.
+    // mid-compute and is a no-op once terminal); `close()` drives the statement
+    // to terminal first (fire-and-forget commit) unless the op was cancelled.
     this.lifecycleHandle = cancellableExecution
       ? {
           cancel: () => cancellableExecution.cancel(),
-          close: () => (this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel()),
+          // Fire-and-forget commit semantics. A submitted statement is only
+          // guaranteed to run server-side once `result()` reaches terminal.
+          // On close, if the op was NOT cancelled, drive `result()` to
+          // completion first (the eager kick-off in the session backend means
+          // this is usually already in flight, so we just await the memoised
+          // promise) so a `CREATE`/`INSERT` issued without a fetch still
+          // commits — then close the resolved terminal statement. A cancelled
+          // op skips the drive and releases the (already-cancelling) execution.
+          close: async () => {
+            if (!this.lifecycle.isCancelled && !this.blockingStatement) {
+              await this.getFetchHandle().catch(() => undefined);
+            }
+            return this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel();
+          },
         }
       : ((asyncStatement ?? statement) as SeaStatementHandle);
     this.context = context;
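
The interaction between the eager kick-off and the new `close()` can be sketched in isolation. The block below is a simplified model, not the driver's code: `FakeExecution`, `EagerOperation`, `executeEager`, and `makeFakeExecution` are hypothetical names, and a timer stands in for the kernel `execute()`. It shows the three properties the diff relies on: `result()` is started once and memoised, `close()` awaits it unless the operation was cancelled, and the background rejection after a cancel is swallowed so it never surfaces as an unhandled rejection.

```typescript
// Hypothetical stand-in for the kernel's cancellable execution.
interface FakeExecution {
  result(): Promise<string>;
  cancel(): void;
}

class EagerOperation {
  private readonly execution: FakeExecution;
  private resultPromise: Promise<string> | undefined;
  private cancelled = false;

  constructor(execution: FakeExecution) {
    this.execution = execution;
  }

  // Memoised: the eager kick-off, a later fetch, and close() share one promise.
  waitUntilReady(): Promise<string> {
    if (this.resultPromise === undefined) {
      this.resultPromise = this.execution.result();
    }
    return this.resultPromise;
  }

  cancel(): void {
    this.cancelled = true;
    this.execution.cancel();
  }

  // Drive to terminal first (commit) unless the operation was cancelled.
  async close(): Promise<void> {
    if (!this.cancelled) {
      await this.waitUntilReady().catch(() => undefined);
    }
  }
}

// Return the handle immediately; do not await the background result.
function executeEager(execution: FakeExecution): EagerOperation {
  const op = new EagerOperation(execution);
  op.waitUntilReady().catch(() => undefined);
  return op;
}

// Fake statement that "commits" after delayMs unless cancelled first.
function makeFakeExecution(delayMs: number) {
  const state = { starts: 0, committed: false, cancelled: false };
  let rejectRun: (err: Error) => void = () => undefined;
  const execution: FakeExecution = {
    result: () =>
      new Promise<string>((resolve, reject) => {
        state.starts += 1;
        rejectRun = reject;
        setTimeout(() => {
          if (!state.cancelled) {
            state.committed = true;
            resolve('terminal');
          }
        }, delayMs);
      }),
    cancel: () => {
      state.cancelled = true;
      rejectRun(new Error('cancelled'));
    },
  };
  return { execution, state };
}
```

With this model, `executeEager(...)` followed directly by `close()` leaves the fake statement committed, while `cancel()` followed by `close()` releases without committing.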

lib/sea/SeaSessionBackend.ts

Lines changed: 58 additions & 64 deletions
@@ -35,7 +35,6 @@ import ParameterError from '../errors/ParameterError';
 import { LogLevel } from '../contracts/IDBSQLLogger';
 import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader';
 import { decodeNapiKernelError } from './SeaErrorMapping';
-import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend';
 import SeaOperationBackend from './SeaOperationBackend';
 import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams';
 import { seaServerInfoValue } from './SeaServerInfo';
@@ -122,38 +121,41 @@ export default class SeaSessionBackend implements ISessionBackend {
    * Per-statement options forwarded to the kernel `ExecuteOptions`:
    * - `ordinalParameters` / `namedParameters` → bound params (mutually
    * exclusive — the kernel binds one placeholder style per statement);
-   * - `queryTimeout` → enforced client-side by the operation backend's poll
-   * deadline (the kernel ignores `queryTimeoutSecs` on the async submit
-   * path), NOT forwarded to the napi options;
    * - `rowLimit` → `rowLimit` (SEA-only server-side row cap);
    * - `queryTags` → serialised into the conf overlay's reserved
    * `query_tags` key (the same wire shape Thrift's `serializeQueryTags`
    * produces), merged with any explicit `statementConf`.
    *
-   * Still rejected (genuinely unsupported on SEA, rather than silently
-   * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a
-   * per-statement knob), `useLZ4Compression` (kernel owns result compression),
-   * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by
-   * the facade at fetch time, so it is intentionally not handled here.
+   * Accepted but IGNORED (no-op — the kernel exposes no per-statement knob, so
+   * we drop rather than reject; see the body for details and TODOs):
+   * `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`, and
+   * `queryTimeout`. `maxRows` is applied by the facade at fetch time, so it is
+   * intentionally not handled here.
    */
   public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise<IOperationBackend> {
     this.failIfClosed();
 
-    if (options.useCloudFetch !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
-      );
-    }
-    if (options.useLZ4Compression !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)',
-      );
-    }
-    if (options.stagingAllowedLocalPath !== undefined) {
-      throw new HiveDriverError(
-        'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA',
-      );
-    }
+    // `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are
+    // accepted and IGNORED (no-op) on the kernel-backed SEA path rather than
+    // rejected — the kernel exposes no per-statement knob for any of them, so a
+    // hard failure would break callers that set these options globally. This
+    // mirrors the Python connector's kernel backend
+    // (`KernelDatabricksClient.execute_command`), which takes the same flags and
+    // never reads them.
+    //
+    // - `useCloudFetch`: result transport is governed by the session-level
+    // kernel `ResultConfig.cloudfetch_enabled` (default: CloudFetch on);
+    // there is no per-statement override on the napi surface.
+    // - `useLZ4Compression`: the kernel transparently decodes whatever
+    // compression the server returns (`manifest.result_compression`) and
+    // exposes no compression-request knob.
+    // - `stagingAllowedLocalPath`: the kernel has no Volume (PUT/GET/REMOVE)
+    // API yet, so `SeaOperationBackend` always reports
+    // `isStagingOperation: false` and `DBSQLSession` treats such statements
+    // as ordinary queries. Non-staging queries that set the option run
+    // normally (parity with Thrift).
+    // TODO(SEA): wire real volume operations once the kernel exposes a
+    // Volume API + napi `is_volume_operation`.
 
     // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel-
     // specific use of the option — the Thrift backend hardcodes `runAsync: true`
@@ -166,26 +168,27 @@ export default class SeaSessionBackend implements ISessionBackend {
     // - DEFAULT (`runAsync` false/undefined) — SYNC. Route through
     // `executeStatementCancellable`: the kernel blocks on `execute()`
     // (server-side direct-results / poll-to-terminal), which is faster and,
-    // with the napi sync canceller, fully cancellable mid-COMPUTE. The
-    // blocking drive runs in the operation backend's `result()` (inside
-    // `waitUntilReady`, which the facade invokes lazily at first fetch).
-    // `queryTimeoutSecs` IS honoured on this path (forwarded to the napi
-    // options below) since the kernel `execute()` consults it.
+    // with the napi sync canceller, fully cancellable mid-COMPUTE.
     //
     // - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server
     // returns a pending `AsyncStatement` immediately while the query runs;
     // the backend polls `status()` to terminal in `waitUntilReady()` and
-    // materialises results via `awaitResult()`. `queryTimeoutSecs` is
-    // ignored by the kernel on submit, so it is enforced client-side by the
-    // operation backend's poll-loop deadline instead.
+    // materialises results via `awaitResult()`.
+    //
+    // TODO(SEA): `queryTimeout` is intentionally a NO-OP here. It must NOT be
+    // mapped to the SEA `wait_timeout` wire field: `wait_timeout` is the
+    // inline-result wait knob (valid range {0} ∪ [5,50]s, paired with
+    // `on_wait_timeout`), a different concept from a server statement-execution
+    // timeout, and out-of-range values fail with HTTP 400. The correct SEA
+    // mechanism is the `STATEMENT_TIMEOUT` session configuration (seconds); the
+    // Python connector forwards no per-statement timeout at all. Wiring this
+    // properly (STATEMENT_TIMEOUT and/or a client-side poll deadline) is
+    // deferred — until then the option is accepted and ignored.
     const runAsync = options.runAsync ?? false;
-    const queryTimeoutSecs =
-      options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined;
+
+    const execOptions = this.buildExecuteOptions(options);
 
     if (!runAsync) {
-      // Sync path: forward `queryTimeoutSecs` to the napi options — the kernel
-      // `execute()` honours it (server statement timeout).
-      const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs);
       let cancellableExecution;
       try {
         cancellableExecution =
@@ -195,19 +198,26 @@ export default class SeaSessionBackend implements ISessionBackend {
       } catch (err) {
         throw this.logAndMapError('executeStatement', err);
       }
-      return new SeaOperationBackend({
+      const op = new SeaOperationBackend({
         cancellableExecution: cancellableExecution!,
         context: this.context,
-        // The kernel honours `queryTimeoutSecs` on the sync `execute` path, so
-        // it is forwarded via the napi options (see `buildExecuteOptions`); the
-        // backend also keeps it as a deadline guard for parity with async.
-        queryTimeoutSecs,
       });
+      // Eager-cancellable sync path: kick off the inline-materialise `result()`
+      // in the background and return the handle IMMEDIATELY — do NOT await it.
+      // The kernel `execute()` publishes the statement id mid-execute, so a
+      // concurrent `op.cancel()` interrupts the running execute via the
+      // StatementCanceller (and, if the id has not been published yet, the
+      // canceller holds the cancel intent until it is, then dispatches the real
+      // CancelStatement — so there is no orphaned server statement). This gives
+      // mid-run cancel on the SYNC path WITHOUT `runAsync`'s submit/poll/refetch
+      // tax; `fetchAll()` awaits the same memoised `result()` (so small queries
+      // stay fast / inline). Fire-and-forget DDL/DML (execute then close without
+      // a fetch) still commits: the operation backend's `close()` drives this
+      // same `result()` to terminal before releasing, unless the op was cancelled.
+      op.waitUntilReady().catch(() => undefined);
+      return op;
     }
 
-    // Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on
-    // submit — `wait_timeout=0s`); it is enforced client-side by the poll loop.
-    const execOptions = this.buildExecuteOptions(options);
     let asyncStatement;
     try {
       asyncStatement =
@@ -217,16 +227,9 @@ export default class SeaSessionBackend implements ISessionBackend {
     } catch (err) {
       throw this.logAndMapError('executeStatement', err);
     }
-    // `queryTimeout` is enforced client-side by the operation backend's poll
-    // loop: the kernel ignores `queryTimeoutSecs` on the async submit path
-    // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward
-    // it to the napi options — passing it there would be a silent no-op.
     return new SeaOperationBackend({
       asyncStatement: asyncStatement!,
       context: this.context,
-      // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()`
-      // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf).
-      queryTimeoutSecs,
     });
   }
 
@@ -235,10 +238,7 @@ export default class SeaSessionBackend implements ISessionBackend {
    * `ExecuteOptions`, returning `undefined` when nothing is set so the
    * no-options call shape (`executeStatement(sql)`) is preserved.
    */
-  private buildExecuteOptions(
-    options: ExecuteStatementOptions,
-    queryTimeoutSecs?: number,
-  ): SeaNativeExecuteOptions | undefined {
+  private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined {
     // Positional (`?`) and named (`:name`) parameters are mutually exclusive —
     // the kernel binds one placeholder style per statement. Use the SAME error
     // type and message as the Thrift backend (`ThriftSessionBackend`) so a
@@ -256,14 +256,8 @@ export default class SeaSessionBackend implements ISessionBackend {
     if (namedParams !== undefined) {
       execOptions.namedParams = namedParams;
     }
-    // `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes
-    // it in): the kernel `execute()` consults it as the server statement
-    // timeout. On the async submit path the caller omits it (the kernel ignores
-    // it under `wait_timeout=0s`), so it is enforced client-side by the
-    // operation backend's poll-loop deadline instead (see executeStatement).
-    if (queryTimeoutSecs !== undefined) {
-      execOptions.queryTimeoutSecs = queryTimeoutSecs;
-    }
+    // `queryTimeout` is intentionally NOT forwarded — it is a no-op on SEA (see
+    // the TODO in executeStatement). It must not become the SEA `wait_timeout`.
     if (options.rowLimit !== undefined) {
       execOptions.rowLimit = Number(options.rowLimit);
     }
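
The accept-and-ignore behaviour in `buildExecuteOptions` can be illustrated with a reduced model. The sketch below is an assumption-laden simplification: `StatementOptions`, `NativeOptions`, and `buildNativeOptions` are hypothetical names covering only a subset of the real option types, and parameter binding and query tags are omitted. It shows that the unsupported options are part of the accepted input type but are never read, and that the function returns `undefined` when nothing forwardable is set.

```typescript
// Hypothetical, reduced option shapes; the real types carry more fields.
interface StatementOptions {
  rowLimit?: number;
  // Accepted for compatibility but ignored in this model.
  queryTimeout?: number;
  useCloudFetch?: boolean;
  useLZ4Compression?: boolean;
  stagingAllowedLocalPath?: string | string[];
}

interface NativeOptions {
  rowLimit?: number;
}

function buildNativeOptions(options: StatementOptions): NativeOptions | undefined {
  const native: NativeOptions = {};
  if (options.rowLimit !== undefined) {
    native.rowLimit = options.rowLimit;
  }
  // queryTimeout, useCloudFetch, useLZ4Compression and stagingAllowedLocalPath
  // are deliberately not inspected: setting them neither throws nor forwards.
  return Object.keys(native).length > 0 ? native : undefined;
}
```

A caller that sets only ignored options, for example `buildNativeOptions({ useCloudFetch: false, queryTimeout: 30 })`, gets `undefined` back, the same as a call with no options at all.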
