
Commit a3bf320

[SEA-NodeJS] (1/8) Backend abstraction — IBackend / ISessionBackend / IOperationBackend (#378)
* sea-abstraction: introduce IBackend / ISessionBackend / IOperationBackend

Refactors DBSQLClient/Session/Operation to dispatch through three backend interfaces. ThriftBackend (lib/thrift-backend/) contains the relocated existing thrift logic. SeaBackend (lib/sea/) is a stub for M0; the sea-napi-binding feature wires the real impl.

Public surface (lib/index.ts) unchanged. No new dependencies. All existing tests pass.

Files:
- lib/contracts/IBackend.ts (new)
- lib/contracts/ISessionBackend.ts (new)
- lib/contracts/IOperationBackend.ts (new)
- lib/contracts/IDBSQLClient.ts (adds useSEA?: boolean to ConnectionOptions)
- lib/thrift-backend/ThriftBackend.ts (new)
- lib/thrift-backend/ThriftSessionBackend.ts (new)
- lib/thrift-backend/ThriftOperationBackend.ts (new)
- lib/sea/SeaBackend.ts (new, M0 stub)
- lib/DBSQLClient.ts (dispatch through IBackend; useSEA picks SeaBackend)
- lib/DBSQLSession.ts (facade over ISessionBackend; staging stays here)
- lib/DBSQLOperation.ts (facade over IOperationBackend; iterators/fetchAll stay here)
- tests/unit/DBSQLClient.test.ts (retarget internal state lookup through backend; pre-seed client.backend in tests that bypass connect())
- tests/unit/DBSQLOperation.test.ts (retarget internal state lookup through backend)

* sea-abstraction: cleanup - restore JSDoc, dedupe test pre-seed, fix inline type

Addresses code-bloat-watchdog findings from commit 0085928:
- Restores public-API JSDoc on DBSQLSession + DBSQLOperation methods (was deleted as scope creep; contracts unchanged so docs still apply)
- Adds makeStubbedClient() helper to tests/unit/DBSQLClient.test.ts; replaces 14× duplicated ThriftBackend pre-seed
- Imports WaitUntilReadyOptions instead of inline option types in IOperationBackend + DBSQLOperation.waitUntilReady

* sea-abstraction: address full-review findings (F1-F17 except F5)

Round-N fixes from the 9-reviewer pre-review.
Public IOperation/DBSQLOperation surface preserved byte-identical; backend interfaces (IBackend / ISessionBackend / IOperationBackend) made fully neutral so both Thrift and SEA can implement the same contract.

F1: neutral DTOs at IOperationBackend with Thrift-shape preservation on the public facade (adapter pattern):
- lib/contracts/OperationStatus.ts (new): neutral OperationStatus + OperationState enum mirroring databricks-sql-python's CommandState and kernel pyo3's StatementStatus taxonomy.
- lib/contracts/ResultMetadata.ts (new): neutral ResultMetadata + ResultFormat enum mirroring the three TSparkRowSetType cases.
- IOperationBackend.status()/getResultMetadata() return the neutral DTOs.
- ThriftOperationBackend.status() adapts at the boundary via adaptOperationStatus / adaptResultMetadata; module-level helpers thriftStateToOperationState and thriftRowSetTypeToResultFormat do the enum maps.
- ThriftOperationBackend exposes thriftStatusResponse() and thriftResultMetadataResponse() as public Thrift-only accessors used by the facade's zero-loss fast path (kept for internal state-machine + result-handler dispatch as well).
- lib/utils/thriftWireSynthesis.ts (new): synthesizeThriftStatus and synthesizeThriftResultSetMetadata convert neutral DTOs back to Thrift wire shape for the non-Thrift backend path. Lossy on Thrift-only fields (taskStatus, numModifiedRows, cacheLookupResult, etc.).
- DBSQLOperation.status() and getMetadata() preserved Thrift return shape: Thrift backend path returns the real wire response (zero loss); non-Thrift backend path synthesizes via the new helpers.
- DBSQLOperation.getResultMetadata(): new additive neutral accessor on IOperation; DBSQLSession.handleStagingOperation uses it instead of the deprecated Thrift-shaped getMetadata().

F2: IBackend.connect() is now zero-arg.
Backend reads everything it needs from IClientContext / constructor; matches Python connector's pattern of passing session_configuration via constructor not method-arg.

F3: Restore the 'Server protocol version' debug log dropped by the original PR-378 refactor. Re-added to ThriftSessionBackend.constructor with the LogLevel.debug + IClientContext.getLogger() pattern; matches the pre-refactor log site at main:lib/DBSQLSession.ts:175.

F4 + F11 + F14: SeaBackend stub safety:
- close() is a no-op so DBSQLClient.close()'s state-clearing block can finish even after a useSEA: true connect() failure.
- connect() and openSession() throw HiveDriverError instead of generic Error, matching the rest of the codebase.
- connect(options: ConnectionOptions) and openSession(request: OpenSessionRequest) declare their parameters (with @typescript-eslint/no-unused-vars disable) so IDE autocomplete prompts the M1 SEA implementer.

F6 + F7 + F9 + F10: JSDoc on backend interfaces:
- IBackend: connect/openSession/close docstrings; close() doc explicitly states transport-layer cleanup is owned by DBSQLClient.
- ISessionBackend: copy IDBSQLSession's per-method one-liner JSDoc.
- IOperationBackend: doc hasResultSet (readonly external; mutates internally), waitUntilReady (MUST throw OperationStateError on terminal non-success).

F8: tests/unit/sea/SeaBackend.test.ts (new) locks in the stub contract: connect() rejects HiveDriverError, openSession() rejects HiveDriverError, close() resolves no-op. ~30 LOC.

F12: Drop legacy { handle, ... } ctor branch from DBSQLOperation and DBSQLSession:
- Facades accept only { backend, context }.
- DBSQLSession no longer imports ThriftSessionBackend at all.
- DBSQLOperation imports ThriftOperationBackend solely for the F1 typed downcast (zero-loss Thrift fast path); this is a deliberate, scoped coupling tied to the back-compat decision.
- tests/unit/.stubs/createSessionForTest.ts and createOperationForTest.ts (new) wrap the legacy shape; all 48 + 54 test sites mechanically migrated.

F15: ThriftOperationBackend.waitUntilReady uses imported WaitUntilReadyOptions type instead of an anonymous inline shape.

F16: useSEA flag moved out of public ConnectionOptions:
- Removed useSEA?: boolean from the exported lib/contracts/IDBSQLClient.ts ConnectionOptions; no longer ships in the public .d.ts.
- lib/contracts/InternalConnectionOptions.ts (new) declares the flag as a non-exported internal extension; DBSQLClient.connect() reads via a typed cast. Mirrors Python's kwargs.get('use_sea', False) pattern at databricks-sql-python/src/databricks/sql/session.py:111.

F17: Missing return; after case 'timeout' in forwardConnectionEvent so a future fifth case doesn't silently fall through. The trailing return; in the last case triggers no-useless-return, quieted with a localized eslint-disable-next-line + intent comment.

F5: deferred per owner instruction (test-only as any cast tightening).

Verification:
- yarn lint clean (3 pre-existing warnings in tests/e2e/protocol_versions.test.ts).
- yarn build clean.
- tsc --noEmit -p tsconfig.json clean (apart from pre-existing examples/tokenFederation/* import errors that exist on main).
- Runtime smoke test of SeaBackend stub + Thrift-wire synthesis round-trip passes 5/5 assertions.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* sea-abstraction: address PR #378 review-comment fixes (H1 / M1-M4 / L1-L10)

Addresses 15 review findings from the code-review-squad pass on PR #378. L11 (backend kind field on the three interfaces) is deliberately deferred to avoid a cross-stack cascade ripple while the downstream PRs are still in flight.

H1: fetchChunk lost mid-flight failIfClosed regression. Add optional `isClosed?: () => boolean` to IOperationBackend.fetchChunk's options bag.
ThriftOperationBackend.fetchChunk probes it after the setTimeout(0) macrotask yield and returns [] when set; the facade's post-fetch failIfClosed then raises the user-visible OperationStateError. Restores the guard that the refactor split across the facade/backend boundary so a cancel/close arriving during the yield window no longer runs the data RPC to completion needlessly.

M1: neutralize WaitUntilReadyOptions callback shape. Introduce IOperationBackendWaitOptions { callback?: (status: OperationStatus) => unknown } on the backend interface. Facade keeps the public Thrift-typed OperationStatusCallback and adapts at the boundary by wrapping the user's callback with synthesizeThriftStatus. ThriftOperationBackend.waitUntilReady consumes the neutral options and passes adaptOperationStatus(response) to the callback.

M2: synthesizeOkStatus maps OperationState to TStatusCode. Add synthesizeStatusFromOperation that returns ERROR_STATUS for Failed/Cancelled/Closed (carrying errorMessage + sqlState) and SUCCESS_STATUS otherwise. Wire it into synthesizeThriftStatus so legacy Status.assert(resp.status) sees the right code on non-Thrift backends.

M3: TelemetryEvent + DriverConfiguration carry a backend tag. Add optional backend?: 'thrift' | 'sea' | 'kernel' on both interfaces so dashboards can slice latency/error rate by backend without a metrics-schema migration once non-Thrift emission goes live.

M4: test coverage for the synthesize helpers + useSEA failure path. New tests/unit/thrift-backend/wireSynthesis.test.ts covering all OperationState/ResultFormat mappings, ERROR_STATUS carries errorMessage/sqlState, hasResultSet round-trip, schema/arrowSchema/lz4Compressed/isStagingOperation preservation, and the L3 throw on unknown ResultFormat. New test in DBSQLClient.test.ts asserts that a useSEA:true connect failure leaves this.backend === undefined and the next openSession() surfaces "not connected" rather than the SeaBackend's "not implemented" error.
L1: forwardConnectionEvent normalizes payload to Error. Replace `payload as Error` with `payload instanceof Error ? payload : new Error(String(payload))` so a backend that emits a non-Error through the cross-backend onConnectionEvent doesn't crash the logger.log call.

L2: DBSQLClient.connect publishes this.backend only on success. Construct the backend locally, await connect() in a try/catch, run a best-effort backend.close() (per IBackend.close()'s safe-on-partial-init contract) and rethrow on failure. Only assign this.backend after a clean connect so a failed connect surfaces "DBSQLClient: not connected" on the next openSession.

L3: resultFormatToThrift throws on unknown ResultFormat. Replace the silent default fallback to COLUMN_BASED_SET with a HiveDriverError. Prevents a future ResultFormat enum extension from silently routing results through JsonResultHandler and surfacing garbled rows.

L4: DBSQLOperation.getMetadata carries @deprecated. Adds the canonical TypeScript JSDoc tag so IDEs (strikethrough), tsc, ESLint plugins, and agentic codegen pick up the soft deprecation in favour of getResultMetadata.

L5: numberToInt64 re-export carries @deprecated. Re-export through a named const with a JSDoc block (rather than a bare `export { ... } from`) so the @deprecated tag attaches to the symbol consumers see in their IDE / .d.ts.

L6: DBSQLSession.runBackend helper. Collapse 11 duplicated `failIfClosed → backend.X → failIfClosed` brackets into a single private runBackend<T>(fn) so the open-flag-before-and-after contract has a name and can't be forgotten in a new delegation method.

L7: restore three why-comments deleted from DBSQLSession. Staging-detection invariant in executeStatement, AWS-vs-Azure 404 difference on staging-remove, and the Content-Length-required note on staging-upload. Verbatim from main; these document non-obvious intentional behaviour the refactor inadvertently dropped.

L8: hasResultSet becomes a method on IOperationBackend.
The value is state-dependent (the Thrift impl mutates the underlying operation handle inside processOperationStatusResponse), so the property+readonly+disclaimer-JSDoc pattern was misleading. Method form makes the live-read semantics obvious to a fresh implementer. 3 facade call sites updated.

L9: wireSynthesis moves under thrift-backend. The file imports Thrift IDL types and produces Thrift-typed values; it belongs next to ThriftOperationBackend, not in the neutral lib/utils/ tree where it would creep into the dependency cone of future backend-neutral helpers. Same reasoning that placed numberToInt64 and getDirectResultsOptions under thrift-backend/.

L10: interface-level downcast policy. Add a JSDoc paragraph on IOperationBackend grandfathering the two existing `instanceof ThriftOperationBackend` downcasts in DBSQLOperation.status/getMetadata and prohibiting new ones. Future zero-loss back-compat needs should extend the interface (or add an optional method) rather than spawn a per-backend branch matrix.

Gates: yarn build (exit 0), yarn lint (0 errors, 3 pre-existing warnings in tests/e2e/protocol_versions.test.ts), yarn test on touched files (163 passing, +12 net new tests from M4 work; 2 failures pre-existing on PR head unchanged: getSchema-directResults and the LZ4-cloud-fetch flag, both flagged in the team-lead playbook as known prior regressions).

Cascade implications for downstream PRs (#380 #377 #379 #382 #381 #384 #383): L8 converts hasResultSet from a property to a method, M1 swaps WaitUntilReadyOptions for IOperationBackendWaitOptions on the backend interface. Both are mechanical renames at downstream backend impls when they rebase.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* Fix SEA abstraction merge fallout

Restore Thrift compatibility paths needed by existing schema and result-handler tests after merging main telemetry changes.
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* Restore Thrift result-handler compatibility hooks

Keep existing e2e-only inspection hooks available through the facade while the new backend abstraction owns result handling.

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

---------

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
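Reviewer note: findings M2 and L3 in the message above describe two small mappings, which can be sketched as follows. This is only an illustration under assumptions: the type names, the string-literal states, and both function names are hypothetical stand-ins, since the real enums and helpers (lib/contracts/OperationStatus.ts, the wireSynthesis module) are not shown on this page. The wire names `ARROW_BASED_SET` and `URL_BASED_SET` are likewise assumed; only `COLUMN_BASED_SET` appears in the message.

```typescript
// Hypothetical stand-ins for the neutral DTOs; names are assumptions.
type OperationState = 'Pending' | 'Running' | 'Succeeded' | 'Failed' | 'Cancelled' | 'Closed';
type ResultFormat = 'ColumnBased' | 'ArrowBased' | 'UrlBased';

interface NeutralStatus {
  state: OperationState;
  errorMessage?: string;
  sqlState?: string;
}

interface WireStatus {
  statusCode: 'SUCCESS_STATUS' | 'ERROR_STATUS';
  errorMessage?: string;
  sqlState?: string;
}

// M2: terminal non-success states become ERROR_STATUS and carry the error
// details; every other state maps to SUCCESS_STATUS.
function sketchStatusFromOperation(op: NeutralStatus): WireStatus {
  switch (op.state) {
    case 'Failed':
    case 'Cancelled':
    case 'Closed':
      return { statusCode: 'ERROR_STATUS', errorMessage: op.errorMessage, sqlState: op.sqlState };
    default:
      return { statusCode: 'SUCCESS_STATUS' };
  }
}

// L3: an unknown format throws instead of silently falling back to a default.
function sketchResultFormatToWire(format: ResultFormat): string {
  switch (format) {
    case 'ColumnBased':
      return 'COLUMN_BASED_SET';
    case 'ArrowBased':
      return 'ARROW_BASED_SET';
    case 'UrlBased':
      return 'URL_BASED_SET';
    default:
      // Reached only if a new format is added without updating this switch
      // (or a caller bypasses the type system).
      throw new Error(`Unsupported ResultFormat: ${String(format)}`);
  }
}
```

The throwing default is the point of L3: a value outside the known set fails loudly at the mapping boundary rather than being handled by the wrong result handler.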
1 parent e200a1b commit a3bf320

24 files changed

Lines changed: 1999 additions & 869 deletions
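Reviewer note: finding L6 in the commit message describes a `runBackend<T>(fn)` helper that brackets each backend call with an open-flag check before and after. DBSQLSession.ts is not part of the diff shown on this page, so the following is only a sketch of that bracket under assumptions: the class name, the `getInfo` method, and the error text are hypothetical.

```typescript
// Hypothetical minimal backend shape, for illustration only.
interface SketchSessionBackend {
  getInfo(): Promise<string>;
}

class SketchSessionFacade {
  private isOpen = true;

  private readonly backend: SketchSessionBackend;

  constructor(backend: SketchSessionBackend) {
    this.backend = backend;
  }

  public close(): void {
    this.isOpen = false;
  }

  private failIfClosed(): void {
    if (!this.isOpen) {
      throw new Error('The session was closed or has expired');
    }
  }

  // Check the open flag before the call and again after it resolves, so a
  // close that lands while the backend call is in flight is still reported.
  private async runBackend<T>(fn: () => Promise<T>): Promise<T> {
    this.failIfClosed();
    const result = await fn();
    this.failIfClosed();
    return result;
  }

  // Each delegation method becomes a one-liner over runBackend.
  public getInfo(): Promise<string> {
    return this.runBackend(() => this.backend.getInfo());
  }
}
```

Naming the bracket means a new delegation method gets both checks by construction instead of by copy and paste.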

lib/DBSQLClient.ts

Lines changed: 78 additions & 64 deletions
@@ -1,10 +1,8 @@
 import thrift from 'thrift';
-import Int64 from 'node-int64';
 import os from 'os';
 
 import { EventEmitter } from 'events';
 import TCLIService from '../thrift/TCLIService';
-import { TProtocolVersion } from '../thrift/TCLIService_types';
 import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
 import IDriver from './contracts/IDriver';
 import IClientContext, { ClientConfig } from './contracts/IClientContext';
@@ -15,9 +13,12 @@ import IDBSQLSession from './contracts/IDBSQLSession';
 import IAuthentication from './connection/contracts/IAuthentication';
 import HttpConnection from './connection/connections/HttpConnection';
 import IConnectionOptions from './connection/contracts/IConnectionOptions';
-import Status from './dto/Status';
 import HiveDriverError from './errors/HiveDriverError';
-import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
+import { buildUserAgentString } from './utils';
+import IBackend from './contracts/IBackend';
+import { InternalConnectionOptions } from './contracts/InternalConnectionOptions';
+import ThriftBackend from './thrift-backend/ThriftBackend';
+import SeaBackend from './sea/SeaBackend';
 import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
 import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
 import {
@@ -47,19 +48,6 @@ function prependSlash(str: string): string {
   return str;
 }
 
-function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
-  if (!catalogName && !schemaName) {
-    return {};
-  }
-
-  return {
-    initialNamespace: {
-      catalogName,
-      schemaName,
-    },
-  };
-}
-
 export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;
 
 /**
@@ -111,6 +99,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
 
   private readonly sessions = new CloseableCollection<DBSQLSession>();
 
+  private backend?: IBackend;
+
   // Telemetry components — `telemetryClient` is the shared per-host owner
   // (process-wide via TelemetryClientProvider). The exporter, aggregator,
   // circuit-breaker registry and feature-flag cache live on it. Each
@@ -633,34 +623,35 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
 
     this.connectionProvider = this.createConnectionProvider(options);
 
-    const thriftConnection = await this.connectionProvider.getThriftConnection();
+    // M0: `useSEA` is consumed via a non-exported internal-options cast so it
+    // doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")`
+    // pattern (see databricks-sql-python/src/databricks/sql/session.py).
+    const internalOptions = options as ConnectionOptions & InternalConnectionOptions;
+    const backend = internalOptions.useSEA
+      ? new SeaBackend()
+      : new ThriftBackend({
+          context: this,
+          onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
+        });
 
-    thriftConnection.on('error', (error: Error) => {
-      // Error.stack already contains error type and message, so log stack if available,
-      // otherwise fall back to just error type + message
-      this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
+    // Publish `this.backend` only after a successful `connect()`. Otherwise a
+    // failed connect would leave a half-initialized backend in place, and the
+    // next `openSession()` would slip past the `!this.backend` guard and
+    // surface a misleading "backend not implemented" / partial-state error
+    // instead of the accurate "DBSQLClient: not connected".
+    try {
+      await backend.connect(options);
+    } catch (err) {
+      // `IBackend.close()` is documented as safe on a partially-initialized
+      // backend; best-effort cleanup so we don't leak sockets / state.
       try {
-        this.emit('error', error);
-      } catch (e) {
-        // EventEmitter will throw unhandled error when emitting 'error' event.
-        // Since we already logged it few lines above, just suppress this behaviour
+        await backend.close();
+      } catch (closeErr) {
+        // Swallow; the original error is what the caller needs to see.
       }
-    });
-
-    thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
-      this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
-      this.emit('reconnecting', params);
-    });
-
-    thriftConnection.on('close', () => {
-      this.logger.log(LogLevel.debug, 'Closing connection.');
-      this.emit('close');
-    });
-
-    thriftConnection.on('timeout', () => {
-      this.logger.log(LogLevel.debug, 'Connection timed out.');
-      this.emit('timeout');
-    });
+      throw err;
+    }
+    this.backend = backend;
 
     // Initialize telemetry if enabled. The env var DATABRICKS_TELEMETRY_DISABLED
     // is a hard kill switch for ops/IT teams who can't redeploy app code.
@@ -691,6 +682,41 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
     return this;
   }
 
+  private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
+    switch (event) {
+      case 'error': {
+        // `payload` is typed `unknown` because the cross-backend
+        // `IBackend.onConnectionEvent` doesn't constrain the error shape.
+        // Normalize to `Error` so the stack/name/message access below is safe
+        // for any backend that emits a non-Error value (e.g. a bare string).
+        const error = payload instanceof Error ? payload : new Error(String(payload));
+        this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
+        try {
+          this.emit('error', error);
+        } catch (e) {
+          // EventEmitter throws when 'error' has no listeners; we've already logged it.
+        }
+        return;
+      }
+      case 'reconnecting':
+        this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
+        this.emit('reconnecting', payload);
+        return;
+      case 'close':
+        this.logger.log(LogLevel.debug, 'Closing connection.');
+        this.emit('close');
+        return;
+      case 'timeout':
+        this.logger.log(LogLevel.debug, 'Connection timed out.');
+        this.emit('timeout');
+        // Explicit return mirrors the other cases and protects against
+        // fall-through if a new event is added below.
+        // eslint-disable-next-line no-useless-return
+        return;
+      // no default
+    }
+  }
+
   /**
    * Starts new session
    * @public
@@ -701,6 +727,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
    * const session = await client.openSession();
    */
   public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
+    if (!this.backend) {
+      throw new HiveDriverError('DBSQLClient: not connected');
+    }
+
     // Track connection open latency
     const startTime = Date.now();
 
@@ -711,30 +741,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
     if (this.config.enableMetricViewMetadata) {
       configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
     }
-
-    // Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
-    if (request.queryTags !== undefined) {
-      const serialized = serializeQueryTags(request.queryTags);
-      if (serialized) {
-        configuration.QUERY_TAGS = serialized;
-      } else {
-        delete configuration.QUERY_TAGS;
-      }
-    }
-
-    const response = await this.driver.openSession({
-      client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
-      ...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
+    const sessionBackend = await this.backend.openSession({
+      ...request,
       configuration,
-      canUseMultipleCatalogs: true,
-    });
-
-    Status.assert(response.status);
-    const session = new DBSQLSession({
-      handle: definedOrError(response.sessionHandle),
-      context: this,
-      serverProtocolVersion: response.serverProtocolVersion,
     });
+    const session = new DBSQLSession({ backend: sessionBackend, context: this });
     this.sessions.add(session);
 
     // Emit connection.open telemetry event. The DriverConfiguration blob
@@ -772,6 +783,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
    */
   public async close(): Promise<void> {
     await this.sessions.closeAll();
+    await this.backend?.close();
+
+    this.backend = undefined;
 
     // Cleanup telemetry. Releasing our refcount on the shared TelemetryClient
     // is awaited because the underlying close() drains the final HTTP POST —
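
Reviewer note: the `connect()` hunk above publishes `this.backend` only after `backend.connect()` resolves, and `openSession()` guards on it. The following standalone sketch isolates that pattern so its behaviour can be checked in isolation. It is an illustration under assumptions, not the driver's code: `SketchBackend`, `SketchClient`, and the returned string are hypothetical, and the real `IBackend` contract has more members than shown.

```typescript
// Hypothetical two-method backend contract, for illustration only.
interface SketchBackend {
  connect(): Promise<void>;
  close(): Promise<void>;
}

class SketchClient {
  private backend?: SketchBackend;

  public async connect(candidate: SketchBackend): Promise<void> {
    try {
      await candidate.connect();
    } catch (err) {
      // Best-effort cleanup of the partially initialized backend.
      try {
        await candidate.close();
      } catch {
        // Swallow; the original connect error is what the caller needs.
      }
      throw err;
    }
    // Publish only after a clean connect.
    this.backend = candidate;
  }

  public openSession(): string {
    if (!this.backend) {
      throw new Error('SketchClient: not connected');
    }
    return 'session';
  }
}
```

With this shape, a failed connect leaves the client in its initial state, so a later `openSession()` reports the not-connected condition rather than whatever error the half-built backend would have produced.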
