11import thrift from 'thrift' ;
2- import Int64 from 'node-int64' ;
32import os from 'os' ;
43
54import { EventEmitter } from 'events' ;
65import TCLIService from '../thrift/TCLIService' ;
7- import { TProtocolVersion } from '../thrift/TCLIService_types' ;
86import IDBSQLClient , { ClientOptions , ConnectionOptions , OpenSessionRequest } from './contracts/IDBSQLClient' ;
97import IDriver from './contracts/IDriver' ;
108import IClientContext , { ClientConfig } from './contracts/IClientContext' ;
@@ -15,9 +13,12 @@ import IDBSQLSession from './contracts/IDBSQLSession';
1513import IAuthentication from './connection/contracts/IAuthentication' ;
1614import HttpConnection from './connection/connections/HttpConnection' ;
1715import IConnectionOptions from './connection/contracts/IConnectionOptions' ;
18- import Status from './dto/Status' ;
1916import HiveDriverError from './errors/HiveDriverError' ;
20- import { buildUserAgentString , definedOrError , serializeQueryTags } from './utils' ;
17+ import { buildUserAgentString } from './utils' ;
18+ import IBackend from './contracts/IBackend' ;
19+ import { InternalConnectionOptions } from './contracts/InternalConnectionOptions' ;
20+ import ThriftBackend from './thrift-backend/ThriftBackend' ;
21+ import SeaBackend from './sea/SeaBackend' ;
2122import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication' ;
2223import DatabricksOAuth , { OAuthFlow } from './connection/auth/DatabricksOAuth' ;
2324import {
@@ -47,19 +48,6 @@ function prependSlash(str: string): string {
4748 return str ;
4849}
4950
50- function getInitialNamespaceOptions ( catalogName ?: string , schemaName ?: string ) {
51- if ( ! catalogName && ! schemaName ) {
52- return { } ;
53- }
54-
55- return {
56- initialNamespace : {
57- catalogName,
58- schemaName,
59- } ,
60- } ;
61- }
62-
6351export type ThriftLibrary = Pick < typeof thrift , 'createClient' > ;
6452
6553/**
@@ -111,6 +99,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
11199
112100 private readonly sessions = new CloseableCollection < DBSQLSession > ( ) ;
113101
102+ private backend ?: IBackend ;
103+
114104 // Telemetry components — `telemetryClient` is the shared per-host owner
115105 // (process-wide via TelemetryClientProvider). The exporter, aggregator,
116106 // circuit-breaker registry and feature-flag cache live on it. Each
@@ -637,34 +627,35 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
637627
638628 this . connectionProvider = this . createConnectionProvider ( options ) ;
639629
640- const thriftConnection = await this . connectionProvider . getThriftConnection ( ) ;
630+ // M0: `useSEA` is consumed via a non-exported internal-options cast so it
631+ // doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")`
632+ // pattern (see databricks-sql-python/src/databricks/sql/session.py).
633+ const internalOptions = options as ConnectionOptions & InternalConnectionOptions ;
634+ const backend = internalOptions . useSEA
635+ ? new SeaBackend ( )
636+ : new ThriftBackend ( {
637+ context : this ,
638+ onConnectionEvent : ( event , payload ) => this . forwardConnectionEvent ( event , payload ) ,
639+ } ) ;
641640
642- thriftConnection . on ( 'error' , ( error : Error ) => {
643- // Error.stack already contains error type and message, so log stack if available,
644- // otherwise fall back to just error type + message
645- this . logger . log ( LogLevel . error , error . stack || `${ error . name } : ${ error . message } ` ) ;
641+ // Publish `this.backend` only after a successful `connect()`. Otherwise a
642+ // failed connect would leave a half-initialized backend in place, and the
643+ // next `openSession()` would slip past the `!this.backend` guard and
644+ // surface a misleading "backend not implemented" / partial-state error
645+ // instead of the accurate "DBSQLClient: not connected".
646+ try {
647+ await backend . connect ( options ) ;
648+ } catch ( err ) {
649+ // `IBackend.close()` is documented as safe on a partially-initialized
650+ // backend; best-effort cleanup so we don't leak sockets / state.
646651 try {
647- this . emit ( 'error' , error ) ;
648- } catch ( e ) {
649- // EventEmitter will throw unhandled error when emitting 'error' event.
650- // Since we already logged it few lines above, just suppress this behaviour
652+ await backend . close ( ) ;
653+ } catch ( closeErr ) {
654+ // Swallow; the original error is what the caller needs to see.
651655 }
652- } ) ;
653-
654- thriftConnection . on ( 'reconnecting' , ( params : { delay : number ; attempt : number } ) => {
655- this . logger . log ( LogLevel . debug , `Reconnecting, params: ${ JSON . stringify ( params ) } ` ) ;
656- this . emit ( 'reconnecting' , params ) ;
657- } ) ;
658-
659- thriftConnection . on ( 'close' , ( ) => {
660- this . logger . log ( LogLevel . debug , 'Closing connection.' ) ;
661- this . emit ( 'close' ) ;
662- } ) ;
663-
664- thriftConnection . on ( 'timeout' , ( ) => {
665- this . logger . log ( LogLevel . debug , 'Connection timed out.' ) ;
666- this . emit ( 'timeout' ) ;
667- } ) ;
656+ throw err ;
657+ }
658+ this . backend = backend ;
668659
669660 // Initialize telemetry if enabled. The env var DATABRICKS_TELEMETRY_DISABLED
670661 // is a hard kill switch for ops/IT teams who can't redeploy app code.
@@ -695,6 +686,41 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
695686 return this ;
696687 }
697688
689+ private forwardConnectionEvent ( event : 'error' | 'reconnecting' | 'close' | 'timeout' , payload ?: unknown ) : void {
690+ switch ( event ) {
691+ case 'error' : {
692+ // `payload` is typed `unknown` because the cross-backend
693+ // `IBackend.onConnectionEvent` doesn't constrain the error shape.
694+ // Normalize to `Error` so the stack/name/message access below is safe
695+ // for any backend that emits a non-Error value (e.g. a bare string).
696+ const error = payload instanceof Error ? payload : new Error ( String ( payload ) ) ;
697+ this . logger . log ( LogLevel . error , error . stack || `${ error . name } : ${ error . message } ` ) ;
698+ try {
699+ this . emit ( 'error' , error ) ;
700+ } catch ( e ) {
701+ // EventEmitter throws when 'error' has no listeners; we've already logged it.
702+ }
703+ return ;
704+ }
705+ case 'reconnecting' :
706+ this . logger . log ( LogLevel . debug , `Reconnecting, params: ${ JSON . stringify ( payload ) } ` ) ;
707+ this . emit ( 'reconnecting' , payload ) ;
708+ return ;
709+ case 'close' :
710+ this . logger . log ( LogLevel . debug , 'Closing connection.' ) ;
711+ this . emit ( 'close' ) ;
712+ return ;
713+ case 'timeout' :
714+ this . logger . log ( LogLevel . debug , 'Connection timed out.' ) ;
715+ this . emit ( 'timeout' ) ;
716+ // Explicit return mirrors the other cases and protects against
717+ // fall-through if a new event is added below.
718+ // eslint-disable-next-line no-useless-return
719+ return ;
720+ // no default
721+ }
722+ }
723+
698724 /**
699725 * Starts new session
700726 * @public
@@ -705,6 +731,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
705731 * const session = await client.openSession();
706732 */
707733 public async openSession ( request : OpenSessionRequest = { } ) : Promise < IDBSQLSession > {
734+ if ( ! this . backend ) {
735+ throw new HiveDriverError ( 'DBSQLClient: not connected' ) ;
736+ }
737+
708738 // Track connection open latency
709739 const startTime = Date . now ( ) ;
710740
@@ -715,30 +745,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
715745 if ( this . config . enableMetricViewMetadata ) {
716746 configuration [ 'spark.sql.thriftserver.metadata.metricview.enabled' ] = 'true' ;
717747 }
718-
719- // Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
720- if ( request . queryTags !== undefined ) {
721- const serialized = serializeQueryTags ( request . queryTags ) ;
722- if ( serialized ) {
723- configuration . QUERY_TAGS = serialized ;
724- } else {
725- delete configuration . QUERY_TAGS ;
726- }
727- }
728-
729- const response = await this . driver . openSession ( {
730- client_protocol_i64 : new Int64 ( TProtocolVersion . SPARK_CLI_SERVICE_PROTOCOL_V8 ) ,
731- ...getInitialNamespaceOptions ( request . initialCatalog , request . initialSchema ) ,
748+ const sessionBackend = await this . backend . openSession ( {
749+ ...request ,
732750 configuration,
733- canUseMultipleCatalogs : true ,
734- } ) ;
735-
736- Status . assert ( response . status ) ;
737- const session = new DBSQLSession ( {
738- handle : definedOrError ( response . sessionHandle ) ,
739- context : this ,
740- serverProtocolVersion : response . serverProtocolVersion ,
741751 } ) ;
752+ const session = new DBSQLSession ( { backend : sessionBackend , context : this } ) ;
742753 this . sessions . add ( session ) ;
743754
744755 // Emit connection.open telemetry event. The DriverConfiguration blob
@@ -776,6 +787,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
776787 */
777788 public async close ( ) : Promise < void > {
778789 await this . sessions . closeAll ( ) ;
790+ await this . backend ?. close ( ) ;
791+
792+ this . backend = undefined ;
779793
780794 // Cleanup telemetry. Releasing our refcount on the shared TelemetryClient
781795 // is awaited because the underlying close() drains the final HTTP POST —
0 commit comments