-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathdetect-join-candidates.ts
More file actions
317 lines (288 loc) · 9.71 KB
/
detect-join-candidates.ts
File metadata and controls
317 lines (288 loc) · 9.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
// altimate_change start — cross-DB join key inference
/**
* Cross-DB join key inference.
*
* For each pair of (db, table, column) drawn from different warehouse
* connections, look for a shared value-shape: each side's sampled values
* share a non-empty common prefix that ends in a separator (`_`, `-`, `:`),
* the two sides' prefixes differ, and stripping each side's prefix leaves at
* least one suffix that appears on both sides.
*
* The canonical pattern this targets: a `business_id` column whose values
* look like `businessid_42` joins to a `business_ref` column with values
* like `businessref_42`. The inference is purely value-based — it does not
* inspect column names — so it survives schemas that disagree on naming
* conventions.
*
* The algorithm here is a TypeScript port of `_detect_join_candidates` /
* `_common_prefix` from dab_bench's preindexer.
*/
import * as Registry from "./registry"
import type { Connector } from "@altimateai/drivers"
import type {
AltimateCoreDetectJoinCandidatesParams,
AltimateCoreResult,
} from "../types"
/** Default number of non-null sample values fetched per string-like column. */
const DEFAULT_SAMPLE_SIZE = 50
/** Default cap on how many tables are scanned per warehouse connection. */
const DEFAULT_MAX_TABLES_PER_CONNECTION = 50
/**
 * Characters that may terminate a join-key prefix (e.g. the `_` in
 * `businessid_42`). Single source of truth for both {@link commonPrefix}
 * and {@link endsWithSeparator}.
 */
const SEPARATORS = ["_", "-", ":"] as const

/**
 * Longest common prefix across `values`, trimmed back to (and including) the
 * last separator character it contains.
 *
 * Returns `""` when the values share no prefix at all, or when the shared
 * prefix contains no separator — in either case the values do not share a
 * "join key shape" and should be skipped.
 *
 * @example commonPrefix(["abc_12", "abc_13"]) // "abc_" (raw LCP "abc_1" is trimmed)
 */
export function commonPrefix(values: readonly string[]): string {
  // Runtime guard: sample values come from warehouse drivers, so tolerate
  // stray non-string entries despite the static type.
  const items = values.filter((v): v is string => typeof v === "string")
  if (items.length === 0) return ""
  let prefix = items[0]
  for (const s of items.slice(1)) {
    const limit = Math.min(prefix.length, s.length)
    let i = 0
    while (i < limit && prefix[i] === s[i]) i++
    prefix = prefix.slice(0, i)
    if (prefix.length === 0) return ""
  }
  if (endsWithSeparator(prefix)) return prefix
  // Anything past the last separator is a partial token, not part of the
  // join-key prefix. `lastIndexOf` yields -1 for absent separators, so a
  // negative cut means the prefix has no separator at all.
  const cut = Math.max(...SEPARATORS.map((sep) => prefix.lastIndexOf(sep)))
  return cut < 0 ? "" : prefix.slice(0, cut + 1)
}

/** True when `s` is non-empty and ends with one of {@link SEPARATORS}. */
function endsWithSeparator(s: string): boolean {
  return SEPARATORS.some((sep) => s.endsWith(sep))
}
/**
 * A single (db, table, column) bag of string sample values.
 *
 * Produced by `collectSampleBags` and consumed by
 * `detectJoinCandidatesFromBags`.
 */
export interface ColumnSampleBag {
  /** Connection name the samples came from; bags sharing a `db` are never paired. */
  db: string
  /** Table identifier; `collectSampleBags` fills it as `schema.table`. */
  table: string
  /** Column name within `table`. */
  column: string
  /** Sampled values; `collectSampleBags` keeps only non-empty strings and skips empty bags. */
  values: string[]
}
/**
 * One inferred cross-connection join between two sampled columns.
 * `left_*` is taken from the bag that appears earlier in the input list.
 */
export interface JoinCandidate {
  left_db: string
  left_table: string
  left_col: string
  right_db: string
  right_table: string
  right_col: string
  /**
   * Separator-terminated prefix shared by each side's values; stripping
   * `left` from left values and `right` from right values yields the
   * suffixes that are compared.
   */
  prefix_rule: { left: string; right: string }
  /** Number of distinct suffixes present on both sides. */
  suffix_overlap: number
  /**
   * Confidence proxy in [0, 1]: overlap normalized by the smaller suffix bag.
   * Cheap and monotonic — not a probability.
   */
  confidence: number
}
/**
 * Pure (no I/O) detector. Exported so unit tests can drive it with synthetic
 * sample data — and so the integration test can use an in-memory SQLite fixture
 * without re-implementing the algorithm.
 *
 * A bag is usable when its values share a separator-terminated prefix
 * (see `commonPrefix`) and at least one value leaves a non-empty suffix once
 * that prefix is stripped. Every pair of usable bags from different `db`s
 * whose prefixes differ and whose suffix sets intersect becomes a candidate.
 *
 * @param bags - column sample bags; not mutated.
 * @returns candidates ordered by `suffix_overlap` descending, ties broken by
 *   `confidence` descending.
 */
export function detectJoinCandidatesFromBags(bags: ColumnSampleBag[]): JoinCandidate[] {
  // A bag's prefix and suffix set depend only on that bag, so compute them
  // once up front rather than once per pair (the right-hand side used to be
  // recomputed for every left-hand bag).
  const profiles = bags.map((bag) => {
    if (bag.values.length === 0) return null
    const prefix = commonPrefix(bag.values)
    if (!prefix) return null
    const suffixes = stripPrefixSet(bag.values, prefix)
    return suffixes.size === 0 ? null : { bag, prefix, suffixes }
  })

  const candidates: JoinCandidate[] = []
  for (let i = 0; i < profiles.length; i++) {
    const left = profiles[i]
    if (!left) continue
    for (let j = i + 1; j < profiles.length; j++) {
      const right = profiles[j]
      if (!right) continue
      if (right.bag.db === left.bag.db) continue // cross-DB only
      if (right.prefix === left.prefix) continue // prefixes must differ
      // Intersection size is symmetric; walk the smaller set.
      const [small, large] =
        left.suffixes.size <= right.suffixes.size
          ? [left.suffixes, right.suffixes]
          : [right.suffixes, left.suffixes]
      let overlap = 0
      for (const s of small) if (large.has(s)) overlap++
      if (overlap === 0) continue
      // Both suffix sets are non-empty (filtered above), so small.size >= 1.
      const confidence = overlap / small.size
      candidates.push({
        left_db: left.bag.db,
        left_table: left.bag.table,
        left_col: left.bag.column,
        right_db: right.bag.db,
        right_table: right.bag.table,
        right_col: right.bag.column,
        prefix_rule: { left: left.prefix, right: right.prefix },
        suffix_overlap: overlap,
        confidence,
      })
    }
  }
  candidates.sort((a, b) => {
    if (b.suffix_overlap !== a.suffix_overlap) return b.suffix_overlap - a.suffix_overlap
    return b.confidence - a.confidence
  })
  return candidates
}
/**
 * Distinct, non-empty remainders of the `values` that start with `prefix`,
 * in first-seen order. Values not starting with `prefix` (or non-strings)
 * are ignored.
 */
function stripPrefixSet(values: readonly string[], prefix: string): Set<string> {
  const remainders = values
    .filter((value) => typeof value === "string" && value.startsWith(prefix))
    .map((value) => value.slice(prefix.length))
    .filter((remainder) => remainder.length > 0)
  return new Set(remainders)
}
// ---------------------------------------------------------------------------
// I/O: pull sample bags from live connectors
// ---------------------------------------------------------------------------
/**
 * Heuristic: data types we treat as "string-like" for sampling.
 *
 * Matched case-insensitively against the start of the (unwrapped) type name,
 * so parameterized forms such as `VARCHAR(255)` or `character varying` match.
 * The `*text` variants are listed explicitly because a prefix match on `text`
 * alone misses them.
 */
const STRING_TYPE_PATTERN =
  /^(varchar|char|character|text|tinytext|mediumtext|longtext|ntext|string|fixedstring|nvarchar|nchar|clob|json|uuid|citext|bpchar|name)/i
/** Wrapper types (e.g. `Nullable(String)`, `LowCardinality(String)`) whose inner type decides string-likeness. */
const TYPE_WRAPPER_PATTERN = /^(?:nullable|lowcardinality)\s*\((.*)\)$/i

/**
 * True when `dataType` names a string-like column type.
 *
 * @param dataType - type name as reported by the driver; `undefined` or empty yields `false`.
 */
function isStringLike(dataType: string | undefined): boolean {
  if (!dataType) return false
  let typeName = dataType.trim()
  // Peel nested wrappers such as `LowCardinality(Nullable(String))`; each
  // iteration strictly shortens `typeName`, so the loop terminates.
  let wrapped = TYPE_WRAPPER_PATTERN.exec(typeName)
  while (wrapped) {
    typeName = (wrapped[1] ?? "").trim()
    wrapped = TYPE_WRAPPER_PATTERN.exec(typeName)
  }
  return STRING_TYPE_PATTERN.test(typeName)
}
/**
 * Wrap a SQL identifier in ANSI double quotes, doubling any embedded `"`.
 *
 * NOTE(review): some dialects (e.g. MySQL without ANSI_QUOTES, BigQuery)
 * treat `"..."` as a string literal rather than an identifier — confirm every
 * shipped driver accepts this quoting; failures surface as empty samples.
 */
function quoteIdent(name: string): string {
  const escaped = name.split('"').join('""')
  return `"${escaped}"`
}
/**
 * Fetch up to `sampleSize` non-null, non-empty string sample values for one column.
 *
 * Returns `[]` on any error so a single bad table never aborts the scan.
 * Cells whose first-column value is not a string are skipped.
 *
 * `sampleSize` is spliced into the SQL text, so it is first floored to an
 * integer; a non-finite or non-positive size returns `[]` without querying.
 */
async function fetchColumnSamples(
  connector: Connector,
  schema: string | undefined,
  table: string,
  column: string,
  sampleSize: number,
): Promise<string[]> {
  // The size originates from tool-call params; never interpolate an
  // unvalidated runtime value (NaN, a fraction, or a non-number) into SQL.
  const limit = Math.floor(Number(sampleSize))
  if (!Number.isFinite(limit) || limit <= 0) return []
  const target = schema ? `${quoteIdent(schema)}.${quoteIdent(table)}` : quoteIdent(table)
  const col = quoteIdent(column)
  // NOTE(review): `LIMIT n` is not universal SQL (e.g. SQL Server uses TOP);
  // on such dialects the query fails and lands in the catch below — confirm
  // whether `execute`'s second argument already caps rows per driver.
  const sql = `SELECT ${col} FROM ${target} WHERE ${col} IS NOT NULL LIMIT ${limit}`
  try {
    const result = await connector.execute(sql, limit)
    const out: string[] = []
    for (const row of result.rows) {
      const v = row[0]
      if (typeof v === "string" && v.length > 0) out.push(v)
    }
    return out
  } catch {
    // Deliberate best-effort: one unreadable table contributes no samples.
    return []
  }
}
/**
 * Gather one sample bag per string-like column across all `params.connections`.
 *
 * Connections are scanned one after another. Any failure that escapes the
 * best-effort `safe*` helpers (e.g. `Registry.get` throwing) is recorded
 * under that connection's name in `errors`. Bags already collected from that
 * connection are kept, and the remaining connections are still scanned.
 *
 * @returns `bags` — non-empty sample bags with `table` formatted as
 *   `schema.table`; `errors` — connection name → stringified error.
 */
export async function collectSampleBags(
  params: AltimateCoreDetectJoinCandidatesParams,
): Promise<{ bags: ColumnSampleBag[]; errors: Record<string, string> }> {
  const sampleSize = params.sample_size ?? DEFAULT_SAMPLE_SIZE
  const maxTables = params.max_tables_per_connection ?? DEFAULT_MAX_TABLES_PER_CONNECTION
  const bags: ColumnSampleBag[] = []
  const errors: Record<string, string> = {}

  for (const connectionName of params.connections) {
    try {
      // Pushes straight into `bags`, so partial results survive a throw.
      await scanConnection(connectionName, params, sampleSize, maxTables, bags)
    } catch (e) {
      errors[connectionName] = String(e)
    }
  }
  return { bags, errors }
}

/**
 * Sample string-like columns from at most `maxTables` tables of one
 * connection, appending each non-empty bag to `sink`.
 *
 * NOTE(review): when `params.schema_name` is unset, every listed schema is
 * walked in driver order; if that list includes system schemas they consume
 * the table budget first — confirm per driver.
 */
async function scanConnection(
  connectionName: string,
  params: AltimateCoreDetectJoinCandidatesParams,
  sampleSize: number,
  maxTables: number,
  sink: ColumnSampleBag[],
): Promise<void> {
  const connector = await Registry.get(connectionName)
  const schemas = params.schema_name ? [params.schema_name] : await safeListSchemas(connector)
  let scanned = 0
  for (const schema of schemas) {
    if (scanned >= maxTables) break
    const tables = await safeListTables(connector, schema)
    for (const { name: tableName } of tables) {
      if (scanned >= maxTables) break
      scanned++
      const columns = await safeDescribeTable(connector, schema, tableName)
      for (const column of columns) {
        if (!isStringLike(column.data_type)) continue
        const values = await fetchColumnSamples(connector, schema, tableName, column.name, sampleSize)
        if (values.length > 0) {
          sink.push({ db: connectionName, table: `${schema}.${tableName}`, column: column.name, values })
        }
      }
    }
  }
}
/**
 * List the connector's schemas, falling back to `["public"]` if listing fails.
 *
 * NOTE(review): the `public` fallback presumably targets PostgreSQL-style
 * warehouses — confirm it is meaningful for every driver.
 */
async function safeListSchemas(connector: Connector): Promise<string[]> {
  let schemas: string[]
  try {
    schemas = await connector.listSchemas()
  } catch {
    schemas = ["public"]
  }
  return schemas
}
/** List tables in `schema`; any listing failure yields an empty list. */
async function safeListTables(
  connector: Connector,
  schema: string,
): Promise<Array<{ name: string; type: string }>> {
  let tables: Array<{ name: string; type: string }> = []
  try {
    tables = await connector.listTables(schema)
  } catch {
    // Best-effort: an unreadable schema simply contributes no tables.
  }
  return tables
}
/**
 * Describe `schema.table`, projecting each column down to `name` and
 * `data_type`; any failure yields an empty list.
 */
async function safeDescribeTable(
  connector: Connector,
  schema: string,
  table: string,
): Promise<Array<{ name: string; data_type: string }>> {
  try {
    const described = await connector.describeTable(schema, table)
    return described.map(({ name, data_type }) => ({ name, data_type }))
  } catch {
    return []
  }
}
// ---------------------------------------------------------------------------
// Native handler entrypoint
// ---------------------------------------------------------------------------
/**
 * Native handler for `detect_join_candidates`.
 *
 * Samples string-like columns across the requested warehouse connections and
 * returns cross-connection join candidates. Failures on individual
 * connections are reported in `data.connection_errors` rather than failing
 * the whole call.
 *
 * @returns `success: false` when fewer than two distinct connection names are
 *   given or an unexpected error escapes the scan; otherwise `success: true`
 *   with `candidates`, `bags_scanned`, and `connection_errors`.
 */
export async function detectJoinCandidates(
  params: AltimateCoreDetectJoinCandidatesParams,
): Promise<AltimateCoreResult> {
  // Duplicate names would be scanned twice and can never pair with each other
  // (candidates are cross-connection only), so validate on distinct names.
  const connections = Array.isArray(params.connections) ? [...new Set(params.connections)] : []
  if (connections.length < 2) {
    return {
      success: false,
      data: {},
      error: "detect_join_candidates requires at least two warehouse connections.",
    }
  }
  try {
    const { bags, errors } = await collectSampleBags({ ...params, connections })
    const candidates = detectJoinCandidatesFromBags(bags)
    return {
      success: true,
      data: {
        candidates,
        bags_scanned: bags.length,
        connection_errors: errors,
      },
    }
  } catch (e) {
    return { success: false, data: {}, error: String(e) }
  }
}
// altimate_change end