Skip to content

Commit c875ce8

Browse files
committed
[SEA-NodeJS] Address #416 review: stable op.id (telemetry), runAsync contract, doc + signal/coverage gaps
Review findings (databricks-sql-nodejs#416): - F2 (HIGH): revert the sync-path op.id mutation. `op.id` flipped from a client UUID to the server statement_id after `result()` resolved, but the facade keys telemetry start/complete on it (DBSQLOperation → MetricsAggregator), so the flip split the records across two keys and dropped the summary. `id` is now stable for the operation's lifetime; the resolved server statement_id is surfaced via a debug log for server/kernel correlation instead. Test updated: asserts id is stable AND the server id is logged. - F1 (HIGH): `runAsync` is the SEA sync/async toggle but was JSDoc-@deprecated, and a comment falsely claimed it "mirrors Thrift's runAsync distinction" (Thrift hardcodes runAsync:true and never reads the option). Replaced the @deprecated tag with the cross-backend contract (Thrift: no-op; kernel: selects sync-default vs async) and corrected the in-code comment. - Doc: SeaSessionBackend class comment still said metadata "defers to M1 — throws"; metadata is fully implemented. Rewritten to list the implemented surface. - F3 (MED): ThriftSessionBackend now debug-logs when rowLimit / statementConf (kernel-only options) are set on the Thrift path, instead of dropping silently. - F4 (MED): added the missing coverage using the previously-dead fakes — sync-path Failed/SQL-error envelope (`resultError`), submit-time error mapping on both paths (`throwOnExecute`), and queryTags-vs-statementConf.query_tags collision precedence. - F5 (MED): the query-timeout best-effort cancel now warn-logs a failed cancel (mirrors the fetch-error cleanup) so a still-running server statement is diagnosable. - F10 (LOW): hoisted `Object.values(OperationState)` to a module const off the 100ms async poll loop. Validated: tsc/eslint/prettier clean; 243 SEA / 1162 full unit tests pass; live smoke confirms op.id is stable across fetch on both paths and both return correct data. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 4ccf0b2 commit c875ce8

5 files changed

Lines changed: 141 additions & 41 deletions

File tree

lib/contracts/IDBSQLSession.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,18 @@ export type ExecuteStatementOptions = {
1212
*/
1313
queryTimeout?: number | bigint | Int64;
1414
/**
15-
* @deprecated This option is no longer supported and will be removed in future releases
15+
* Selects the execution lifecycle. The only observable effect is WHEN
16+
* `executeStatement` resolves; the result data, schema, and error classes are
17+
* identical regardless.
18+
*
19+
* - **Thrift backend:** no-op. The Thrift path always submits asynchronously
20+
* (`runAsync: true` on the wire) and polls during fetch; this option is not
21+
* read.
22+
* - **Kernel backend (`useSEA`):** selects the kernel execution path —
23+
* `false`/unset (default) runs the blocking direct-results path (faster,
24+
* cancellable mid-compute); `true` submits and polls (returns a pending
25+
* handle before completion). Default is sync, matching the python
26+
* connector's `cursor.execute()`.
1627
*/
1728
runAsync?: boolean;
1829
maxRows?: number | bigint | Int64 | null;

lib/sea/SeaOperationBackend.ts

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ function delay(ms: number): Promise<void> {
101101
* with the enum; `Canceled` (one-L spelling) is mapped defensively, and any
102102
* unrecognised value collapses to `Unknown`.
103103
*/
104+
// Hoisted out of the (hot, 100ms) async poll loop — `Object.values` would
105+
// otherwise allocate a fresh array on every status tick.
106+
const OPERATION_STATE_VALUES = Object.values(OperationState) as string[];
107+
104108
function statusStringToOperationState(state: string): OperationState {
105109
if (state === 'Canceled') {
106110
return OperationState.Cancelled;
107111
}
108-
if ((Object.values(OperationState) as string[]).includes(state)) {
112+
if (OPERATION_STATE_VALUES.includes(state)) {
109113
return state as OperationState;
110114
}
111115
return OperationState.Unknown;
@@ -157,10 +161,6 @@ export default class SeaOperationBackend implements IOperationBackend {
157161
// Undefined on the async / metadata paths.
158162
private readonly cancellableExecution?: SeaNativeCancellableExecution;
159163

160-
// Sync path: the server statement id captured from the resolved `Statement`
161-
// once `result()` settles. Undefined until then; surfaced via `id`.
162-
private resolvedStatementId?: string;
163-
164164
// Metadata path: terminal statement. Also the resolved fetch handle on the
165165
// sync-execute path once `cancellableExecution.result()` settles.
166166
private blockingStatement?: SeaOperationStatement;
@@ -235,14 +235,15 @@ export default class SeaOperationBackend implements IOperationBackend {
235235
}
236236

237237
public get id(): string {
238-
// On the sync (cancellable) path the server statement id isn't known at
239-
// construction — it's published mid-`result()` once the initial execute
240-
// round-trip returns. Surface it once available (preferring the id captured
241-
// from the resolved `Statement`, then the live canceller slot) so a
242-
// cancelled/closed sync operation is traceable by the same id the server and
243-
// kernel logs key on, matching the async path (which has it from `submit`).
244-
// Falls back to the construction-time id (a client UUID) until then.
245-
return this.resolvedStatementId ?? this.cancellableExecution?.statementId ?? this._id;
238+
// STABLE for the operation's lifetime. The facade keys telemetry start/
239+
// complete on this value (DBSQLOperation → MetricsAggregator), so it must
240+
// NOT mutate — a sync op's server statement_id isn't known until `result()`
241+
// resolves (mid-execute), and flipping `id` then would split the start/
242+
// complete records across two keys and silently drop the summary. The
243+
// resolved server statement_id is instead surfaced via a debug log (see
244+
// `getFetchHandle`) for server/kernel log correlation. On the async path
245+
// `_id` already IS the server id (available at submit).
246+
return this._id;
246247
}
247248

248249
public hasResultSet(): boolean {
@@ -482,9 +483,20 @@ export default class SeaOperationBackend implements IOperationBackend {
482483
// Still Pending/Running — enforce the client-side timeout before sleeping.
483484
if (deadline !== undefined && Date.now() >= deadline) {
484485
// Best-effort server-side cancel so the statement doesn't keep running
485-
// after we stop waiting; never mask the timeout with a cancel failure.
486+
// after we stop waiting; never mask the timeout with a cancel failure,
487+
// but warn-log a failed cancel so a still-running server statement is
488+
// diagnosable (mirrors the fetch-error cleanup path).
486489
// eslint-disable-next-line no-await-in-loop
487-
await this.cancel().catch(() => undefined);
490+
await this.cancel().catch((cancelErr) => {
491+
const cause = cancelErr instanceof Error ? cancelErr.message : String(cancelErr);
492+
this.context
493+
.getLogger()
494+
.log(
495+
LogLevel.warn,
496+
`SEA query-timeout cleanup: cancel() failed for operation ${this._id}; the server-side ` +
497+
`statement may keep running until the session is closed. Cause: ${cause}`,
498+
);
499+
});
488500
throw new OperationStateError(OperationStateErrorCode.Timeout);
489501
}
490502

@@ -555,9 +567,15 @@ export default class SeaOperationBackend implements IOperationBackend {
555567
.result()
556568
.then((stmt) => {
557569
this.blockingStatement = stmt as unknown as SeaOperationStatement;
558-
// Capture the now-known server statement id (stable on the resolved
559-
// Statement) so `id` reports it for the rest of the lifecycle.
560-
this.resolvedStatementId = this.blockingStatement.statementId ?? this.resolvedStatementId;
570+
// Log the now-known server statement id (NOT surfaced via `id`,
571+
// which must stay stable for telemetry correlation) so a sync op is
572+
// correlatable to server/kernel logs by its client operation id.
573+
const serverId = this.blockingStatement.statementId;
574+
if (serverId && serverId !== this._id) {
575+
this.context
576+
.getLogger()
577+
.log(LogLevel.debug, `SEA operation ${this._id} resolved to server statement_id ${serverId}`);
578+
}
561579
return stmt as unknown as SeaFetchHandle;
562580
})
563581
.catch((err) => {

lib/sea/SeaSessionBackend.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ export interface SeaSessionBackendOptions {
5252
/**
5353
* SEA-backed implementation of `ISessionBackend`.
5454
*
55-
* **M0 scope:** `executeStatement` + `close`. Metadata methods
56-
* (`getCatalogs`, `getSchemas`, etc.) defer to M1 — they throw a clear
57-
* `HiveDriverError` so consumers using SEA against metadata APIs get an
58-
* actionable message instead of silently falling back. The Thrift
59-
* backend continues to handle the metadata path by default (callers
60-
* opt into SEA via `ConnectionOptions.useSEA`).
55+
* **Scope:** `executeStatement` (sync + async), `close`, `getInfo`, and the
56+
* full metadata surface (`getCatalogs`, `getSchemas`, `getTables`,
57+
* `getColumns`, `getFunctions`, `getTableTypes`, `getTypeInfo`,
58+
* `getPrimaryKeys`, `getCrossReference`) — each forwards to the kernel's napi
59+
* metadata calls (see `runMetadata`). The Thrift backend remains the default;
60+
* callers opt into the kernel path via `ConnectionOptions.useSEA`.
6161
*
6262
* **Session config flow:** catalog / schema / sessionConf are applied
6363
* once at session creation (kernel `Session::builder().defaults()` +
@@ -155,10 +155,13 @@ export default class SeaSessionBackend implements ISessionBackend {
155155
);
156156
}
157157

158-
// `runAsync` selects the kernel execution path, exactly mirroring the
159-
// Thrift backend's `runAsync` distinction (the only observable difference is
160-
// WHEN `executeStatement` resolves — the public API, result shape, schema,
161-
// and error classes are identical on both paths):
158+
// `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel-
159+
// specific use of the option — the Thrift backend hardcodes `runAsync: true`
160+
// on the wire and never reads `options.runAsync`, so the field is a no-op
161+
// there. The only observable difference between the two SEA paths is WHEN
162+
// `executeStatement` resolves; the public API, result shape, schema, and
163+
// error classes are identical on both (and to Thrift). See the option's
164+
// JSDoc in `IDBSQLSession` for the cross-backend contract.
162165
//
163166
// - DEFAULT (`runAsync` false/undefined) — SYNC. Route through
164167
// `executeStatementCancellable`: the kernel blocks on `execute()`

lib/thrift-backend/ThriftSessionBackend.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,19 @@ export default class ThriftSessionBackend implements ISessionBackend {
170170
const driver = await this.context.getDriver();
171171
const clientConfig = this.context.getConfig();
172172

173+
// `rowLimit` / `statementConf` are kernel-backend (SEA) options with no
174+
// Thrift wire equivalent. Surface a debug breadcrumb rather than dropping
175+
// them silently, so a caller that set them on the Thrift path has signal.
176+
if (options.rowLimit !== undefined || options.statementConf !== undefined) {
177+
this.context
178+
.getLogger()
179+
.log(
180+
LogLevel.debug,
181+
'ThriftSessionBackend.executeStatement: rowLimit / statementConf are kernel-backend (useSEA) ' +
182+
'options and are ignored on the Thrift path.',
183+
);
184+
}
185+
173186
const request = new TExecuteStatementReq({
174187
sessionHandle: this.sessionHandle,
175188
statement,

tests/unit/sea/execution.test.ts

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -326,16 +326,16 @@ function makeBinding(connection: SeaConnection): SeaNativeBinding & {
326326
return Object.assign(binding, { openSessionStub });
327327
}
328328

329-
function makeContext(): IClientContext {
330-
const logger: IDBSQLLogger = {
329+
function makeContext(logger?: IDBSQLLogger): IClientContext {
330+
const log: IDBSQLLogger = logger ?? {
331331
log(_level: LogLevel, _message: string): void {
332332
// no-op
333333
},
334334
};
335335
const config = {} as ClientConfig;
336336
return {
337337
getConfig: () => config,
338-
getLogger: () => logger,
338+
getLogger: () => log,
339339
getConnectionProvider: async () => {
340340
throw new Error('not used by SEA backend');
341341
},
@@ -659,6 +659,37 @@ describe('SeaSessionBackend', () => {
659659
expect(conf?.query_tags).to.contain('team:x');
660660
});
661661

662+
it('queryTags wins over a query_tags key in statementConf (precedence on collision)', async () => {
663+
const connection = new FakeNativeConnection();
664+
const session = makeSession(connection);
665+
await session.executeStatement('SELECT 1', {
666+
statementConf: { query_tags: 'manual-raw-value' },
667+
queryTags: { team: 'x' },
668+
});
669+
const conf = (connection.lastOptions as { statementConf?: Record<string, string> }).statementConf;
670+
// The structured `queryTags` option overwrites a raw `query_tags` conf key —
671+
// a single, predictable wire value rather than two competing ones.
672+
expect(conf?.query_tags).to.contain('team:x').and.to.not.equal('manual-raw-value');
673+
});
674+
675+
it('maps a submit-time kernel error via logAndMapError on both paths', async () => {
676+
const envelope = `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'SUBMIT_BOOM' })}`;
677+
for (const opts of [{}, { runAsync: true }]) {
678+
const connection = new FakeNativeConnection();
679+
connection.throwOnExecute = new Error(envelope); // fails executeStatementCancellable / submitStatement
680+
const session = makeSession(connection);
681+
let thrown: unknown;
682+
try {
683+
// eslint-disable-next-line no-await-in-loop
684+
await session.executeStatement('SELECT 1', opts);
685+
} catch (err) {
686+
thrown = err;
687+
}
688+
expect(thrown, `path ${JSON.stringify(opts)}`).to.be.instanceOf(HiveDriverError);
689+
expect((thrown as Error).message).to.match(/SUBMIT_BOOM/);
690+
}
691+
});
692+
662693
// Genuinely unsupported on SEA — rejected (rather than silently ignored) so
663694
// a caller/agent gets signal instead of a no-op. queryTags / queryTimeout /
664695
// rowLimit are NOT here — they are forwarded (asserted above).
@@ -994,18 +1025,42 @@ describe('SeaOperationBackend — sync (executeStatementCancellable) path', () =
9941025
).to.throw(HiveDriverError, /exactly one/);
9951026
});
9961027

997-
it('surfaces the resolved server statement id as op.id once the sync execute completes', async () => {
998-
const op = makeSyncOp(new FakeCancellableExecution());
999-
// Before result() resolves, the server id is not yet known (real
1000-
// `CancellableExecution.statementId` is null pre-round-trip), so op.id is the
1001-
// construction-time fallback — a client UUID, NOT the server statement id.
1028+
it('keeps op.id stable across the sync execute and logs the resolved server statement id', async () => {
1029+
// op.id MUST stay stable (the facade keys telemetry start/complete on it —
1030+
// a mid-flight flip to the server id would split the records and drop the
1031+
// summary). The server statement_id is surfaced via a debug log instead.
1032+
const logs: Array<{ level: LogLevel; message: string }> = [];
1033+
const logger: IDBSQLLogger = { log: (level, message) => logs.push({ level, message }) };
1034+
const op = new SeaOperationBackend({
1035+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1036+
cancellableExecution: new FakeCancellableExecution() as any,
1037+
context: makeContext(logger),
1038+
});
10021039
const idBefore = op.id;
10031040
expect(idBefore).to.be.a('string').and.have.length.greaterThan(0);
1004-
expect(idBefore).to.not.equal('01ef-fake-statement-id');
1005-
// Driving the blocking execute to terminal publishes the server id; op.id
1006-
// then reports it (parity with the async path, traceable in server logs).
10071041
await op.waitUntilReady();
1008-
expect(op.id).to.equal('01ef-fake-statement-id');
1042+
// Stable: driving result() to terminal does NOT mutate the id.
1043+
expect(op.id).to.equal(idBefore);
1044+
// But the now-known server statement_id is logged for correlation.
1045+
expect(logs.some((l) => l.message.includes('01ef-fake-statement-id'))).to.equal(true);
1046+
});
1047+
1048+
it('surfaces the kernel SQL-error envelope when a sync result() rejects (Failed)', async () => {
1049+
const exec = new FakeCancellableExecution();
1050+
// The kernel rejects result() with a sentinel-framed structured error;
1051+
// decodeNapiKernelError turns it into a typed HiveDriverError (sync path).
1052+
exec.resultError = new Error(
1053+
`__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'TABLE_OR_VIEW_NOT_FOUND' })}`,
1054+
);
1055+
const op = makeSyncOp(exec);
1056+
let thrown: unknown;
1057+
try {
1058+
await op.waitUntilReady();
1059+
} catch (err) {
1060+
thrown = err;
1061+
}
1062+
expect(thrown).to.be.instanceOf(HiveDriverError);
1063+
expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/);
10091064
});
10101065

10111066
it('status() reports Running before result() and Succeeded after', async () => {

0 commit comments

Comments
 (0)