-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathSeaSessionBackend.ts
More file actions
432 lines (401 loc) · 19.1 KB
/
Copy pathSeaSessionBackend.ts
File metadata and controls
432 lines (401 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
// Copyright (c) 2026 Databricks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { v4 as uuidv4 } from 'uuid';
import ISessionBackend from '../contracts/ISessionBackend';
import IOperationBackend from '../contracts/IOperationBackend';
import IClientContext from '../contracts/IClientContext';
import {
ExecuteStatementOptions,
TypeInfoRequest,
CatalogsRequest,
SchemasRequest,
TablesRequest,
TableTypesRequest,
ColumnsRequest,
FunctionsRequest,
PrimaryKeysRequest,
CrossReferenceRequest,
} from '../contracts/IDBSQLSession';
import Status from '../dto/Status';
import InfoValue from '../dto/InfoValue';
import HiveDriverError from '../errors/HiveDriverError';
import ParameterError from '../errors/ParameterError';
import { LogLevel } from '../contracts/IDBSQLLogger';
import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader';
import { decodeNapiKernelError } from './SeaErrorMapping';
import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend';
import SeaOperationBackend from './SeaOperationBackend';
import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams';
import { seaServerInfoValue } from './SeaServerInfo';
import { serializeQueryTags } from '../utils';
export interface SeaSessionBackendOptions {
  /**
   * Opaque napi `Connection` handle returned by `openSession`. Every statement,
   * metadata call and the final `close()` issued by the backend goes through it.
   */
  connection: SeaConnection;
  /**
   * Client context. The backend takes its logger from here and passes the
   * context on to each operation backend it creates.
   */
  context: IClientContext;
  /** Session id override. When omitted, the backend generates a fresh UUIDv4. */
  id?: string;
}
/**
* SEA-backed implementation of `ISessionBackend`.
*
* **Scope:** `executeStatement` (sync + async), `close`, `getInfo`, and the
* full metadata surface (`getCatalogs`, `getSchemas`, `getTables`,
* `getColumns`, `getFunctions`, `getTableTypes`, `getTypeInfo`,
* `getPrimaryKeys`, `getCrossReference`) — each forwards to the kernel's napi
* metadata calls (see `runMetadata`). The Thrift backend remains the default;
* callers opt into the kernel path via `ConnectionOptions.useSEA`.
*
* **Session config flow:** catalog / schema / sessionConf are applied
* once at session creation (kernel `Session::builder().defaults()` +
* `.session_conf()` → SEA `CreateSession.catalog` / `.schema` /
* `.session_confs`) and remain in effect for every statement run on
* the resulting napi `Connection`. No per-statement forwarding is
* needed — that pattern was removed when the napi binding moved these
* onto `openSession` to match pyo3.
*/
export default class SeaSessionBackend implements ISessionBackend {
  private readonly connection: SeaConnection;

  private readonly context: IClientContext;

  private readonly _id: string;

  /** Set to `true` only after `connection.close()` has resolved; guards every other public method. */
  private closed = false;

  /**
   * @param options - the napi connection to drive, the client context, and an
   *   optional session id (a UUIDv4 is generated when `id` is omitted).
   */
  constructor({ connection, context, id }: SeaSessionBackendOptions) {
    this.connection = connection;
    this.context = context;
    this._id = id ?? uuidv4();
  }

  /** Session identifier: the `id` passed at construction, or the generated UUIDv4. */
  public get id(): string {
    return this._id;
  }

  /**
   * `getInfo` (JDBC `DatabaseMetaData` / ODBC `SQLGetInfo`) has no SEA/kernel
   * endpoint, so — exactly as JDBC does for `DatabaseMetaData` — we synthesize
   * the answer client-side for the three `TGetInfoType`s the Databricks server
   * answers (server name / DBMS name / DBMS version) and reject the rest.
   *
   * This is NOT a SEA-only contract narrowing: probing the live warehouse over
   * the Thrift path confirms the server itself returns an error for every
   * other `TGetInfoType` (CLI_MAX_DRIVER_CONNECTIONS, CLI_DATA_SOURCE_NAME, …),
   * and the three values it does answer are byte-identical to the constants we
   * synthesize (`"Spark SQL"` / `"Spark SQL"` / `"3.1.1"`, re-verified live).
   * So rejecting an unsupported type matches Thrift's effective behaviour — we
   * just surface a clearer, typed error than the server's opaque one. See
   * {@link seaServerInfoValue}.
   *
   * @param infoType - numeric `TGetInfoType` to look up.
   * @returns the synthesized value wrapped in an `InfoValue`.
   * @throws HiveDriverError when the session is closed or `seaServerInfoValue`
   *   has no value for `infoType`.
   */
  public async getInfo(infoType: number): Promise<InfoValue> {
    this.failIfClosed();
    const value = seaServerInfoValue(infoType);
    if (value === undefined) {
      throw new HiveDriverError(
        `SEA getInfo: unsupported TGetInfoType ${infoType}. Only the info types the Databricks ` +
          `server itself answers are supported: CLI_SERVER_NAME (13), CLI_DBMS_NAME (17), ` +
          `CLI_DBMS_VER (18). The server rejects every other type on the Thrift path too, so this ` +
          `is not a SEA-specific restriction.`,
      );
    }
    return new InfoValue(value);
  }

  /**
   * Execute a SQL statement through the napi binding.
   *
   * Catalog / schema / sessionConf are session-level (applied at open).
   * Per-statement options handled here:
   *  - `ordinalParameters` / `namedParameters` → bound params (mutually
   *    exclusive — the kernel binds one placeholder style per statement);
   *  - `queryTimeout` → coerced to a plain number and always handed to the
   *    operation backend as `queryTimeoutSecs`. It is additionally forwarded in
   *    the napi options on the default (non-`runAsync`) path only; on the
   *    `runAsync: true` submit path the kernel ignores it (`wait_timeout=0s`),
   *    so it is deliberately left out of the napi options there and the
   *    operation backend's poll deadline is the only enforcement;
   *  - `rowLimit` → `rowLimit` (SEA-only server-side row cap);
   *  - `queryTags` → serialised into the conf overlay's reserved
   *    `query_tags` key (the same wire shape Thrift's `serializeQueryTags`
   *    produces), merged with any explicit `statementConf`.
   *
   * Still rejected (genuinely unsupported on SEA, rather than silently
   * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a
   * per-statement knob), `useLZ4Compression` (kernel owns result compression),
   * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by
   * the facade at fetch time, so it is intentionally not handled here.
   *
   * @param statement - SQL text to run.
   * @param options - per-statement options, see above.
   * @returns an operation backend wrapping the kernel statement handle.
   * @throws HiveDriverError when the session is closed or an unsupported option is set.
   * @throws ParameterError when both ordinal and named parameters are supplied.
   * @throws the error produced by `decodeNapiKernelError` when the napi call rejects.
   */
  public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise<IOperationBackend> {
    this.failIfClosed();
    if (options.useCloudFetch !== undefined) {
      throw new HiveDriverError(
        'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
      );
    }
    if (options.useLZ4Compression !== undefined) {
      throw new HiveDriverError(
        'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)',
      );
    }
    if (options.stagingAllowedLocalPath !== undefined) {
      throw new HiveDriverError(
        'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA',
      );
    }

    // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel-
    // specific use of the option — the Thrift backend hardcodes `runAsync: true`
    // on the wire and never reads `options.runAsync`, so the field is a no-op
    // there. The public API, result shape, schema, and error classes are the
    // same on both SEA paths; see the option's JSDoc in `IDBSQLSession` for the
    // cross-backend contract.
    //
    //  - DEFAULT (`runAsync` false/undefined) — directResults (the Thrift /
    //    JDBC / Python use_sea model) via `connection.executeStatement`. The
    //    kernel sends ExecuteStatement with the server inline wait (~10s
    //    default + on_wait_timeout=CONTINUE) and returns WITHOUT polling past
    //    it, as a single `AsyncStatement` handle: a fast query comes back
    //    seeded with its inline result, a slow one as a poll/cancel handle.
    //    Either way the handle is tied to a server-owned statement, so a long
    //    query stays cancellable and `close()` is a clean release.
    //
    //  - `runAsync: true` — submit via `connection.submitStatement`
    //    (`wait_timeout=0s`): the server returns a pending `AsyncStatement`
    //    immediately and the operation backend polls it to a terminal state.
    const runAsync = options.runAsync ?? false;

    // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()`
    // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf).
    const queryTimeoutSecs =
      options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined;

    // Only the default path forwards the timeout to the kernel; passing it on
    // the submit path would be a silent no-op (see the method JSDoc).
    const execOptions = runAsync
      ? this.buildExecuteOptions(options)
      : this.buildExecuteOptions(options, queryTimeoutSecs);

    const asyncStatement = await this.dispatchStatement(statement, execOptions, runAsync);

    // The operation backend receives the timeout on both paths so it can
    // enforce its own client-side deadline.
    return new SeaOperationBackend({
      asyncStatement,
      context: this.context,
      queryTimeoutSecs,
    });
  }

  /**
   * Issue the napi call for `executeStatement`, choosing `submitStatement`
   * (`runAsync`) or `executeStatement` (default). The options argument is
   * omitted entirely when `execOptions` is `undefined`, preserving the
   * no-options call shape. Any rejection is logged and mapped to a typed
   * driver error via `logAndMapError`.
   */
  private async dispatchStatement(
    statement: string,
    execOptions: SeaNativeExecuteOptions | undefined,
    runAsync: boolean,
  ) {
    try {
      // `await` inside the `try` is required so rejections reach the `catch`.
      if (runAsync) {
        return execOptions === undefined
          ? await this.connection.submitStatement(statement)
          : await this.connection.submitStatement(statement, execOptions);
      }
      return execOptions === undefined
        ? await this.connection.executeStatement(statement)
        : await this.connection.executeStatement(statement, execOptions);
    } catch (err) {
      throw this.logAndMapError('executeStatement', err);
    }
  }

  /**
   * Translate the public `ExecuteStatementOptions` into the kernel napi
   * `ExecuteOptions`, returning `undefined` when nothing is set so the
   * no-options call shape (`executeStatement(sql)`) is preserved.
   *
   * @param options - the caller's per-statement options.
   * @param queryTimeoutSecs - already-coerced timeout; pass it only when the
   *   kernel should receive it (the default, non-`runAsync` path).
   * @throws ParameterError when both ordinal and named parameters are supplied.
   */
  private buildExecuteOptions(
    options: ExecuteStatementOptions,
    queryTimeoutSecs?: number,
  ): SeaNativeExecuteOptions | undefined {
    // Positional (`?`) and named (`:name`) parameters are mutually exclusive —
    // the kernel binds one placeholder style per statement. Use the SAME error
    // type and message as the Thrift backend (`ThriftSessionBackend`) so a
    // caller catching `ParameterError` behaves identically across backends.
    const positionalParams = buildSeaPositionalParams(options.ordinalParameters);
    const namedParams = buildSeaNamedParams(options.namedParameters);
    if (positionalParams !== undefined && namedParams !== undefined) {
      throw new ParameterError('Driver does not support both ordinal and named parameters.');
    }

    const execOptions: SeaNativeExecuteOptions = {};
    if (positionalParams !== undefined) {
      execOptions.positionalParams = positionalParams;
    }
    if (namedParams !== undefined) {
      execOptions.namedParams = namedParams;
    }

    // Present only when the caller chose to forward it (see executeStatement).
    if (queryTimeoutSecs !== undefined) {
      execOptions.queryTimeoutSecs = queryTimeoutSecs;
    }

    if (options.rowLimit !== undefined) {
      // NOTE(review): `queryTimeout` is coerced with `numberToInt64(...).toNumber()`
      // because a bare `Number(int64)` is said to yield NaN. If `rowLimit` can
      // also be an `Int64`, this coercion needs the same treatment — confirm
      // against the `ExecuteStatementOptions.rowLimit` type.
      execOptions.rowLimit = Number(options.rowLimit);
    }

    // Per-statement conf overlay plus query tags. Tags are serialised JS-side
    // into the reserved `query_tags` key (the same wire shape the Thrift
    // backend produces via `serializeQueryTags` → `confOverlay`), rather than
    // via the napi `queryTags` field: napi's `HashMap<String,String>` can't
    // represent a null-valued tag, and the kernel rejects setting both the
    // `queryTags` field and a `query_tags` conf key.
    const serializedQueryTags = serializeQueryTags(options.queryTags);
    if (options.statementConf !== undefined || serializedQueryTags !== undefined) {
      // Copy so the caller's `statementConf` object is never mutated.
      const statementConf: Record<string, string> = { ...(options.statementConf ?? {}) };
      if (serializedQueryTags !== undefined) {
        statementConf.query_tags = serializedQueryTags;
      }
      // An explicitly empty `statementConf` with no tags contributes nothing.
      if (Object.keys(statementConf).length > 0) {
        execOptions.statementConf = statementConf;
      }
    }

    return Object.keys(execOptions).length > 0 ? execOptions : undefined;
  }

  /** Wrap a napi metadata `Statement` (already terminal) as an operation backend. */
  private wrapStatement(nativeStatement: SeaStatement): IOperationBackend {
    return new SeaOperationBackend({
      statement: nativeStatement,
      context: this.context,
      id: nativeStatement.statementId,
    });
  }

  /**
   * Metadata calls forward to the kernel's metadata surface (`listCatalogs`,
   * `listTables`, …), each of which returns a napi `Statement` whose result
   * carries the JDBC-shaped columns. We wrap that handle exactly like an
   * executed statement. The kernel owns the SQL synthesis, the column
   * projection, and (for `listTables`) the client-side `TABLE_TYPE` filter —
   * the driver only maps the request fields to positional arguments.
   *
   * The `runAsync` / `maxRows` request fields are not threaded here: `runAsync`
   * is deprecated, and `maxRows` is applied by the facade at fetch time (same
   * as the Thrift path), so the napi call takes only the filter arguments.
   */
  public async getTypeInfo(_request: TypeInfoRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() => this.connection.listTypeInfo());
  }

  /** List catalogs via the kernel's `listCatalogs`; the request carries no filter arguments that are used. */
  public async getCatalogs(_request: CatalogsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() => this.connection.listCatalogs());
  }

  /** List schemas via the kernel's `listSchemas(catalogName, schemaName)`. */
  public async getSchemas(request: SchemasRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() => this.connection.listSchemas(request.catalogName, request.schemaName));
  }

  /** List tables via the kernel's `listTables(catalogName, schemaName, tableName, tableTypes)`. */
  public async getTables(request: TablesRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() =>
      this.connection.listTables(request.catalogName, request.schemaName, request.tableName, request.tableTypes),
    );
  }

  /** List table types via the kernel's `listTableTypes`; the request carries no filter arguments that are used. */
  public async getTableTypes(_request: TableTypesRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() => this.connection.listTableTypes());
  }

  /** List columns via the kernel's `listColumns(catalogName, schemaName, tableName, columnName)`. */
  public async getColumns(request: ColumnsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() =>
      this.connection.listColumns(request.catalogName, request.schemaName, request.tableName, request.columnName),
    );
  }

  /** List functions via the kernel's `listFunctions(catalogName, schemaName, functionName)`. */
  public async getFunctions(request: FunctionsRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() =>
      this.connection.listFunctions(request.catalogName, request.schemaName, request.functionName),
    );
  }

  /**
   * Look up primary keys via the kernel's `getPrimaryKeys`.
   *
   * @throws HiveDriverError when the session is closed, or when `catalogName`
   *   is `undefined` or the empty string.
   */
  public async getPrimaryKeys(request: PrimaryKeysRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    // The kernel requires a catalog for primary-key lookup (`Identifier::new`
    // rejects an empty string). The Thrift backend can forward an undefined
    // catalog and let the server resolve a default; the SEA/kernel path cannot,
    // so reject up front with a clear, actionable message rather than passing
    // `''` and surfacing the kernel's opaque "identifier must not be empty".
    const { catalogName, schemaName, tableName } = request;
    if (catalogName === undefined || catalogName === '') {
      throw new HiveDriverError(
        'SEA getPrimaryKeys requires a catalog — pass `catalogName` explicitly. (The Thrift backend ' +
          'can omit it and let the server resolve a default; the SEA kernel path requires it.)',
      );
    }
    // `catalogName` is narrowed to `string` by the guard above, so no cast is needed.
    return this.runMetadata(() => this.connection.getPrimaryKeys(catalogName, schemaName, tableName));
  }

  /** Look up cross-reference (foreign-key) metadata via the kernel's `getCrossReference`. */
  public async getCrossReference(request: CrossReferenceRequest): Promise<IOperationBackend> {
    this.failIfClosed();
    return this.runMetadata(() =>
      this.connection.getCrossReference(
        request.parentCatalogName,
        request.parentSchemaName,
        request.parentTableName,
        request.foreignCatalogName,
        request.foreignSchemaName,
        request.foreignTableName,
      ),
    );
  }

  /** Run a napi metadata call, mapping kernel errors and wrapping the result handle. */
  private async runMetadata(call: () => Promise<SeaStatement>): Promise<IOperationBackend> {
    let nativeStatement: SeaStatement;
    try {
      nativeStatement = await call();
    } catch (err) {
      throw this.logAndMapError('metadata', err);
    }
    return this.wrapStatement(nativeStatement);
  }

  /**
   * Map a napi/kernel error to a typed driver error and emit a debug breadcrumb
   * first, matching the rest of the SEA backend's logging convention
   * (`SeaOperationLifecycle` / `SeaOperationBackend`). Metadata and bound-param
   * execute failures otherwise threw with no on-call signal.
   *
   * @param op - short operation label included in the log line.
   * @param err - the raw value caught from the napi call.
   * @returns the mapped error; the caller is responsible for throwing it.
   */
  private logAndMapError(op: string, err: unknown): Error {
    const mapped = decodeNapiKernelError(err);
    this.context.getLogger().log(LogLevel.debug, `SEA ${op} failed for session ${this._id}: ${mapped.message}`);
    return mapped;
  }

  /**
   * Close the underlying napi connection. Calling it again after a successful
   * close is a no-op that returns success.
   *
   * @throws the mapped driver error when `connection.close()` rejects.
   */
  public async close(): Promise<Status> {
    if (this.closed) {
      return Status.success();
    }
    try {
      await this.connection.close();
    } catch (err) {
      // `closed` is left false on failure, so a later `close()` call tries again.
      // Routed through `logAndMapError` so a failed close leaves the same debug
      // breadcrumb as every other failing napi call in this class.
      throw this.logAndMapError('close', err);
    }
    this.closed = true;
    return Status.success();
  }

  /** Throw if `close()` has already succeeded on this session. */
  private failIfClosed(): void {
    if (this.closed) {
      throw new HiveDriverError('SeaSessionBackend: session is closed');
    }
  }
}