-
Notifications
You must be signed in to change notification settings - Fork 49
[SEA-NodeJS] (1/8) Backend abstraction — IBackend / ISessionBackend / IOperationBackend #378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
0085928
2be1a63
8a22d54
a9347e8
45563e5
0e802ec
e4e6bcf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,7 @@ | ||
| import thrift from 'thrift'; | ||
| import Int64 from 'node-int64'; | ||
|
|
||
| import { EventEmitter } from 'events'; | ||
| import TCLIService from '../thrift/TCLIService'; | ||
| import { TProtocolVersion } from '../thrift/TCLIService_types'; | ||
| import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient'; | ||
| import IDriver from './contracts/IDriver'; | ||
| import IClientContext, { ClientConfig } from './contracts/IClientContext'; | ||
|
|
@@ -14,9 +12,12 @@ import IDBSQLSession from './contracts/IDBSQLSession'; | |
| import IAuthentication from './connection/contracts/IAuthentication'; | ||
| import HttpConnection from './connection/connections/HttpConnection'; | ||
| import IConnectionOptions from './connection/contracts/IConnectionOptions'; | ||
| import Status from './dto/Status'; | ||
| import HiveDriverError from './errors/HiveDriverError'; | ||
| import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils'; | ||
| import { buildUserAgentString } from './utils'; | ||
| import IBackend from './contracts/IBackend'; | ||
| import { InternalConnectionOptions } from './contracts/InternalConnectionOptions'; | ||
| import ThriftBackend from './thrift-backend/ThriftBackend'; | ||
| import SeaBackend from './sea/SeaBackend'; | ||
| import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication'; | ||
| import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth'; | ||
| import { | ||
|
|
@@ -39,19 +40,6 @@ function prependSlash(str: string): string { | |
| return str; | ||
| } | ||
|
|
||
| function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) { | ||
| if (!catalogName && !schemaName) { | ||
| return {}; | ||
| } | ||
|
|
||
| return { | ||
| initialNamespace: { | ||
| catalogName, | ||
| schemaName, | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| export type ThriftLibrary = Pick<typeof thrift, 'createClient'>; | ||
|
|
||
| export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext { | ||
|
|
@@ -75,6 +63,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I | |
|
|
||
| private readonly sessions = new CloseableCollection<DBSQLSession>(); | ||
|
|
||
| private backend?: IBackend; | ||
|
|
||
| private static getDefaultLogger(): IDBSQLLogger { | ||
| if (!this.defaultLogger) { | ||
| this.defaultLogger = new DBSQLLogger(); | ||
|
|
@@ -248,38 +238,53 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I | |
|
|
||
| this.connectionProvider = this.createConnectionProvider(options); | ||
|
|
||
| const thriftConnection = await this.connectionProvider.getThriftConnection(); | ||
|
|
||
| thriftConnection.on('error', (error: Error) => { | ||
| // Error.stack already contains error type and message, so log stack if available, | ||
| // otherwise fall back to just error type + message | ||
| this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`); | ||
| try { | ||
| this.emit('error', error); | ||
| } catch (e) { | ||
| // EventEmitter will throw unhandled error when emitting 'error' event. | ||
| // Since we already logged it few lines above, just suppress this behaviour | ||
| } | ||
| }); | ||
|
|
||
| thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => { | ||
| this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`); | ||
| this.emit('reconnecting', params); | ||
| }); | ||
|
|
||
| thriftConnection.on('close', () => { | ||
| this.logger.log(LogLevel.debug, 'Closing connection.'); | ||
| this.emit('close'); | ||
| }); | ||
| // M0: `useSEA` is consumed via a non-exported internal-options cast so it | ||
| // doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")` | ||
| // pattern (see databricks-sql-python/src/databricks/sql/session.py). | ||
| const internalOptions = options as ConnectionOptions & InternalConnectionOptions; | ||
| this.backend = internalOptions.useSEA | ||
| ? new SeaBackend() | ||
| : new ThriftBackend({ | ||
| context: this, | ||
| onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload), | ||
| }); | ||
|
|
||
| thriftConnection.on('timeout', () => { | ||
| this.logger.log(LogLevel.debug, 'Connection timed out.'); | ||
| this.emit('timeout'); | ||
| }); | ||
| await this.backend.connect(options); | ||
|
|
||
| return this; | ||
| } | ||
|
|
||
| private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void { | ||
| switch (event) { | ||
| case 'error': { | ||
| const error = payload as Error; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Low — latent crash hazard. Pre-refactor the Thrift
Posted by code-review-squad ·
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in a9347e8. Replaced the unchecked cast at const error = payload instanceof Error ? payload : new Error(String(payload));Added a JSDoc comment noting that This comment was generated with GitHub MCP. |
||
| this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`); | ||
| try { | ||
| this.emit('error', error); | ||
| } catch (e) { | ||
| // EventEmitter throws when 'error' has no listeners; we've already logged it. | ||
| } | ||
| return; | ||
| } | ||
| case 'reconnecting': | ||
| this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`); | ||
| this.emit('reconnecting', payload); | ||
| return; | ||
| case 'close': | ||
| this.logger.log(LogLevel.debug, 'Closing connection.'); | ||
| this.emit('close'); | ||
| return; | ||
| case 'timeout': | ||
| this.logger.log(LogLevel.debug, 'Connection timed out.'); | ||
| this.emit('timeout'); | ||
| // Explicit return mirrors the other cases and protects against | ||
| // fall-through if a new event is added below. | ||
| // eslint-disable-next-line no-useless-return | ||
| return; | ||
| // no default | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Starts new session | ||
| * @public | ||
|
|
@@ -290,44 +295,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I | |
| * const session = await client.openSession(); | ||
| */ | ||
| public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> { | ||
| // Prepare session configuration | ||
| const configuration = request.configuration ? { ...request.configuration } : {}; | ||
|
|
||
| // Add metric view metadata config if enabled | ||
| if (this.config.enableMetricViewMetadata) { | ||
| configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true'; | ||
| } | ||
|
|
||
| // Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS | ||
| if (request.queryTags !== undefined) { | ||
| const serialized = serializeQueryTags(request.queryTags); | ||
| if (serialized) { | ||
| configuration.QUERY_TAGS = serialized; | ||
| } else { | ||
| delete configuration.QUERY_TAGS; | ||
| } | ||
| if (!this.backend) { | ||
| throw new HiveDriverError('DBSQLClient: not connected'); | ||
| } | ||
|
|
||
| const response = await this.driver.openSession({ | ||
| client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8), | ||
| ...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema), | ||
| configuration, | ||
| canUseMultipleCatalogs: true, | ||
| }); | ||
|
|
||
| Status.assert(response.status); | ||
| const session = new DBSQLSession({ | ||
| handle: definedOrError(response.sessionHandle), | ||
| context: this, | ||
| serverProtocolVersion: response.serverProtocolVersion, | ||
| }); | ||
| const sessionBackend = await this.backend.openSession(request); | ||
| const session = new DBSQLSession({ backend: sessionBackend, context: this }); | ||
| this.sessions.add(session); | ||
| return session; | ||
| } | ||
|
|
||
| public async close(): Promise<void> { | ||
| await this.sessions.closeAll(); | ||
| await this.backend?.close(); | ||
|
|
||
| this.backend = undefined; | ||
| this.client = undefined; | ||
| this.connectionProvider = undefined; | ||
| this.authProvider = undefined; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Low — confusing diagnostic on a partially-initialized path.
With
useSEA: true,SeaBackend.connect()always throws (M0 stub), butthis.backendis already assigned. A subsequentopenSession()passes the!this.backendguard at line 298 and reachesSeaBackend.openSession(), which throws "SEA backend not implemented yet" instead of "DBSQLClient: not connected".this.backendafter a successfulconnect(), or (b)try/catcharoundawait this.backend.connect(options)and nullthis.backendon failure.The
IBackend.close()JSDoc explicitly mentions "MUST be safe to call on a partially-initialized backend (i.e. after a failedconnect())" — but that contract only coversclose(), not the openSession-after-failed-connect path.Posted by code-review-squad ·
/full-reviewThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a9347e8.
Restructured to construct the backend locally, attempt
connect()in a try/catch, run a best-effortbackend.close()on failure (relying onIBackend.close()'s documented safe-on-partial-init contract), and only assignthis.backendafter a clean connect. A failedconnect()now leavesthis.backend === undefined, so the nextopenSession()trips the existing!this.backendguard and surfacesHiveDriverError: DBSQLClient: not connected— the diagnostic this fix was about delivering.Unit-tested via the new
useSEA: truefailure-path test intests/unit/DBSQLClient.test.ts(asserts the post-connect-failure backend state and the openSession diagnostic).This comment was generated with GitHub MCP.