-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathduckdb.ts
More file actions
188 lines (172 loc) · 5.89 KB
/
duckdb.ts
File metadata and controls
188 lines (172 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/**
* DuckDB driver using the `duckdb` package.
*/
import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types"
export async function connect(config: ConnectionConfig): Promise<Connector> {
let duckdb: any
try {
duckdb = await import("duckdb")
duckdb = duckdb.default || duckdb
} catch {
throw new Error("DuckDB driver not installed. Run: npm install duckdb")
}
const dbPath = (config.path as string) ?? ":memory:"
let db: any
let connection: any
// altimate_change start — improve DuckDB error messages
function wrapDuckDBError(err: Error): Error {
const msg = err.message || String(err)
if (msg.toLowerCase().includes("locked") || msg.includes("SQLITE_BUSY") || msg.includes("DUCKDB_LOCKED")) {
return new Error(
`Database "${dbPath}" is locked by another process. ` +
`DuckDB does not support concurrent write access. ` +
`Close other connections to this file and try again.`,
)
}
return err
}
// altimate_change end
function query(sql: string): Promise<any[]> {
return new Promise((resolve, reject) => {
connection.all(sql, (err: Error | null, rows: any[]) => {
if (err) reject(wrapDuckDBError(err))
else resolve(rows ?? [])
})
})
}
function queryWithParams(sql: string, params: any[]): Promise<any[]> {
return new Promise((resolve, reject) => {
connection.all(sql, ...params, (err: Error | null, rows: any[]) => {
if (err) reject(wrapDuckDBError(err))
else resolve(rows ?? [])
})
})
}
return {
async connect() {
// altimate_change start — retry with read-only on lock errors
const tryConnect = (accessMode?: string): Promise<any> =>
new Promise<any>((resolve, reject) => {
let resolved = false
let timeout: ReturnType<typeof setTimeout> | undefined
const opts = accessMode ? { access_mode: accessMode } : undefined
const instance = new duckdb.Database(
dbPath,
opts,
(err: Error | null) => {
if (resolved) { if (instance && typeof instance.close === "function") instance.close(); return }
resolved = true
if (timeout) clearTimeout(timeout)
if (err) {
const msg = err.message || String(err)
if (msg.toLowerCase().includes("locked") || msg.includes("SQLITE_BUSY") || msg.includes("DUCKDB_LOCKED")) {
reject(new Error("DUCKDB_LOCKED"))
} else {
reject(err)
}
} else {
resolve(instance)
}
},
)
// Bun: native callback may not fire; fall back after 2s
timeout = setTimeout(() => {
if (!resolved) {
resolved = true
reject(new Error(`Timed out opening DuckDB database "${dbPath}"`))
}
}, 2000)
})
try {
db = await tryConnect()
} catch (err: any) {
if (err.message === "DUCKDB_LOCKED" && dbPath !== ":memory:") {
// Retry in read-only mode — allows concurrent reads
try {
db = await tryConnect("READ_ONLY")
} catch (retryErr) {
throw wrapDuckDBError(
retryErr instanceof Error ? retryErr : new Error(String(retryErr)),
)
}
} else {
throw err
}
}
// altimate_change end
connection = db.connect()
},
async execute(sql: string, limit?: number, binds?: any[]): Promise<ConnectorResult> {
const effectiveLimit = limit ?? 1000
let finalSql = sql
const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql)
if (
isSelectLike &&
effectiveLimit &&
!/\bLIMIT\b/i.test(sql)
) {
finalSql = `${sql.replace(/;\s*$/, "")} LIMIT ${effectiveLimit + 1}`
}
const rows = binds?.length
? await queryWithParams(finalSql, binds)
: await query(finalSql)
const columns =
rows.length > 0 ? Object.keys(rows[0]) : []
const truncated = rows.length > effectiveLimit
const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows
return {
columns,
rows: limitedRows.map((row) =>
columns.map((col) => row[col]),
),
row_count: limitedRows.length,
truncated,
}
},
async listSchemas(): Promise<string[]> {
const rows = await query(
"SELECT schema_name FROM information_schema.schemata ORDER BY schema_name",
)
return rows.map((r) => r.schema_name as string)
},
async listTables(
schema: string,
): Promise<Array<{ name: string; type: string }>> {
const rows = await queryWithParams(
`SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = $1 ORDER BY table_name`,
[schema],
)
return rows.map((r) => ({
name: r.table_name as string,
type: r.table_type === "VIEW" ? "view" : "table",
}))
},
async describeTable(
schema: string,
table: string,
): Promise<SchemaColumn[]> {
const rows = await queryWithParams(
`SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position`,
[schema, table],
)
return rows.map((r) => ({
name: r.column_name as string,
data_type: r.data_type as string,
nullable: r.is_nullable === "YES",
}))
},
async close() {
if (db) {
await new Promise<void>((resolve) => {
db.close((err: Error | null) => {
resolve()
})
// Bun: native callback may not fire; fall back after timeout
setTimeout(resolve, 500)
})
db = null
connection = null
}
},
}
}