Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/opencode/src/altimate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from "./tools/altimate-core-column-lineage"
export * from "./tools/altimate-core-compare"
export * from "./tools/altimate-core-complete"
export * from "./tools/altimate-core-correct"
export * from "./tools/altimate-core-detect-join-candidates"
export * from "./tools/altimate-core-equivalence"
export * from "./tools/altimate-core-export-ddl"
export * from "./tools/altimate-core-extract-metadata"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
// altimate_change start — cross-DB join key inference
/**
* Cross-DB join key inference.
*
* For each pair of (db, table, column) drawn from different warehouse
* connections, look for a shared value-shape: both sides have a non-empty
* common prefix that ends in a separator (`_`, `-`, `:`), the prefixes are
* different, and stripping the prefixes leaves at least one matching suffix.
*
* The canonical pattern this targets: a `business_id` column whose values
* look like `businessid_42` joins to a `business_ref` column with values
* like `businessref_42`. The inference is purely value-based — it does not
* inspect column names — so it survives schemas that disagree on naming
* conventions.
*
* The algorithm here is a TypeScript port of `_detect_join_candidates` /
* `_common_prefix` from dab_bench's preindexer.
*/

import * as Registry from "./registry"
import type { Connector } from "@altimateai/drivers"
import type {
AltimateCoreDetectJoinCandidatesParams,
AltimateCoreResult,
} from "../types"

const DEFAULT_SAMPLE_SIZE = 50
const DEFAULT_MAX_TABLES_PER_CONNECTION = 50
const SEPARATORS = ["_", "-", ":"] as const

/**
 * Longest common prefix across `values`, trimmed back to the last separator.
 *
 * Returns `""` when the samples share no prefix, or when the shared prefix
 * contains no separator — either way the values do not have a "join key
 * shape" and the caller should skip the column.
 */
export function commonPrefix(values: readonly string[]): string {
  const strings = values.filter((v): v is string => typeof v === "string")
  if (strings.length === 0) return ""

  // Fold the longest common prefix over every sample, bailing out the
  // moment it collapses to the empty string.
  let shared = strings[0]
  for (let idx = 1; idx < strings.length; idx++) {
    const other = strings[idx]
    const bound = Math.min(shared.length, other.length)
    let same = 0
    while (same < bound && shared[same] === other[same]) same++
    shared = shared.slice(0, same)
    if (shared === "") return ""
  }

  if (shared === "") return ""
  if (endsWithSeparator(shared)) return shared

  // The prefix ends mid-token: cut back to the last separator we can find —
  // anything past it is a partial token, not a join key prefix.
  let cut = -1
  for (const sep of SEPARATORS) {
    const at = shared.lastIndexOf(sep)
    if (at > cut) cut = at
  }
  return cut >= 0 ? shared.slice(0, cut + 1) : ""
}

/** True when `s` ends in one of the join-key separators (`_`, `-`, `:`). */
function endsWithSeparator(s: string): boolean {
  return s.length > 0 && "_-:".includes(s[s.length - 1])
}

/** A single (db, table, column) bag of string sample values. */
export interface ColumnSampleBag {
  /** Source connection name the samples were pulled from. */
  db: string
  /** Table identifier; the collector qualifies it as `schema.table`. */
  table: string
  /** Column the samples were drawn from. */
  column: string
  /** Non-empty string samples (nulls and non-strings are filtered upstream). */
  values: string[]
}

/** One inferred cross-DB join relationship between two sampled columns. */
export interface JoinCandidate {
  left_db: string
  left_table: string
  left_col: string
  right_db: string
  right_table: string
  right_col: string
  /** The distinct value prefix found on each side (e.g. `businessid_` / `businessref_`). */
  prefix_rule: { left: string; right: string }
  /** Count of suffixes present in both columns' sample bags. */
  suffix_overlap: number
  /**
   * Confidence proxy in [0, 1]: overlap normalized by the smaller suffix bag.
   * Cheap and monotonic — not a probability.
   */
  confidence: number
}

/**
 * Pure (no I/O) detector. Exported so unit tests can drive it with synthetic
 * sample data — and so the integration test can use an in-memory SQLite fixture
 * without re-implementing the algorithm.
 */
export function detectJoinCandidatesFromBags(bags: ColumnSampleBag[]): JoinCandidate[] {
  const found: JoinCandidate[] = []

  for (let a = 0; a < bags.length; a++) {
    const left = bags[a]
    if (left.values.length === 0) continue
    const leftPrefix = commonPrefix(left.values)
    if (leftPrefix === "") continue
    const leftSuffixes = stripPrefixSet(left.values, leftPrefix)
    if (leftSuffixes.size === 0) continue

    // Score only unordered pairs (a < b) so each pair is considered once.
    for (let b = a + 1; b < bags.length; b++) {
      const right = bags[b]
      if (right.db === left.db) continue // cross-DB only
      if (right.values.length === 0) continue
      const rightPrefix = commonPrefix(right.values)
      // Both sides need a prefix, and the prefixes must differ — identical
      // prefixes are the same key shape, not the cross-naming pattern this
      // detector targets.
      if (rightPrefix === "" || rightPrefix === leftPrefix) continue
      const rightSuffixes = stripPrefixSet(right.values, rightPrefix)
      if (rightSuffixes.size === 0) continue

      let shared = 0
      for (const suffix of leftSuffixes) {
        if (rightSuffixes.has(suffix)) shared += 1
      }
      if (shared === 0) continue

      const smaller = Math.min(leftSuffixes.size, rightSuffixes.size)
      found.push({
        left_db: left.db,
        left_table: left.table,
        left_col: left.column,
        right_db: right.db,
        right_table: right.table,
        right_col: right.column,
        prefix_rule: { left: leftPrefix, right: rightPrefix },
        suffix_overlap: shared,
        confidence: smaller > 0 ? shared / smaller : 0,
      })
    }
  }

  // Strongest evidence first: raw overlap, then normalized confidence.
  found.sort((x, y) =>
    y.suffix_overlap !== x.suffix_overlap
      ? y.suffix_overlap - x.suffix_overlap
      : y.confidence - x.confidence,
  )
  return found
}

function stripPrefixSet(values: readonly string[], prefix: string): Set<string> {
const out = new Set<string>()
for (const v of values) {
if (typeof v === "string" && v.startsWith(prefix)) {
const suf = v.slice(prefix.length)
if (suf.length > 0) out.add(suf)
}
Comment on lines +150 to +156
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ignore whitespace-only suffixes before scoring.

stripPrefixSet() currently treats " " / "\t" as real suffixes because it only checks length > 0. That can produce suffix_overlap > 0 for columns that don't actually contain a usable join token.

Suggested fix
 function stripPrefixSet(values: readonly string[], prefix: string): Set<string> {
   const out = new Set<string>()
   for (const v of values) {
     if (typeof v === "string" && v.startsWith(prefix)) {
       const suf = v.slice(prefix.length)
-      if (suf.length > 0) out.add(suf)
+      if (suf.trim().length > 0) out.add(suf)
     }
   }
   return out
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function stripPrefixSet(values: readonly string[], prefix: string): Set<string> {
const out = new Set<string>()
for (const v of values) {
if (typeof v === "string" && v.startsWith(prefix)) {
const suf = v.slice(prefix.length)
if (suf.length > 0) out.add(suf)
}
function stripPrefixSet(values: readonly string[], prefix: string): Set<string> {
const out = new Set<string>()
for (const v of values) {
if (typeof v === "string" && v.startsWith(prefix)) {
const suf = v.slice(prefix.length)
if (suf.trim().length > 0) out.add(suf)
}
}
return out
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/opencode/src/altimate/native/connections/detect-join-candidates.ts`
around lines 150 - 156, stripPrefixSet currently adds suffixes that are only
whitespace because it checks suf.length > 0; update stripPrefixSet(values,
prefix) to trim whitespace before deciding and storing: compute suf =
v.slice(prefix.length), then let trimmed = suf.trim() and only add trimmed to
the Set if trimmed.length > 0 (store trimmed, not the raw suf). This ensures
whitespace-only suffixes like " " or "\t" are ignored when building the returned
Set.

}
return out
}

// ---------------------------------------------------------------------------
// I/O: pull sample bags from live connectors
// ---------------------------------------------------------------------------

/** Heuristic: data types we treat as "string-like" for sampling. */
const STRING_TYPE_PATTERN =
  /^(varchar|char|character|text|string|nvarchar|nchar|clob|json|uuid|citext|bpchar|name)/i

/** True when a column's declared type looks text-valued enough to sample. */
function isStringLike(dataType: string | undefined): boolean {
  return !!dataType && STRING_TYPE_PATTERN.test(dataType.trim())
}

/**
* Quote a SQL identifier with double quotes — safe for every dialect we ship
* a driver for. Embedded double-quotes are doubled per ANSI rules.
*/
function quoteIdent(name: string): string {
return '"' + name.replace(/"/g, '""') + '"'
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
}

/**
* Fetch up to `sampleSize` non-null string sample values for one column.
* Returns `[]` on any error so a single bad table never aborts the scan.
*/
async function fetchColumnSamples(
connector: Connector,
schema: string | undefined,
table: string,
column: string,
sampleSize: number,
): Promise<string[]> {
const target = schema ? `${quoteIdent(schema)}.${quoteIdent(table)}` : quoteIdent(table)
const col = quoteIdent(column)
const sql = `SELECT ${col} FROM ${target} WHERE ${col} IS NOT NULL LIMIT ${sampleSize}`
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
try {
const result = await connector.execute(sql, sampleSize)
const out: string[] = []
for (const row of result.rows) {
const v = row[0]
if (typeof v === "string" && v.length > 0) out.push(v)
}
return out
} catch {
return []
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/**
 * Build the per-(db,table,column) sample bag list across all `connections`.
 *
 * Errors connecting to or describing one warehouse must not abort the whole
 * run — the caller still wants candidates from the surviving connections.
 */
export async function collectSampleBags(
  params: AltimateCoreDetectJoinCandidatesParams,
): Promise<{ bags: ColumnSampleBag[]; errors: Record<string, string> }> {
  const sampleSize = params.sample_size ?? DEFAULT_SAMPLE_SIZE
  const maxTables = params.max_tables_per_connection ?? DEFAULT_MAX_TABLES_PER_CONNECTION
  const bags: ColumnSampleBag[] = []
  // Per-connection failure messages; a failed warehouse is reported, not fatal.
  const errors: Record<string, string> = {}

  for (const name of params.connections) {
    try {
      const connector = await Registry.get(name)
      // An explicit schema wins; otherwise enumerate all schemas
      // (safeListSchemas falls back to ["public"] on error).
      const schemas = params.schema_name
        ? [params.schema_name]
        : await safeListSchemas(connector)
      // The table budget is counted across ALL schemas of this connection,
      // not per schema — hence the check at both loop levels.
      let tablesScanned = 0
      for (const schema of schemas) {
        if (tablesScanned >= maxTables) break
        const tables = await safeListTables(connector, schema)
        for (const t of tables) {
          if (tablesScanned >= maxTables) break
          // NOTE: a table counts against the budget even when it ends up
          // contributing no bags (no string columns / no sample values).
          tablesScanned++
          const columns = await safeDescribeTable(connector, schema, t.name)
          for (const c of columns) {
            // Only string-typed columns can carry prefix-shaped join keys.
            if (!isStringLike(c.data_type)) continue
            const values = await fetchColumnSamples(
              connector,
              schema,
              t.name,
              c.name,
              sampleSize,
            )
            if (values.length === 0) continue
            bags.push({ db: name, table: `${schema}.${t.name}`, column: c.name, values })
          }
        }
      }
    } catch (e) {
      // Record and continue — other connections may still produce bags.
      errors[name] = String(e)
    }
  }

  return { bags, errors }
}

async function safeListSchemas(connector: Connector): Promise<string[]> {
try {
return await connector.listSchemas()
} catch {
return ["public"]
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

/** listTables that degrades to an empty list instead of aborting the scan. */
async function safeListTables(
  connector: Connector,
  schema: string,
): Promise<Array<{ name: string; type: string }>> {
  let tables: Array<{ name: string; type: string }> = []
  try {
    tables = await connector.listTables(schema)
  } catch {
    // A schema we cannot enumerate simply contributes nothing.
  }
  return tables
}

/** describeTable that degrades to an empty column list on failure. */
async function safeDescribeTable(
  connector: Connector,
  schema: string,
  table: string,
): Promise<Array<{ name: string; data_type: string }>> {
  try {
    const described = await connector.describeTable(schema, table)
    // Project down to the two fields the scanner needs.
    const out: Array<{ name: string; data_type: string }> = []
    for (const col of described) {
      out.push({ name: col.name, data_type: col.data_type })
    }
    return out
  } catch {
    return []
  }
}

// ---------------------------------------------------------------------------
// Native handler entrypoint
// ---------------------------------------------------------------------------

/**
 * Native handler entrypoint: validate params, collect sample bags from every
 * connection, and run the pure detector over them. Never throws — failures
 * are reported through the `success`/`error` envelope.
 */
export async function detectJoinCandidates(
  params: AltimateCoreDetectJoinCandidatesParams,
): Promise<AltimateCoreResult> {
  const connections = params.connections
  // Cross-DB inference is meaningless with fewer than two warehouses.
  if (!Array.isArray(connections) || connections.length < 2) {
    return {
      success: false,
      data: {},
      error: "detect_join_candidates requires at least two warehouse connections.",
    }
  }
  try {
    const sampled = await collectSampleBags(params)
    return {
      success: true,
      data: {
        candidates: detectJoinCandidatesFromBags(sampled.bags),
        bags_scanned: sampled.bags.length,
        connection_errors: sampled.errors,
      },
    }
  } catch (e) {
    return { success: false, data: {}, error: String(e) }
  }
}
// altimate_change end
10 changes: 10 additions & 0 deletions packages/opencode/src/altimate/native/connections/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import * as Registry from "./registry"
import { discoverContainers } from "./docker-discovery"
import { parseDbtProfiles } from "./dbt-profiles"
import { runDataDiff } from "./data-diff"
// altimate_change start — cross-DB join key inference
import { detectJoinCandidates } from "./detect-join-candidates"
// altimate_change end
import type {
SqlExecuteParams,
SqlExecuteResult,
Expand Down Expand Up @@ -626,6 +629,13 @@ register("data.diff", async (params: DataDiffParams): Promise<DataDiffResult> =>
return runDataDiff(params)
})

// altimate_change start — cross-DB join key inference
// --- altimate_core.detect_join_candidates ---
register("altimate_core.detect_join_candidates", async (params) => {
return detectJoinCandidates(params)
})
// altimate_change end

} // end registerAll

// Auto-register on module load
Expand Down
26 changes: 26 additions & 0 deletions packages/opencode/src/altimate/native/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,26 @@ export interface AltimateCoreIsSafeParams {
sql: string
}

// altimate_change start — cross-DB join key inference
/**
 * Parameters for `altimate_core.detect_join_candidates`.
 *
 * Pulls a small bag of string sample values from each (warehouse, schema, table,
 * column), then looks across DBs for value-prefix patterns that suggest a join
 * relationship (e.g. `businessid_42` ↔ `businessref_42`).
 */
export interface AltimateCoreDetectJoinCandidatesParams {
  /**
   * Warehouse connection names to compare. At least two are required —
   * the handler returns a failed result otherwise.
   */
  connections: string[]
  /** Optional restriction to a single schema per warehouse. */
  schema_name?: string
  /** Number of sample values per column. Default 50. */
  sample_size?: number
  /** Maximum number of tables to scan per connection. Default 50. */
  max_tables_per_connection?: number
}
// altimate_change end

// --- dbt Lineage ---

export interface DbtLineageParams {
Expand Down Expand Up @@ -1212,6 +1232,12 @@ export const BridgeMethods = {
"altimate_core.introspection_sql": {} as { params: AltimateCoreIntrospectionSqlParams; result: AltimateCoreResult },
"altimate_core.parse_dbt": {} as { params: AltimateCoreParseDbtParams; result: AltimateCoreResult },
"altimate_core.is_safe": {} as { params: AltimateCoreIsSafeParams; result: AltimateCoreResult },
// altimate_change start — cross-DB join key inference
"altimate_core.detect_join_candidates": {} as {
params: AltimateCoreDetectJoinCandidatesParams
result: AltimateCoreResult
},
// altimate_change end
ping: {} as { params: Record<string, never>; result: { status: string } },
} as const

Expand Down
Loading
Loading