-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathKernelSessionBackend.ts
More file actions
468 lines (435 loc) · 21.2 KB
/
Copy pathKernelSessionBackend.ts
File metadata and controls
468 lines (435 loc) · 21.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
// Copyright (c) 2026 Databricks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { v4 as uuidv4 } from 'uuid';
import ISessionBackend from '../contracts/ISessionBackend';
import IOperationBackend from '../contracts/IOperationBackend';
import IClientContext from '../contracts/IClientContext';
import {
ExecuteStatementOptions,
TypeInfoRequest,
CatalogsRequest,
SchemasRequest,
TablesRequest,
TableTypesRequest,
ColumnsRequest,
FunctionsRequest,
PrimaryKeysRequest,
CrossReferenceRequest,
} from '../contracts/IDBSQLSession';
import Status from '../dto/Status';
import InfoValue from '../dto/InfoValue';
import HiveDriverError from '../errors/HiveDriverError';
import ParameterError from '../errors/ParameterError';
import { LogLevel } from '../contracts/IDBSQLLogger';
import {
KernelConnection,
KernelNativeExecuteOptions,
KernelStatement,
KernelNativeAsyncStatement,
} from './KernelNativeLoader';
import { decodeNapiKernelError } from './KernelErrorMapping';
import KernelOperationBackend from './KernelOperationBackend';
import { buildKernelPositionalParams, buildKernelNamedParams } from './KernelPositionalParams';
import { kernelServerInfoValue } from './KernelServerInfo';
import { serializeQueryTags } from '../utils';
/**
 * Construction arguments for the kernel-backed session backend.
 */
export interface KernelSessionBackendOptions {
  /**
   * Opaque napi `Connection` handle obtained from `openSession`. Every
   * statement and metadata call of the session is issued against it.
   */
  connection: KernelConnection;

  /** Client context; the backend uses it to reach the driver logger. */
  context: IClientContext;

  /**
   * Explicit session identifier. When omitted, the backend generates a fresh
   * UUIDv4.
   */
  id?: string;
}
/**
 * Narrow the directResults union (`executeStatementDirect`'s
 * `Statement | AsyncStatement`) to the Running `AsyncStatement` arm. Only that
 * arm exposes `awaitResult`; the terminal `Statement` (Completed arm) does not.
 * Mirrors the kernel `DirectStatement::{Completed, Running}` discriminant, which
 * the opaque napi classes can't carry on the wire — the `awaitResult` probe is
 * the load-bearing feature-detect (see databricks-sql-kernel#140).
 */
function isKernelAsyncStatement(x: KernelStatement | KernelNativeAsyncStatement): x is KernelNativeAsyncStatement {
  return typeof (x as KernelNativeAsyncStatement).awaitResult === 'function';
}

/**
 * kernel-backed implementation of `ISessionBackend`.
 *
 * **Scope:** `executeStatement` (sync + async), `close`, `getInfo`, and the
 * full metadata surface (`getCatalogs`, `getSchemas`, `getTables`,
 * `getColumns`, `getFunctions`, `getTableTypes`, `getTypeInfo`,
 * `getPrimaryKeys`, `getCrossReference`) — each forwards to the kernel's napi
 * metadata calls (see `runMetadata`). The Thrift backend remains the default;
 * callers opt into the kernel path via `ConnectionOptions.useKernel`.
 *
 * **Session config flow:** catalog / schema / sessionConf are applied
 * once at session creation (kernel `Session::builder().defaults()` +
 * `.session_conf()` → SEA `CreateSession.catalog` / `.schema` /
 * `.session_confs`) and remain in effect for every statement run on
 * the resulting napi `Connection`. No per-statement forwarding is
 * needed — that pattern was removed when the napi binding moved these
 * onto `openSession` to match pyo3.
 */
export default class KernelSessionBackend implements ISessionBackend {
  /** The napi `Connection` every statement / metadata call is issued on. */
  private readonly connection: KernelConnection;

  private readonly context: IClientContext;

  /** Driver-side session id: the caller-supplied `id` or a fresh UUIDv4. */
  private readonly _id: string;

  /** Set only after `connection.close()` resolves; checked by `failIfClosed`. */
  private closed = false;

  constructor({ connection, context, id }: KernelSessionBackendOptions) {
    this.connection = connection;
    this.context = context;
    this._id = id ?? uuidv4();
  }

  public get id(): string {
    return this._id;
  }

  /**
   * `getInfo` (JDBC `DatabaseMetaData` / ODBC `SQLGetInfo`) has no kernel
   * endpoint, so — exactly as JDBC does for `DatabaseMetaData` — we synthesize
   * the answer client-side for the three `TGetInfoType`s the Databricks server
   * answers (server name / DBMS name / DBMS version) and reject the rest.
   *
   * This is NOT a kernel-only contract narrowing: probing the live warehouse over
   * the Thrift path confirms the server itself returns an error for every
   * other `TGetInfoType` (CLI_MAX_DRIVER_CONNECTIONS, CLI_DATA_SOURCE_NAME, …),
   * and the three values it does answer are byte-identical to the constants we
   * synthesize (`"Spark SQL"` / `"Spark SQL"` / `"3.1.1"`, re-verified live).
   * So rejecting an unsupported type matches Thrift's effective behaviour — we
   * just surface a clearer, typed error than the server's opaque one. See
   * {@link kernelServerInfoValue}.
   *
   * @throws HiveDriverError when the session is closed or `infoType` is unsupported.
   */
  public async getInfo(infoType: number): Promise<InfoValue> {
    this.failIfClosed();
    const value = kernelServerInfoValue(infoType);
    if (value === undefined) {
      throw new HiveDriverError(
        `kernel getInfo: unsupported TGetInfoType ${infoType}. Only the info types the Databricks ` +
          `server itself answers are supported: CLI_SERVER_NAME (13), CLI_DBMS_NAME (17), ` +
          `CLI_DBMS_VER (18). The server rejects every other type on the Thrift path too, so this ` +
          `is not a kernel-specific restriction.`,
      );
    }
    return new InfoValue(value);
  }

  /**
   * Execute a SQL statement through the napi binding.
   *
   * Catalog / schema / sessionConf are session-level (applied at open), so
   * nothing session-scoped is forwarded per statement. Per-statement options
   * forwarded to the kernel `ExecuteOptions` (see `buildExecuteOptions`):
   *  - `ordinalParameters` / `namedParameters` → bound params (mutually
   *    exclusive — the kernel binds one placeholder style per statement);
   *  - `rowLimit` → `rowLimit` (kernel-only server-side row cap);
   *  - `queryTags` → serialised into the conf overlay's reserved
   *    `query_tags` key (the same wire shape Thrift's `serializeQueryTags`
   *    produces), merged with any explicit `statementConf`.
   *
   * Ignored with a warn-level log rather than rejected, so call sites written
   * for the Thrift backend are drop-in on the kernel path: `useCloudFetch`
   * (governed by the kernel result configuration), `useLZ4Compression` (kernel
   * owns result compression) and `stagingAllowedLocalPath` (volume operations
   * are not yet supported). These cannot change query results. Parameter
   * binding, by contrast, is never silently dropped — a dropped param would
   * change results, so unsupported params still throw.
   *
   * `queryTimeout` is a NO-OP on the kernel backend. It is the JDBC
   * `setQueryTimeout` knob which — per the option's JSDoc — is effective only
   * on Compute clusters; SQL Warehouses (what the kernel targets) use the
   * `STATEMENT_TIMEOUT` SQL/session conf instead. It is deliberately NOT mapped
   * to the kernel `wait_timeout` field: `wait_timeout` is the server's
   * inline-hold window (the time the POST blocks for results, capped 5–50s),
   * not a statement-execution timeout — mapping it there would silently cap
   * the timeout at 50s, reject <5s with HTTP 400, and conflate the inline wait
   * with execution. It is also not applied as a client-side deadline.
   *
   * `maxRows` is applied by the facade at fetch time, so it is intentionally
   * not handled here.
   *
   * `runAsync` selects the kernel execution path. This is a kernel-specific
   * use of the option: the Thrift backend hardcodes `runAsync: true` on the
   * wire and never reads `options.runAsync`. The only observable difference
   * between the two kernel paths is WHEN `executeStatement` resolves; the
   * public API, result shape, schema and error classes are identical on both
   * (and to Thrift). See the option's JSDoc in `IDBSQLSession`.
   *  - DEFAULT (`runAsync` false/undefined) — `executeStatementDirect`
   *    (directResults), see `executeDirect`.
   *  - `runAsync: true` — `submitStatement` (`wait_timeout=0s`), see
   *    `submitAsync`.
   *
   * @throws HiveDriverError when the session is closed.
   * @throws ParameterError when both ordinal and named parameters are supplied.
   * @throws the error produced by `decodeNapiKernelError` when the napi call
   *   fails or returns no statement handle.
   */
  public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise<IOperationBackend> {
    this.failIfClosed();
    this.warnOnIgnoredOptions(options);
    // Built before branching: both paths forward the same kernel options, and
    // a ParameterError must surface before any napi call is made.
    const execOptions = this.buildExecuteOptions(options);
    if (options.runAsync ?? false) {
      return this.submitAsync(statement, execOptions);
    }
    return this.executeDirect(statement, execOptions);
  }

  /**
   * Log (at warn) each per-statement option the kernel backend ignores. The
   * check is `!== undefined`, so an explicit `false` is still reported.
   */
  private warnOnIgnoredOptions(options: ExecuteStatementOptions): void {
    const ignoredOptions: ReadonlyArray<readonly [unknown, string]> = [
      [
        options.useCloudFetch,
        'kernel executeStatement: ignoring per-statement `useCloudFetch` — result fetching is governed by the kernel result configuration (no-op on kernel).',
      ],
      [
        options.useLZ4Compression,
        'kernel executeStatement: ignoring per-statement `useLZ4Compression` — result compression is governed by the kernel (no-op on kernel).',
      ],
      [
        options.stagingAllowedLocalPath,
        'kernel executeStatement: ignoring `stagingAllowedLocalPath` — volume/staging operations are not yet supported on the kernel backend (no-op).',
      ],
    ];
    for (const [value, message] of ignoredOptions) {
      if (value !== undefined) {
        this.context.getLogger().log(LogLevel.warn, message);
      }
    }
  }

  /**
   * DEFAULT path — directResults (the Thrift/JDBC model). The kernel's
   * `executeStatementDirect` runs ExecuteStatement with a bounded server inline
   * wait and returns WITHOUT polling past it:
   *  - a terminal `Statement` (result inline) for a fast query, or
   *  - an `AsyncStatement` (a poll/cancel handle) for a slow one.
   * Either way the handle is tied to a server-owned statement, so a long query
   * stays cancellable via `op.cancel()` and `close()` is a clean release (no
   * client-side drive-to-terminal). Fire-and-forget DDL/DML commits because
   * the server runs it inline during the POST.
   */
  private async executeDirect(
    statement: string,
    execOptions: KernelNativeExecuteOptions | undefined,
  ): Promise<IOperationBackend> {
    let direct: KernelStatement | KernelNativeAsyncStatement | null | undefined;
    try {
      // Keep the no-options call shape when nothing was set.
      direct =
        execOptions === undefined
          ? await this.connection.executeStatementDirect(statement)
          : await this.connection.executeStatementDirect(statement, execOptions);
    } catch (err) {
      throw this.logAndMapError('executeStatement', err);
    }
    const handle = this.requireStatementHandle(direct, 'executeStatementDirect');
    // Only the Running `AsyncStatement` arm exposes `awaitResult`.
    if (isKernelAsyncStatement(handle)) {
      return new KernelOperationBackend({ asyncStatement: handle, context: this.context });
    }
    return new KernelOperationBackend({ statement: handle, context: this.context });
  }

  /**
   * `runAsync: true` path. Submit (`wait_timeout=0s`): the server returns a
   * pending `AsyncStatement` immediately while the query runs; the operation
   * backend polls `status()` to terminal in `waitUntilReady()` and materialises
   * results via `awaitResult()`.
   */
  private async submitAsync(
    statement: string,
    execOptions: KernelNativeExecuteOptions | undefined,
  ): Promise<IOperationBackend> {
    let asyncStatement: KernelNativeAsyncStatement | null | undefined;
    try {
      asyncStatement =
        execOptions === undefined
          ? await this.connection.submitStatement(statement)
          : await this.connection.submitStatement(statement, execOptions);
    } catch (err) {
      throw this.logAndMapError('executeStatement', err);
    }
    return new KernelOperationBackend({
      asyncStatement: this.requireStatementHandle(asyncStatement, 'submitStatement'),
      context: this.context,
    });
  }

  /**
   * The kernel contract never yields null/undefined for an execute call; reject
   * a non-handle up front so a contract violation surfaces as a mapped driver
   * error here rather than an opaque `TypeError` deferred to first fetch/close.
   *
   * @param handle - value returned by the napi execute call.
   * @param call - napi method name, used in the error message.
   * @returns `handle`, narrowed to non-nullable.
   */
  private requireStatementHandle<T>(handle: T | null | undefined, call: string): T {
    if (handle === null || handle === undefined) {
      throw this.logAndMapError(
        'executeStatement',
        new HiveDriverError(`kernel ${call} returned no statement handle`),
      );
    }
    return handle;
  }

  /**
   * Translate the public `ExecuteStatementOptions` into the kernel napi
   * `ExecuteOptions`, returning `undefined` when nothing is set so the
   * no-options call shape (`executeStatement(sql)`) is preserved.
   *
   * `queryTimeout` is intentionally NOT forwarded — it is a no-op on kernel
   * (SQL Warehouses use `STATEMENT_TIMEOUT`; mapping it to `wait_timeout` would
   * abuse the inline-hold window).
   *
   * @throws ParameterError when both ordinal and named parameters are supplied.
   */
  private buildExecuteOptions(options: ExecuteStatementOptions): KernelNativeExecuteOptions | undefined {
    // Positional (`?`) and named (`:name`) parameters are mutually exclusive —
    // the kernel binds one placeholder style per statement. Use the SAME error
    // type and message as the Thrift backend (`ThriftSessionBackend`) so a
    // caller catching `ParameterError` behaves identically across backends.
    const positionalParams = buildKernelPositionalParams(options.ordinalParameters);
    const namedParams = buildKernelNamedParams(options.namedParameters);
    if (positionalParams !== undefined && namedParams !== undefined) {
      throw new ParameterError('Driver does not support both ordinal and named parameters.');
    }
    const execOptions: KernelNativeExecuteOptions = {};
    if (positionalParams !== undefined) {
      execOptions.positionalParams = positionalParams;
    }
    if (namedParams !== undefined) {
      execOptions.namedParams = namedParams;
    }
    if (options.rowLimit !== undefined) {
      execOptions.rowLimit = Number(options.rowLimit);
    }
    // Per-statement conf overlay plus query tags. Tags are serialised JS-side
    // into the reserved `query_tags` key (the same wire shape the Thrift
    // backend produces via `serializeQueryTags` → `confOverlay`), rather than
    // via the napi `queryTags` field: napi's `HashMap<String,String>` can't
    // represent a null-valued tag, and the kernel rejects setting both the
    // `queryTags` field and a `query_tags` conf key.
    const serializedQueryTags = serializeQueryTags(options.queryTags);
    if (options.statementConf !== undefined || serializedQueryTags !== undefined) {
      const statementConf: Record<string, string> = { ...(options.statementConf ?? {}) };
      if (serializedQueryTags !== undefined) {
        statementConf.query_tags = serializedQueryTags;
      }
      if (Object.keys(statementConf).length > 0) {
        execOptions.statementConf = statementConf;
      }
    }
    return Object.keys(execOptions).length > 0 ? execOptions : undefined;
  }

  /** Wrap a napi metadata `Statement` (already terminal) as an operation backend. */
  private wrapStatement(nativeStatement: KernelStatement): IOperationBackend {
    return new KernelOperationBackend({
      statement: nativeStatement,
      context: this.context,
      id: nativeStatement.statementId,
    });
  }

  /** Metadata: type info (forwards to the kernel's `listTypeInfo`). */
  public async getTypeInfo(_request: TypeInfoRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getTypeInfo', () => this.connection.listTypeInfo());
  }

  /** Metadata: catalogs (forwards to the kernel's `listCatalogs`). */
  public async getCatalogs(_request: CatalogsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getCatalogs', () => this.connection.listCatalogs());
  }

  /** Metadata: schemas, filtered by catalog / schema name. */
  public async getSchemas(request: SchemasRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getSchemas', () => this.connection.listSchemas(request.catalogName, request.schemaName));
  }

  /** Metadata: tables; the kernel applies the `TABLE_TYPE` filter client-side. */
  public async getTables(request: TablesRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getTables', () =>
      this.connection.listTables(request.catalogName, request.schemaName, request.tableName, request.tableTypes),
    );
  }

  /** Metadata: table types (forwards to the kernel's `listTableTypes`). */
  public async getTableTypes(_request: TableTypesRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getTableTypes', () => this.connection.listTableTypes());
  }

  /** Metadata: columns, filtered by catalog / schema / table / column name. */
  public async getColumns(request: ColumnsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getColumns', () =>
      this.connection.listColumns(request.catalogName, request.schemaName, request.tableName, request.columnName),
    );
  }

  /** Metadata: functions, filtered by catalog / schema / function name. */
  public async getFunctions(request: FunctionsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getFunctions', () =>
      this.connection.listFunctions(request.catalogName, request.schemaName, request.functionName),
    );
  }

  /**
   * Metadata: primary keys of one table.
   *
   * The kernel requires a catalog for primary-key lookup (`Identifier::new`
   * rejects an empty string). The Thrift backend can forward an undefined
   * catalog and let the server resolve a default; the kernel path cannot, so
   * reject up front with a clear, actionable message rather than passing `''`
   * and surfacing the kernel's opaque "identifier must not be empty".
   *
   * @throws HiveDriverError when the session is closed or `catalogName` is missing/empty.
   */
  public async getPrimaryKeys(request: PrimaryKeysRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    // A `const` keeps its narrowing inside the closure below, so no cast is needed.
    const { catalogName } = request;
    if (catalogName == null || catalogName === '') {
      throw new HiveDriverError(
        'kernel getPrimaryKeys requires a catalog — pass `catalogName` explicitly. (The Thrift backend ' +
          'can omit it and let the server resolve a default; the kernel path requires it.)',
      );
    }
    return this.runMetadata('getPrimaryKeys', () =>
      this.connection.getPrimaryKeys(catalogName, request.schemaName, request.tableName),
    );
  }

  /** Metadata: foreign-key relationships between a parent and a foreign table. */
  public async getCrossReference(request: CrossReferenceRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata('getCrossReference', () =>
      this.connection.getCrossReference(
        request.parentCatalogName,
        request.parentSchemaName,
        request.parentTableName,
        request.foreignCatalogName,
        request.foreignSchemaName,
        request.foreignTableName,
      ),
    );
  }

  /**
   * Run a napi metadata call, mapping kernel errors and wrapping the result handle.
   *
   * Metadata calls forward to the kernel's metadata surface (`listCatalogs`,
   * `listTables`, …), each of which returns a napi `Statement` whose result
   * carries the JDBC-shaped columns. That handle is wrapped exactly like an
   * executed statement. The kernel owns the SQL synthesis, the column
   * projection, and (for `listTables`) the client-side `TABLE_TYPE` filter —
   * the driver only maps the request fields to positional arguments.
   *
   * The `runAsync` / `maxRows` request fields are not threaded here: `runAsync`
   * is deprecated, and `maxRows` is applied by the facade at fetch time (same
   * as the Thrift path), so the napi call takes only the filter arguments.
   *
   * @param op - public method name, used in the failure breadcrumb so on-call
   *   can tell which metadata call failed.
   * @param call - thunk issuing the napi metadata call.
   */
  private async runMetadata(op: string, call: () => Promise<KernelStatement>): Promise<IOperationBackend> {
    let nativeStatement: KernelStatement;
    try {
      nativeStatement = await call();
    } catch (err) {
      throw this.logAndMapError(op, err);
    }
    return this.wrapStatement(nativeStatement);
  }

  /**
   * Map a napi/kernel error to a typed driver error and emit a debug breadcrumb
   * first, matching the rest of the kernel backend's logging convention
   * (`KernelOperationLifecycle` / `KernelOperationBackend`).
   *
   * @returns the mapped error; callers `throw` it.
   */
  private logAndMapError(op: string, err: unknown): Error {
    const mapped = decodeNapiKernelError(err);
    this.context.getLogger().log(LogLevel.debug, `kernel ${op} failed for session ${this._id}: ${mapped.message}`);
    return mapped;
  }

  /**
   * Close the underlying napi connection. Idempotent once a close has
   * succeeded; a failed close leaves the session open so the caller may retry.
   *
   * @throws the error produced by `decodeNapiKernelError` if the kernel close fails.
   */
  public async close(): Promise<Status> {
    if (this.closed) {
      return Status.success();
    }
    try {
      await this.connection.close();
    } catch (err) {
      // Same mapping + debug breadcrumb as every other kernel failure path.
      throw this.logAndMapError('close', err);
    }
    this.closed = true;
    return Status.success();
  }

  private failIfClosed(): void {
    if (this.closed) {
      throw new HiveDriverError('KernelSessionBackend: session is closed');
    }
  }
}