Skip to content

Commit 25991fd

Browse files
kulvirgitclaude
andcommitted
refactor: replace custom lineage with sqlguard column lineage
- Delete `lineage/check.py`, `lineage/classification.py`, and `test_lineage.py` - Rewire `lineage.check` bridge method to use `guard_column_lineage` (sqlguard) - Rewrite `dbt/lineage.py` to delegate to sqlguard instead of custom implementation - Add lazy `_ensure_init()` in `guard.py` for sqlguard SDK initialization - Update `DbtLineageResult` to pass through raw sqlguard output (`raw_lineage`) - Update `LineageCheckResult` in protocol.ts to use `SqlGuardResult` format - Update `lineage-check.ts` and `dbt-lineage.ts` for new result shapes - Fix `test_lineage_check` to assert on new response structure Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fc31b92 commit 25991fd

12 files changed

Lines changed: 146 additions & 440 deletions

File tree

packages/altimate-code/src/bridge/protocol.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,9 @@ export interface LineageEdge {
136136
}
137137

138138
export interface LineageCheckResult {
139-
edges: LineageEdge[]
140-
tables: string[]
141-
columns: string[]
142-
confidence: string
143-
confidence_factors: string[]
139+
success: boolean
140+
data: Record<string, unknown>
141+
error?: string
144142
}
145143

146144
// --- dbt ---
@@ -916,9 +914,7 @@ export interface DbtLineageResult {
916914
model_name: string
917915
model_unique_id?: string
918916
compiled_sql?: string
919-
edges: LineageEdge[]
920-
tables: string[]
921-
columns: string[]
917+
raw_lineage: Record<string, unknown>
922918
confidence: string
923919
confidence_factors: string[]
924920
}

packages/altimate-code/src/tool/dbt-lineage.ts

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { DbtLineageResult } from "../bridge/protocol"
55

66
export const DbtLineageTool = Tool.define("dbt_lineage", {
77
description:
8-
"Compute column-level lineage for a dbt model. Takes a manifest.json path and model name, extracts compiled SQL and upstream schemas, and traces how source columns flow to output columns.",
8+
"Compute column-level lineage for a dbt model using the Rust-based sqlguard engine. Takes a manifest.json path and model name, extracts compiled SQL and upstream schemas, and traces column flow.",
99
parameters: z.object({
1010
manifest_path: z.string().describe("Path to dbt manifest.json file"),
1111
model: z.string().describe("Model name or unique_id (e.g. 'my_model' or 'model.project.my_model')"),
@@ -19,12 +19,12 @@ export const DbtLineageTool = Tool.define("dbt_lineage", {
1919
dialect: args.dialect,
2020
})
2121

22+
const hasError = result.confidence_factors.length > 0 && result.confidence === "low"
23+
2224
return {
23-
title: `dbt Lineage: ${result.model_name} ${result.edges.length} edge(s) [${result.confidence}]`,
25+
title: `dbt Lineage: ${result.model_name} [${result.confidence}]`,
2426
metadata: {
2527
model_name: result.model_name,
26-
edgeCount: result.edges.length,
27-
tableCount: result.tables.length,
2828
confidence: result.confidence,
2929
},
3030
output: formatDbtLineage(result),
@@ -33,7 +33,7 @@ export const DbtLineageTool = Tool.define("dbt_lineage", {
3333
const msg = e instanceof Error ? e.message : String(e)
3434
return {
3535
title: "dbt Lineage: ERROR",
36-
metadata: { model_name: args.model, edgeCount: 0, tableCount: 0, confidence: "unknown" },
36+
metadata: { model_name: args.model, confidence: "unknown" },
3737
output: `Failed: ${msg}`,
3838
}
3939
}
@@ -53,25 +53,37 @@ function formatDbtLineage(result: DbtLineageResult): string {
5353
lines.push("")
5454
}
5555

56-
if (result.edges.length === 0) {
57-
lines.push("No column-level lineage edges detected.")
56+
const lineage = result.raw_lineage
57+
if (!lineage || Object.keys(lineage).length === 0) {
58+
lines.push("No lineage data returned.")
5859
if (!result.compiled_sql) {
5960
lines.push("Run `dbt compile` first to generate compiled SQL.")
6061
}
6162
return lines.join("\n")
6263
}
6364

64-
lines.push("Column Lineage:")
65-
lines.push("Source → Target | Transform")
66-
lines.push("".padEnd(60, "-"))
65+
// column_dict: output columns -> source columns mapping
66+
const columnDict = lineage.column_dict as Record<string, unknown> | undefined
67+
if (columnDict) {
68+
lines.push("Column Mappings:")
69+
for (const [target, sources] of Object.entries(columnDict)) {
70+
lines.push(` ${target}${JSON.stringify(sources)}`)
71+
}
72+
lines.push("")
73+
}
6774

68-
for (const edge of result.edges) {
69-
const transform = edge.transform ? ` | ${edge.transform}` : ""
70-
lines.push(`${edge.source_table}.${edge.source_column}${edge.target_table}.${edge.target_column}${transform}`)
75+
// column_lineage: detailed edge list
76+
const edges = lineage.column_lineage as unknown[] | undefined
77+
if (edges?.length) {
78+
lines.push("Lineage Edges:")
79+
for (const edge of edges) {
80+
lines.push(` ${JSON.stringify(edge)}`)
81+
}
7182
}
7283

73-
lines.push("")
74-
lines.push(`Tables: ${result.tables.join(", ")}`)
84+
if (!columnDict && !edges) {
85+
lines.push(JSON.stringify(lineage, null, 2))
86+
}
7587

7688
return lines.join("\n")
7789
}

packages/altimate-code/src/tool/lineage-check.ts

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { LineageCheckResult } from "../bridge/protocol"
55

66
export const LineageCheckTool = Tool.define("lineage_check", {
77
description:
8-
"Check column-level lineage for a SQL query. Traces how source columns flow through transformations to output columns. Useful for impact analysis and understanding data flow.",
8+
"Check column-level lineage for a SQL query using the Rust-based sqlguard engine. Traces how source columns flow through transformations to output columns. Useful for impact analysis and understanding data flow.",
99
parameters: z.object({
1010
sql: z.string().describe("SQL query to trace lineage for"),
1111
dialect: z
@@ -26,54 +26,54 @@ export const LineageCheckTool = Tool.define("lineage_check", {
2626
schema_context: args.schema_context,
2727
})
2828

29+
const data = result.data as Record<string, any>
30+
if (result.error) {
31+
return {
32+
title: "Lineage: ERROR",
33+
metadata: { success: false },
34+
output: `Error: ${result.error}`,
35+
}
36+
}
37+
2938
return {
30-
title: `Lineage: ${result.edges.length} edge${result.edges.length !== 1 ? "s" : ""}, ${result.tables.length} table${result.tables.length !== 1 ? "s" : ""} [${result.confidence}]`,
31-
metadata: {
32-
edgeCount: result.edges.length,
33-
tableCount: result.tables.length,
34-
columnCount: result.columns.length,
35-
confidence: result.confidence,
36-
},
37-
output: formatLineage(result),
39+
title: `Lineage: ${result.success ? "OK" : "PARTIAL"}`,
40+
metadata: { success: result.success },
41+
output: formatLineage(data),
3842
}
3943
} catch (e) {
4044
const msg = e instanceof Error ? e.message : String(e)
4145
return {
4246
title: "Lineage: ERROR",
43-
metadata: { edgeCount: 0, tableCount: 0, columnCount: 0, confidence: "unknown" },
44-
output: `Failed to check lineage: ${msg}\n\nEnsure the Python bridge is running and altimate-engine is installed.`,
47+
metadata: { success: false },
48+
output: `Failed to check lineage: ${msg}\n\nEnsure the Python bridge is running and sqlguard is initialized.`,
4549
}
4650
}
4751
},
4852
})
4953

50-
function formatLineage(result: LineageCheckResult): string {
54+
function formatLineage(data: Record<string, any>): string {
5155
const lines: string[] = []
5256

53-
if (result.confidence_factors.length > 0) {
54-
lines.push(`Confidence: ${result.confidence}`)
55-
lines.push(` Note: ${result.confidence_factors.join("; ")}`)
57+
// column_dict: output columns -> source columns mapping
58+
if (data.column_dict) {
59+
lines.push("Column Mappings:")
60+
for (const [target, sources] of Object.entries(data.column_dict)) {
61+
lines.push(` ${target}${JSON.stringify(sources)}`)
62+
}
5663
lines.push("")
5764
}
5865

59-
if (result.edges.length === 0) {
60-
lines.push("No column-level lineage edges detected.")
61-
lines.push("This may indicate the query uses SELECT * or has complex expressions that couldn't be traced.")
62-
return lines.join("\n")
66+
// column_lineage: detailed edge list
67+
if (data.column_lineage?.length) {
68+
lines.push("Lineage Edges:")
69+
for (const edge of data.column_lineage) {
70+
lines.push(` ${JSON.stringify(edge)}`)
71+
}
6372
}
6473

65-
lines.push("Column Lineage Edges:")
66-
lines.push("Source Table.Column → Target Table.Column | Transform")
67-
lines.push("".padEnd(60, "-"))
68-
69-
for (const edge of result.edges) {
70-
const transform = edge.transform ? ` | ${edge.transform}` : ""
71-
lines.push(`${edge.source_table}.${edge.source_column}${edge.target_table}.${edge.target_column}${transform}`)
74+
if (lines.length === 0) {
75+
lines.push(JSON.stringify(data, null, 2))
7276
}
7377

74-
lines.push("")
75-
lines.push(`Tables: ${result.tables.join(", ")}`)
76-
lines.push(`Columns: ${result.columns.join(", ")}`)
77-
7878
return lines.join("\n")
7979
}

packages/altimate-engine/src/altimate_engine/dbt/lineage.py

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
from pathlib import Path
88
from typing import Any
99

10-
from altimate_engine.lineage.check import check_lineage
10+
from altimate_engine.sql.guard import guard_column_lineage
1111
from altimate_engine.models import (
1212
DbtLineageParams,
1313
DbtLineageResult,
14-
LineageCheckParams,
15-
ModelColumn,
1614
)
1715

1816
logger = logging.getLogger(__name__)
@@ -23,15 +21,12 @@ def dbt_lineage(params: DbtLineageParams) -> DbtLineageResult:
2321
2422
Loads the manifest, finds the target model (by name or unique_id),
2523
extracts its compiled SQL + upstream schemas, and delegates to
26-
the lineage checker.
24+
sqlguard's column_lineage via guard_column_lineage.
2725
"""
2826
manifest_path = Path(params.manifest_path)
2927
if not manifest_path.exists():
3028
return DbtLineageResult(
3129
model_name=params.model,
32-
edges=[],
33-
tables=[],
34-
columns=[],
3530
confidence="low",
3631
confidence_factors=["Manifest file not found"],
3732
)
@@ -42,9 +37,6 @@ def dbt_lineage(params: DbtLineageParams) -> DbtLineageResult:
4237
except (json.JSONDecodeError, OSError) as e:
4338
return DbtLineageResult(
4439
model_name=params.model,
45-
edges=[],
46-
tables=[],
47-
columns=[],
4840
confidence="low",
4941
confidence_factors=[f"Failed to parse manifest: {e}"],
5042
)
@@ -57,9 +49,6 @@ def dbt_lineage(params: DbtLineageParams) -> DbtLineageResult:
5749
if model_node is None:
5850
return DbtLineageResult(
5951
model_name=params.model,
60-
edges=[],
61-
tables=[],
62-
columns=[],
6352
confidence="low",
6453
confidence_factors=[f"Model '{params.model}' not found in manifest"],
6554
)
@@ -69,9 +58,6 @@ def dbt_lineage(params: DbtLineageParams) -> DbtLineageResult:
6958
if not sql:
7059
return DbtLineageResult(
7160
model_name=params.model,
72-
edges=[],
73-
tables=[],
74-
columns=[],
7561
confidence="low",
7662
confidence_factors=["No compiled SQL found — run `dbt compile` first"],
7763
)
@@ -85,40 +71,33 @@ def dbt_lineage(params: DbtLineageParams) -> DbtLineageResult:
8571
upstream_ids = model_node.get("depends_on", {}).get("nodes", [])
8672
schema_context = _build_schema_context(nodes, sources, upstream_ids)
8773

88-
# Delegate to lineage checker
89-
lineage_result = check_lineage(
90-
LineageCheckParams(
91-
sql=sql,
92-
dialect=dialect,
93-
schema_context=schema_context if schema_context else None,
94-
)
74+
# Delegate to sqlguard column_lineage
75+
raw = guard_column_lineage(
76+
sql,
77+
dialect=dialect,
78+
schema_context=schema_context if schema_context else None,
9579
)
9680

81+
# Extract database/schema defaults from model node
9782
return DbtLineageResult(
9883
model_name=model_node.get("name", params.model),
9984
model_unique_id=_get_unique_id(nodes, params.model),
10085
compiled_sql=sql,
101-
edges=lineage_result.edges,
102-
tables=lineage_result.tables,
103-
columns=lineage_result.columns,
104-
confidence=lineage_result.confidence,
105-
confidence_factors=lineage_result.confidence_factors,
86+
raw_lineage=raw,
87+
confidence="high" if not raw.get("error") else "low",
88+
confidence_factors=[raw["error"]] if raw.get("error") else [],
10689
)
10790

10891

10992
def _find_model(nodes: dict[str, Any], model: str) -> dict[str, Any] | None:
11093
"""Find model node by name or unique_id."""
111-
# Try exact unique_id match first
11294
if model in nodes:
11395
return nodes[model]
114-
115-
# Try model.{name} format
11696
for node_id, node in nodes.items():
11797
if node.get("resource_type") != "model":
11898
continue
11999
if node.get("name") == model:
120100
return node
121-
122101
return None
123102

124103

@@ -134,7 +113,6 @@ def _get_unique_id(nodes: dict[str, Any], model: str) -> str | None:
134113

135114
def _detect_dialect(manifest: dict[str, Any], model_node: dict[str, Any]) -> str:
136115
"""Detect SQL dialect from manifest metadata."""
137-
# Check adapter type in metadata
138116
metadata = manifest.get("metadata", {})
139117
adapter = metadata.get("adapter_type", "")
140118
if adapter:
@@ -148,28 +126,26 @@ def _detect_dialect(manifest: dict[str, Any], model_node: dict[str, Any]) -> str
148126
"duckdb": "duckdb",
149127
}
150128
return dialect_map.get(adapter, adapter)
151-
152129
return "snowflake"
153130

154131

155132
def _build_schema_context(
156133
nodes: dict[str, Any],
157134
sources: dict[str, Any],
158135
upstream_ids: list[str],
159-
) -> dict[str, list[ModelColumn]]:
136+
) -> dict | None:
160137
"""Build schema context from upstream model/source columns.
161138
162-
Returns a dict mapping table names to column definitions for
163-
the lineage checker's schema_context param.
139+
Returns sqlguard schema format:
140+
{"tables": {"table_name": {"columns": [{"name": ..., "type": ...}]}}, "version": "1"}
164141
"""
165-
schema: dict[str, list[ModelColumn]] = {}
142+
tables: dict[str, dict] = {}
166143

167144
for uid in upstream_ids:
168145
node = nodes.get(uid) or sources.get(uid)
169146
if node is None:
170147
continue
171148

172-
# Use alias if available, else name
173149
table_name = node.get("alias") or node.get("name", "")
174150
if not table_name:
175151
continue
@@ -179,14 +155,14 @@ def _build_schema_context(
179155
continue
180156

181157
cols = [
182-
ModelColumn(
183-
name=col.get("name", col_name),
184-
data_type=col.get("data_type") or col.get("type") or "",
185-
)
158+
{"name": col.get("name", col_name), "type": col.get("data_type") or col.get("type") or ""}
186159
for col_name, col in columns_dict.items()
187160
]
188161

189162
if cols:
190-
schema[table_name] = cols
163+
tables[table_name] = {"columns": cols}
164+
165+
if not tables:
166+
return None
191167

192-
return schema
168+
return {"tables": tables, "version": "1"}

packages/altimate-engine/src/altimate_engine/lineage/__init__.py

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)