Skip to content
129 changes: 55 additions & 74 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import thrift from 'thrift';
import Int64 from 'node-int64';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
Expand All @@ -14,9 +12,12 @@ import IDBSQLSession from './contracts/IDBSQLSession';
import IAuthentication from './connection/contracts/IAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
import { buildUserAgentString } from './utils';
import IBackend from './contracts/IBackend';
import { InternalConnectionOptions } from './contracts/InternalConnectionOptions';
import ThriftBackend from './thrift-backend/ThriftBackend';
import SeaBackend from './sea/SeaBackend';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
import {
Expand All @@ -39,19 +40,6 @@ function prependSlash(str: string): string {
return str;
}

/**
 * Builds the optional `initialNamespace` fragment of an open-session request.
 *
 * @param catalogName - initial catalog, if any
 * @param schemaName - initial schema, if any
 * @returns an empty object when both arguments are falsy (missing or empty
 *   string); otherwise an object whose `initialNamespace` carries both values
 *   as given, so it can be spread into the request.
 */
function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
  // A namespace is only sent when at least one of its parts is non-empty.
  const namespaceRequested = Boolean(catalogName || schemaName);

  return namespaceRequested ? { initialNamespace: { catalogName, schemaName } } : {};
}

/**
 * The part of the `thrift` module's type that is exposed under this alias:
 * only its `createClient` member.
 * NOTE(review): presumably this lets a substitute implementation be supplied
 * in place of the real module — confirm against usages.
 */
export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;

export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
Expand All @@ -75,6 +63,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private readonly sessions = new CloseableCollection<DBSQLSession>();

private backend?: IBackend;

private static getDefaultLogger(): IDBSQLLogger {
if (!this.defaultLogger) {
this.defaultLogger = new DBSQLLogger();
Expand Down Expand Up @@ -248,38 +238,53 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

this.connectionProvider = this.createConnectionProvider(options);

const thriftConnection = await this.connectionProvider.getThriftConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter throws an unhandled error when an 'error' event is emitted with no listeners.
// Since we already logged it a few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});
// M0: `useSEA` is consumed via a non-exported internal-options cast so it
// doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")`
// pattern (see databricks-sql-python/src/databricks/sql/session.py).
const internalOptions = options as ConnectionOptions & InternalConnectionOptions;
this.backend = internalOptions.useSEA

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low — confusing diagnostic on a partially-initialized path.

With useSEA: true, SeaBackend.connect() always throws (M0 stub), but this.backend is already assigned. A subsequent openSession() passes the !this.backend guard at line 298 and reaches SeaBackend.openSession(), which throws "SEA backend not implemented yet" instead of "DBSQLClient: not connected".

  • Suggested fix: either (a) assign this.backend after a successful connect(), or (b) try/catch around await this.backend.connect(options) and null this.backend on failure.

The IBackend.close() JSDoc explicitly mentions "MUST be safe to call on a partially-initialized backend (i.e. after a failed connect())" — but that contract only covers close(), not the openSession-after-failed-connect path.


Posted by code-review-squad · /full-review

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in a9347e8.

Restructured to construct the backend locally, attempt connect() in a try/catch, run a best-effort backend.close() on failure (relying on IBackend.close()'s documented safe-on-partial-init contract), and only assign this.backend after a clean connect. A failed connect() now leaves this.backend === undefined, so the next openSession() trips the existing !this.backend guard and surfaces HiveDriverError: DBSQLClient: not connected — the diagnostic this fix was about delivering.

Unit-tested via the new useSEA: true failure-path test in tests/unit/DBSQLClient.test.ts (asserts the post-connect-failure backend state and the openSession diagnostic).


This comment was generated with GitHub MCP.

? new SeaBackend()
: new ThriftBackend({
context: this,
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});
await this.backend.connect(options);

return this;
}

/**
 * Handles a connection-level event reported through the backend's
 * `onConnectionEvent` callback: writes a log entry and re-emits the event on
 * this client.
 *
 * @param event - which connection event occurred
 * @param payload - event data: the error for `'error'`, the reconnect
 *   parameters for `'reconnecting'`; not used for `'close'` and `'timeout'`
 */
private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
  switch (event) {
    case 'error': {
      // `payload` is `unknown` and optional, so nothing guarantees an Error here.
      // An unchecked cast would throw a TypeError on `.stack` for null/undefined
      // (outside the try below) and log "undefined: undefined" for other
      // non-Error values, so normalize to a real Error before logging/emitting.
      const error = payload instanceof Error ? payload : new Error(String(payload));
      // Error.stack already contains the error type and message; fall back to
      // type + message only when no stack is available.
      this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
      try {
        this.emit('error', error);
      } catch (e) {
        // EventEmitter throws when 'error' has no listeners; we've already logged it.
      }
      return;
    }
    case 'reconnecting':
      this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
      this.emit('reconnecting', payload);
      return;
    case 'close':
      this.logger.log(LogLevel.debug, 'Closing connection.');
      this.emit('close');
      return;
    case 'timeout':
      this.logger.log(LogLevel.debug, 'Connection timed out.');
      this.emit('timeout');
      // Explicit return mirrors the other cases and protects against
      // fall-through if a new event is added below.
      // eslint-disable-next-line no-useless-return
      return;
    // no default
  }
}

/**
* Starts new session
* @public
Expand All @@ -290,44 +295,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
* const session = await client.openSession();
*/
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
// Prepare session configuration
const configuration = request.configuration ? { ...request.configuration } : {};

// Add metric view metadata config if enabled
if (this.config.enableMetricViewMetadata) {
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
}

// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
if (request.queryTags !== undefined) {
const serialized = serializeQueryTags(request.queryTags);
if (serialized) {
configuration.QUERY_TAGS = serialized;
} else {
delete configuration.QUERY_TAGS;
}
if (!this.backend) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const response = await this.driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
configuration,
canUseMultipleCatalogs: true,
});

Status.assert(response.status);
const session = new DBSQLSession({
handle: definedOrError(response.sessionHandle),
context: this,
serverProtocolVersion: response.serverProtocolVersion,
});
const sessionBackend = await this.backend.openSession(request);
const session = new DBSQLSession({ backend: sessionBackend, context: this });
this.sessions.add(session);
return session;
}

public async close(): Promise<void> {
await this.sessions.closeAll();
await this.backend?.close();

this.backend = undefined;
this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
Expand Down
Loading