Skip to content

Commit 8bb7def

Browse files
committed
[SEA-NodeJS] Extend eager-execute parity to the async (runAsync) path
The prior commit fixed fire-and-forget DDL/DML on the default sync path. The opt-in async path (`runAsync: true`) had the same defect from a different cause: `submitStatement` starts the statement server-side, but `asyncStatement.close()` CANCELS a still-running statement — so `executeStatement(..., {runAsync:true})` followed by `close()` with no fetch aborted the statement before it committed. `SeaOperationBackend` now wraps the async handle in the same composite-close shape as the sync path: unless the caller explicitly `cancel()`ed, `close()` drives the result to terminal (`getFetchHandle()` → the kernel `awaitResult()`) so the side effect lands, then closes. The metadata path (already-terminal statement) still closes directly. Verified: 249 SEA unit tests pass (added async drive-on-close and close-after-cancel coverage); e2e confirms fire-and-forget CREATE/INSERT via runAsync:true now persists on SEA, with no change to the cancel or normal-read paths. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent f106f9b commit 8bb7def

2 files changed

Lines changed: 65 additions & 28 deletions

File tree

lib/sea/SeaOperationBackend.ts

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -215,33 +215,50 @@ export default class SeaOperationBackend implements IOperationBackend {
215215
this.asyncStatement = asyncStatement;
216216
this.cancellableExecution = cancellableExecution;
217217
this.blockingStatement = statement;
218-
// Lifecycle surface. The async/metadata handles expose both cancel/close.
219-
// The sync-execute path uses a composite: `cancel()` always routes to the
220-
// cancellable execution (lock-free, interrupts a running `result()`
221-
// mid-compute and is a no-op once terminal); `close()` closes the resolved
222-
// terminal statement once `result()` produced it, OR — if `result()` is
223-
// still in flight — proactively cancels the running execution so the server
224-
// stops computing immediately rather than running on until the kernel's
225-
// drop-guard fires whenever this handle is eventually GC'd.
226-
this.lifecycleHandle = cancellableExecution
227-
? {
228-
cancel: () => cancellableExecution.cancel(),
229-
close: async () => {
230-
// Eager-execute parity with Thrift and Python `use_kernel`: a
231-
// statement run via `executeStatement(...).close()` with NO
232-
// intervening fetch must still take effect server-side (DDL/DML side
233-
// effects). If the execution was never driven to a result and the
234-
// caller did NOT explicitly cancel, drive it to completion here
235-
// rather than cancelling an un-run statement. Errors are swallowed —
236-
// `close()` is best-effort cleanup and must not throw; the read path
237-
// (fetch / finished) still surfaces execution errors.
238-
if (!this.lifecycle.isCancelled && this.blockingStatement === undefined) {
239-
await this.getFetchHandle().catch(() => {});
240-
}
241-
return this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel();
242-
},
243-
}
244-
: ((asyncStatement ?? statement) as SeaStatementHandle);
218+
// Lifecycle surface. Both query paths use a composite handle so that
219+
// `close()` honours eager-execute parity with Thrift / Python `use_kernel`:
220+
// a statement run via `executeStatement(...).close()` with NO intervening
221+
// fetch must still take effect server-side (DDL/DML side effects). In both
222+
// cases, unless the caller explicitly `cancel()`ed, `close()` drives the
223+
// statement to a terminal result before closing rather than cancelling an
224+
// un-run statement. Errors are swallowed — `close()` is best-effort cleanup
225+
// and must not throw; the read path (fetch / finished) still surfaces them.
226+
// The metadata path's statement is already terminal, so it closes directly.
227+
if (cancellableExecution) {
228+
// Sync (`runAsync: false`): `cancel()` routes to the cancellable execution
229+
// (lock-free, interrupts a running `result()` mid-compute, no-op once
230+
// terminal). `close()` drives `result()` to the terminal statement (set on
231+
// `blockingStatement`) and closes that; if `result()` already resolved it
232+
// closes directly; if cancelled it cancels.
233+
this.lifecycleHandle = {
234+
cancel: () => cancellableExecution.cancel(),
235+
close: async () => {
236+
if (!this.lifecycle.isCancelled && this.blockingStatement === undefined) {
237+
await this.getFetchHandle().catch(() => {});
238+
}
239+
return this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel();
240+
},
241+
};
242+
} else if (asyncStatement) {
243+
// Async (`runAsync: true`): `submitStatement` already started the statement
244+
// server-side, but `asyncStatement.close()` CANCELS a still-running
245+
// statement — so `executeStatement(...).close()` with no fetch would abort
246+
// a DDL/DML before it commits. Mirror the sync path: unless explicitly
247+
// cancelled, drive the result to terminal (`getFetchHandle()` → the kernel
248+
// `awaitResult()`) so the side effect lands, then close.
249+
this.lifecycleHandle = {
250+
cancel: () => asyncStatement.cancel(),
251+
close: async () => {
252+
if (!this.lifecycle.isCancelled) {
253+
await this.getFetchHandle().catch(() => {});
254+
}
255+
return asyncStatement.close();
256+
},
257+
};
258+
} else {
259+
// Metadata path: the kernel statement is already terminal — close directly.
260+
this.lifecycleHandle = statement as SeaStatementHandle;
261+
}
245262
this.context = context;
246263
this._id =
247264
id ?? asyncStatement?.statementId ?? statement?.statementId ?? cancellableExecution?.statementId ?? uuidv4();

tests/unit/sea/execution.test.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ class FakeAsyncStatement {
8888

8989
public statusCalls = 0;
9090

91+
public awaitCalls = 0;
92+
9193
public awaitResultError: Error | null = null;
9294

9395
// Successive status() returns drain this queue; the last value sticks.
@@ -108,6 +110,7 @@ class FakeAsyncStatement {
108110
}
109111

110112
public async awaitResult(): Promise<FakeNativeStatement> {
113+
this.awaitCalls += 1;
111114
if (this.awaitResultError) {
112115
throw this.awaitResultError;
113116
}
@@ -1011,10 +1014,27 @@ describe('SeaOperationBackend — async (submitStatement) path', () => {
10111014
expect(thrown).to.be.an('error');
10121015
});
10131016

1014-
it('close() forwards to the async statement', async () => {
1017+
it('close() drives an un-fetched async statement to completion, then closes it (eager-execute parity)', async () => {
1018+
// submitStatement starts the statement server-side, but asyncStatement.close()
1019+
// CANCELS a still-running statement — so executeStatement(runAsync:true).close()
1020+
// with no fetch would abort a DDL/DML before it commits. close() must first
1021+
// drive the result to terminal (awaitResult), then close.
1022+
const stmt = new FakeAsyncStatement();
1023+
const op = makeAsyncOp(stmt);
1024+
await op.close();
1025+
expect(stmt.awaitCalls, 'close() drives awaitResult() to terminal').to.equal(1);
1026+
expect(stmt.closed).to.equal(true);
1027+
expect(stmt.cancelled).to.equal(false);
1028+
});
1029+
1030+
it('close() after an explicit cancel() does NOT drive the async statement', async () => {
10151031
const stmt = new FakeAsyncStatement();
10161032
const op = makeAsyncOp(stmt);
1033+
await op.cancel();
10171034
await op.close();
1035+
expect(stmt.cancelled).to.equal(true);
1036+
// Already being torn down — close() must not await a fresh result drive.
1037+
expect(stmt.awaitCalls).to.equal(0);
10181038
expect(stmt.closed).to.equal(true);
10191039
});
10201040
});

0 commit comments

Comments
 (0)