-
Notifications
You must be signed in to change notification settings - Fork 93
Expand file tree
/
Copy pathdatabricks.ts
More file actions
145 lines (130 loc) · 4.32 KB
/
Copy pathdatabricks.ts
File metadata and controls
145 lines (130 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
* Databricks driver using the `@databricks/sql` package.
*/
import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types"
export async function connect(config: ConnectionConfig): Promise<Connector> {
let databricksModule: any
try {
databricksModule = await import("@databricks/sql")
databricksModule = databricksModule.default || databricksModule
} catch {
throw new Error(
"Databricks driver not installed. Run: npm install @databricks/sql",
)
}
let client: any
let session: any
function escapeIdentifier(value: string): string {
return value.replace(/`/g, "``")
}
return {
async connect() {
const DBSQLClient = databricksModule.DBSQLClient ?? databricksModule
// Suppress @databricks/sql Winston console logging — it writes JSON
// log lines to stdout which corrupt the TUI display (see #249).
// Use a no-op logger that satisfies the interface but discards all output.
const logger = { log: () => {}, setLevel: () => {} }
client = new DBSQLClient({ logger })
const connectionOptions: Record<string, unknown> = {
host: config.server_hostname,
path: config.http_path,
token: config.access_token,
}
await client.connect(connectionOptions)
session = await client.openSession({
initialCatalog: config.catalog as string | undefined,
initialSchema: config.schema as string | undefined,
})
},
async execute(sql: string, limit?: number, binds?: any[], options?: ExecuteOptions): Promise<ConnectorResult> {
const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000)
let query = sql
const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql)
if (
isSelectLike &&
effectiveLimit &&
!/\bLIMIT\b/i.test(sql)
) {
query = `${sql.replace(/;\s*$/, "")} LIMIT ${effectiveLimit + 1}`
}
const stmtOptions: Record<string, any> = {}
if (binds?.length) {
stmtOptions.ordinalParameters = binds
}
const operation = await session.executeStatement(query, stmtOptions)
const rows = await operation.fetchAll()
await operation.close()
const columns = rows.length > 0 ? Object.keys(rows[0]) : []
const truncated = effectiveLimit > 0 && rows.length > effectiveLimit
const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows
return {
columns,
rows: limitedRows.map((row: any) =>
columns.map((col) => row[col]),
),
row_count: limitedRows.length,
truncated,
}
},
async listSchemas(): Promise<string[]> {
const operation = await session.executeStatement("SHOW SCHEMAS")
const rows = await operation.fetchAll()
await operation.close()
return rows.map(
(r: any) =>
(r.databaseName ?? r.namespace ?? Object.values(r)[0]) as string,
)
},
async listTables(
schema: string,
): Promise<Array<{ name: string; type: string }>> {
const operation = await session.executeStatement(
`SHOW TABLES IN \`${escapeIdentifier(schema)}\``,
)
const rows = await operation.fetchAll()
await operation.close()
return rows.map((r: any) => ({
name: (r.tableName ?? Object.values(r)[0]) as string,
type:
r.isTemporary === true
? "temporary"
: "table",
}))
},
async describeTable(
schema: string,
table: string,
): Promise<SchemaColumn[]> {
const operation = await session.executeStatement(
`DESCRIBE TABLE \`${escapeIdentifier(schema)}\`.\`${escapeIdentifier(table)}\``,
)
const rows = await operation.fetchAll()
await operation.close()
return rows
.filter((r: any) => r.col_name && !r.col_name.startsWith("#"))
.map((r: any) => ({
name: r.col_name as string,
data_type: r.data_type as string,
nullable: r.nullable !== "false",
}))
},
async close() {
if (session) {
try {
await session.close()
} catch {
// ignore
}
session = null
}
if (client) {
try {
await client.close()
} catch {
// ignore
}
client = null
}
},
}
}