Skip to content

Commit 09fb870

Browse files
committed
[SEA-NodeJS] SEA async execute (submit / poll / awaitResult)
Switch the SEA query path from the blocking `executeStatement` to the kernel's async `submitStatement`, matching the Thrift backend's always-async (`runAsync: true`) model. `submitStatement` returns immediately with a pending `AsyncStatement` (kernel `wait_timeout=0s`) while the query runs server-side. SeaOperationBackend becomes dual-mode (exactly one of): - `asyncStatement` (query path): `waitUntilReady()` polls `status()` to a terminal state on a 100ms cadence (matching Thrift), firing the progress callback each tick. Polling `status()` rather than blocking on `awaitResult()` keeps `cancel()` responsive — a blocking awaitResult would hold the kernel statement mutex for the whole query and queue cancel behind it. On Succeeded it materialises the result handle (first fetch is free); on Failed it drives `awaitResult()` to surface the kernel's typed SQL-error envelope; on a server-side Cancelled/Closed/Unknown it throws a clear error. `status()` reports the real Pending/Running/Succeeded state. - `statement` (metadata path): the kernel `list*`/`get*` statement is already terminal, so `waitUntilReady()` stays the one-shot completion tick. The fetch pipeline is shared: `awaitResult()`'s `AsyncResultHandle` and the metadata `Statement` expose the same `fetchNextBatch()` / `schema()` surface, so `SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer` consume either interchangeably via a single memoised fetch handle. cancel()/close() route through a `lifecycleHandle` abstraction over whichever handle backs the op. Re-exports the kernel `AsyncStatement` / `AsyncResultHandle` types from `SeaNativeLoader`. Validated against a live warehouse: async fetchAll correctness, multi-row drain (5000 rows), long-running aggregate (count over 20M), kernel SQL-error surfacing, and cancellation mid-execution. PR1's params/metadata/getInfo all still pass through the new async path. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 36f05bf commit 09fb870

4 files changed

Lines changed: 377 additions & 54 deletions

File tree

lib/sea/SeaNativeLoader.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import type {
3636
ExecuteOptions as NativeExecuteOptions,
3737
TypedValueInput as NativeTypedValueInput,
3838
NamedTypedValueInput as NativeNamedTypedValueInput,
39+
AsyncStatement as NativeAsyncStatement,
40+
AsyncResultHandle as NativeAsyncResultHandle,
3941
} from '../../native/sea';
4042

4143
// SEA-prefixed re-exports. The kernel-generated `.d.ts` keeps the
@@ -59,6 +61,14 @@ export type SeaNativeExecuteOptions = NativeExecuteOptions;
5961
export type SeaNativeTypedValueInput = NativeTypedValueInput;
6062
export type SeaNativeNamedTypedValueInput = NativeNamedTypedValueInput;
6163

64+
// Async-submit surface: `Connection.submitStatement` returns an
65+
// `AsyncStatement` (status / awaitResult / cancel / close); `awaitResult`
66+
// yields an `AsyncResultHandle` whose `fetchNextBatch` / `schema` match the
67+
// blocking `Statement`'s fetch surface, so the results pipeline consumes
68+
// either interchangeably.
69+
export type SeaNativeAsyncStatement = NativeAsyncStatement;
70+
export type SeaNativeAsyncResultHandle = NativeAsyncResultHandle;
71+
6272
/**
6373
* The full native binding surface, derived from the generated module
6474
* so it can never drift from the `.d.ts` contract: when the kernel

lib/sea/SeaOperationBackend.ts

Lines changed: 190 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import ResultSlicer from '../result/ResultSlicer';
5050
import SeaResultsProvider from './SeaResultsProvider';
5151
import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc';
5252
import { decodeNapiKernelError } from './SeaErrorMapping';
53-
import { SeaStatement } from './SeaNativeLoader';
53+
import { SeaStatement, SeaNativeAsyncStatement, SeaNativeAsyncResultHandle } from './SeaNativeLoader';
5454
import {
5555
SeaStatementHandle,
5656
SeaOperationLifecycleState,
@@ -71,23 +71,69 @@ import {
7171
export type SeaOperationStatement = SeaStatementHandle & Partial<SeaStatement>;
7272

7373
/**
74-
* Constructor options for `SeaOperationBackend`.
74+
* The fetch surface shared by the blocking metadata `Statement` and the async
75+
* query path's `AsyncResultHandle` (from `awaitResult()`): both expose
76+
* `fetchNextBatch()` + a synchronous `schema()`, so the results pipeline
77+
* (`SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer`) consumes
78+
* either interchangeably.
79+
*/
80+
type SeaFetchHandle = Pick<SeaStatement, 'fetchNextBatch' | 'schema'>;
81+
82+
/** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */
83+
const STATUS_POLL_INTERVAL_MS = 100;
84+
85+
function delay(ms: number): Promise<void> {
86+
return new Promise((resolve) => {
87+
setTimeout(resolve, ms);
88+
});
89+
}
90+
91+
/**
92+
* Map a kernel `AsyncStatement.status()` string to the backend-neutral
93+
* `OperationState`. The kernel variant names (`Pending` / `Running` /
94+
* `Succeeded` / `Failed` / `Cancelled` / `Closed` / `Unknown`) line up 1:1
95+
* with the enum; `Canceled` (one-L spelling) is mapped defensively, and any
96+
* unrecognised value collapses to `Unknown`.
97+
*/
98+
function statusStringToOperationState(state: string): OperationState {
99+
if (state === 'Canceled') {
100+
return OperationState.Cancelled;
101+
}
102+
if ((Object.values(OperationState) as string[]).includes(state)) {
103+
return state as OperationState;
104+
}
105+
return OperationState.Unknown;
106+
}
107+
108+
/**
109+
* Constructor options for `SeaOperationBackend`. Exactly one of
110+
* `asyncStatement` (query path — `Connection.submitStatement`) or `statement`
111+
* (metadata path — `Connection.list*` / `get*`, already terminal) must be set.
75112
*/
76113
export interface SeaOperationBackendOptions {
77-
/** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */
78-
statement: SeaOperationStatement;
114+
/** The pending napi `AsyncStatement` from `Connection.submitStatement(...)`. */
115+
asyncStatement?: SeaNativeAsyncStatement;
116+
/** The terminal napi `Statement` from a metadata call. */
117+
statement?: SeaOperationStatement;
79118
context: IClientContext;
80119
/**
81-
* Optional override for `id`. When not provided a fresh UUIDv4 is
82-
* generated upstream (in `SeaSessionBackend.executeStatement`); the
83-
* kernel does not yet surface its internal statement-id at the napi
84-
* boundary. Once it does, the JS layer can thread it through here.
120+
* Optional override for `id`. Defaults to the napi statement-id when the
121+
* handle exposes one, else a fresh UUIDv4.
85122
*/
86123
id?: string;
87124
}
88125

89126
export default class SeaOperationBackend implements IOperationBackend {
90-
private readonly statement: SeaOperationStatement;
127+
// Query path: pending async statement we poll to terminal. Undefined on the
128+
// metadata path.
129+
private readonly asyncStatement?: SeaNativeAsyncStatement;
130+
131+
// Metadata path: terminal statement. Undefined on the query path.
132+
private readonly blockingStatement?: SeaOperationStatement;
133+
134+
// The cancel/close surface — whichever handle backs this operation. Both
135+
// `AsyncStatement` and `Statement` expose `cancel()` / `close()`.
136+
private readonly lifecycleHandle: SeaStatementHandle;
91137

92138
private readonly context: IClientContext;
93139

@@ -103,10 +149,22 @@ export default class SeaOperationBackend implements IOperationBackend {
103149

104150
private metadataPromise?: Promise<ResultMetadata>;
105151

106-
constructor({ statement, context, id }: SeaOperationBackendOptions) {
107-
this.statement = statement;
152+
// Memoised fetch handle: on the async path it is `awaitResult()`'s result
153+
// (resolved once the statement is terminal); on the metadata path it is the
154+
// already-terminal statement. Drives both fetch and result-metadata.
155+
private fetchHandlePromise?: Promise<SeaFetchHandle>;
156+
157+
constructor({ asyncStatement, statement, context, id }: SeaOperationBackendOptions) {
158+
if ((asyncStatement === undefined) === (statement === undefined)) {
159+
throw new HiveDriverError(
160+
'SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided',
161+
);
162+
}
163+
this.asyncStatement = asyncStatement;
164+
this.blockingStatement = statement;
165+
this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle;
108166
this.context = context;
109-
this._id = id ?? uuidv4();
167+
this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4();
110168
}
111169

112170
public get id(): string {
@@ -162,7 +220,7 @@ export default class SeaOperationBackend implements IOperationBackend {
162220
// wedged, so nothing downstream forces another close). We still don't
163221
// mask the original fetch error, but log the close failure at warn so
164222
// the leak is diagnosable rather than completely invisible.
165-
await seaClose(this.lifecycle, this.statement, this.context, this._id).catch((closeErr) => {
223+
await seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id).catch((closeErr) => {
166224
const cause = closeErr instanceof Error ? closeErr.message : String(closeErr);
167225
this.context
168226
.getLogger()
@@ -191,12 +249,16 @@ export default class SeaOperationBackend implements IOperationBackend {
191249
return this.metadataPromise;
192250
}
193251
this.metadataPromise = (async () => {
194-
if (!this.statement.schema) {
195-
throw new HiveDriverError('SeaOperationBackend: statement.schema() is not available on this handle');
252+
// The schema lives on the fetch handle: the metadata `Statement`
253+
// directly, or the async path's `AsyncResultHandle` (materialised by
254+
// `getFetchHandle()` once the statement is terminal).
255+
const handle = await this.getFetchHandle();
256+
if (!handle.schema) {
257+
throw new HiveDriverError('SeaOperationBackend: schema() is not available on this handle');
196258
}
197259
// `schema()` is a synchronous napi getter (returns `ArrowSchema`, not a
198260
// Promise) — no `await` needed.
199-
const arrowSchemaIpc = this.statement.schema();
261+
const arrowSchemaIpc = handle.schema();
200262
const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes);
201263
// `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for
202264
// back-compat with the public `IOperation.getSchema()` surface.
@@ -229,60 +291,146 @@ export default class SeaOperationBackend implements IOperationBackend {
229291
// ---------------------------------------------------------------------------
230292

231293
public async status(_progress: boolean): Promise<OperationStatus> {
232-
// Synthesised — the kernel resolves `Statement::execute().await` before
233-
// it hands back a Statement handle, so by the time a SeaOperationBackend
234-
// exists the statement is terminal. Note there is intentionally no
235-
// `Failed` arm: a failed execution rejects inside `executeStatement`
236-
// (the kernel surfaces the error at submit), so a `Failed` statement
237-
// never becomes a SeaOperationBackend — `status()` only ever observes
238-
// Succeeded, or Cancelled/Closed from a client-side lifecycle call.
239-
// Report Cancelled/Closed if the lifecycle flag is set, else Succeeded.
240-
// Returns the backend-neutral OperationStatus the IOperationBackend
241-
// contract expects, so the DBSQLOperation facade switches on `state`
242-
// identically across backends.
294+
// A client-side cancel/close wins over any server state.
243295
if (this.lifecycle.isCancelled) {
244296
return { state: OperationState.Cancelled, hasResultSet: true };
245297
}
246298
if (this.lifecycle.isClosed) {
247299
return { state: OperationState.Closed, hasResultSet: true };
248300
}
301+
if (this.asyncStatement) {
302+
// Query path: report the real kernel state (single GetStatementStatus
303+
// RPC — no polling here; `waitUntilReady` owns the poll loop).
304+
const state = statusStringToOperationState(await this.asyncStatement.status());
305+
return { state, hasResultSet: true };
306+
}
307+
// Metadata path: the kernel statement is already terminal.
249308
return { state: OperationState.Succeeded, hasResultSet: true };
250309
}
251310

252311
public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise<void> {
253-
// Kernel's `Statement::execute().await` has already resolved by the
254-
// time we hold a Statement handle — there is no pending/running
255-
// state to poll for M0. seaFinished fires the progress callback
256-
// once with a synthesised FINISHED response so progress-UI callers
257-
// see the same one-shot completion tick the Thrift path emits at
258-
// the end of its polling loop.
312+
if (this.asyncStatement) {
313+
return this.waitUntilReadyAsync(options);
314+
}
315+
// Metadata path: the kernel statement has already resolved, so there is
316+
// nothing to poll. seaFinished fires the progress callback once with a
317+
// synthesised completion tick, matching the Thrift path's final tick.
259318
return seaFinished(this.lifecycle, options);
260319
}
261320

262321
public async cancel(): Promise<Status> {
263-
return seaCancel(this.lifecycle, this.statement, this.context, this._id);
322+
return seaCancel(this.lifecycle, this.lifecycleHandle, this.context, this._id);
264323
}
265324

266325
public async close(): Promise<Status> {
267-
return seaClose(this.lifecycle, this.statement, this.context, this._id);
326+
return seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id);
268327
}
269328

270329
// ---------------------------------------------------------------------------
271330
// Internals.
272331
// ---------------------------------------------------------------------------
273332

333+
/**
334+
* Poll the kernel `AsyncStatement` to a terminal state, mirroring the Thrift
335+
* backend's `waitUntilReady` loop (100ms cadence). Polling `status()` rather
336+
* than awaiting `awaitResult()` directly is deliberate: a blocking
337+
* `awaitResult()` holds the kernel statement mutex for the whole query and
338+
* would queue a concurrent `cancel()` behind it, whereas the poll loop
339+
* releases the mutex between ticks so `cancel()` stays responsive. On
340+
* success it materialises the result handle (so the first fetch is free);
341+
* on a bad terminal state it surfaces the real kernel error.
342+
*/
343+
private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise<void> {
344+
// Already materialised → terminal-and-ready, nothing to wait for.
345+
if (this.fetchHandlePromise) {
346+
return;
347+
}
348+
for (;;) {
349+
// A JS-initiated cancel/close short-circuits before the next poll.
350+
failIfNotActive(this.lifecycle);
351+
352+
// eslint-disable-next-line no-await-in-loop
353+
const state = statusStringToOperationState(await this.asyncStatement!.status());
354+
355+
if (options?.callback) {
356+
// eslint-disable-next-line no-await-in-loop
357+
await Promise.resolve(options.callback({ state, hasResultSet: true }));
358+
}
359+
360+
switch (state) {
361+
case OperationState.Pending:
362+
case OperationState.Running:
363+
break;
364+
case OperationState.Succeeded:
365+
// Materialise the result stream now so the first fetch/metadata call
366+
// doesn't pay an extra await_result round-trip.
367+
// eslint-disable-next-line no-await-in-loop
368+
await this.getFetchHandle();
369+
return;
370+
case OperationState.Failed:
371+
// `status()` collapses Failed to the variant name only; the real
372+
// SQL-error envelope (sql_state / error_code / query_id) rides on
373+
// `awaitResult()`'s rejection — drive it to surface the typed error.
374+
// eslint-disable-next-line no-await-in-loop
375+
await this.throwAsyncError();
376+
break;
377+
case OperationState.Cancelled:
378+
throw new HiveDriverError(`SEA operation ${this._id} was cancelled server-side.`);
379+
case OperationState.Closed:
380+
throw new HiveDriverError(`SEA operation ${this._id} was closed before it produced a result.`);
381+
default:
382+
throw new HiveDriverError(`SEA operation ${this._id} reached an unexpected state: ${state}.`);
383+
}
384+
385+
// eslint-disable-next-line no-await-in-loop
386+
await delay(STATUS_POLL_INTERVAL_MS);
387+
}
388+
}
389+
390+
/**
391+
* Drive `awaitResult()` on a Failed statement to surface the kernel's typed
392+
* SQL-error envelope. Falls back to a generic error if `awaitResult()`
393+
* unexpectedly resolves instead of rejecting.
394+
*/
395+
private async throwAsyncError(): Promise<never> {
396+
try {
397+
await this.asyncStatement!.awaitResult();
398+
} catch (err) {
399+
throw decodeNapiKernelError(err);
400+
}
401+
throw new HiveDriverError(`SEA operation ${this._id} reported Failed but produced a result.`);
402+
}
403+
404+
/**
405+
* Resolve (and memoise) the fetch handle: `awaitResult()`'s `AsyncResultHandle`
406+
* on the query path, or the already-terminal `Statement` on the metadata path.
407+
*/
408+
private getFetchHandle(): Promise<SeaFetchHandle> {
409+
if (!this.fetchHandlePromise) {
410+
if (this.asyncStatement) {
411+
this.fetchHandlePromise = this.asyncStatement.awaitResult().catch((err) => {
412+
throw decodeNapiKernelError(err);
413+
}) as Promise<SeaNativeAsyncResultHandle>;
414+
} else {
415+
const stmt = this.blockingStatement!;
416+
if (!stmt.fetchNextBatch) {
417+
throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
418+
}
419+
this.fetchHandlePromise = Promise.resolve(stmt as unknown as SeaFetchHandle);
420+
}
421+
}
422+
return this.fetchHandlePromise;
423+
}
424+
274425
private async getResultSlicer(): Promise<ResultSlicer<any>> {
275426
if (this.resultSlicer) {
276427
return this.resultSlicer;
277428
}
278-
if (!this.statement.fetchNextBatch) {
279-
throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
280-
}
281429
const metadata = await this.getResultMetadata();
282-
// The lifecycle subset has cancel/close only; fetch methods exist on
283-
// the full napi Statement. Cast is safe here because we've just
284-
// verified `fetchNextBatch` is callable.
285-
this.resultsProvider = new SeaResultsProvider(this.statement as SeaStatement);
430+
const handle = await this.getFetchHandle();
431+
// SeaResultsProvider consumes only `fetchNextBatch`; both the async result
432+
// handle and the blocking statement satisfy that surface.
433+
this.resultsProvider = new SeaResultsProvider(handle as unknown as SeaStatement);
286434
const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata);
287435
this.resultSlicer = new ResultSlicer(this.context, converter);
288436
return this.resultSlicer;

lib/sea/SeaSessionBackend.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,22 @@ export default class SeaSessionBackend implements ISessionBackend {
143143

144144
const execOptions = this.buildExecuteOptions(options);
145145

146-
let nativeStatement;
146+
// Submit asynchronously (kernel `wait_timeout=0s`): the server returns a
147+
// pending `AsyncStatement` immediately while the query runs, matching the
148+
// Thrift backend's always-async (`runAsync: true`) path. The operation
149+
// backend polls `status()` to terminal in `waitUntilReady()` and
150+
// materialises results via `awaitResult()`, so a long-running query stays
151+
// cancellable mid-flight and `status()` reports real Pending/Running states.
152+
let asyncStatement;
147153
try {
148-
nativeStatement =
154+
asyncStatement =
149155
execOptions === undefined
150-
? await this.connection.executeStatement(statement)
151-
: await this.connection.executeStatement(statement, execOptions);
156+
? await this.connection.submitStatement(statement)
157+
: await this.connection.submitStatement(statement, execOptions);
152158
} catch (err) {
153159
throw decodeNapiKernelError(err);
154160
}
155-
return this.wrapStatement(nativeStatement!);
161+
return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context });
156162
}
157163

158164
/**
@@ -205,7 +211,7 @@ export default class SeaSessionBackend implements ISessionBackend {
205211
return Object.keys(execOptions).length > 0 ? execOptions : undefined;
206212
}
207213

208-
/** Wrap a napi `Statement` (from execute or a metadata call) as an operation backend. */
214+
/** Wrap a napi metadata `Statement` (already terminal) as an operation backend. */
209215
private wrapStatement(nativeStatement: SeaStatement): IOperationBackend {
210216
return new SeaOperationBackend({
211217
statement: nativeStatement,

0 commit comments

Comments
 (0)