Skip to content

Commit c5773ed

Browse files
suryaiyer95claude
and committed
feat: wire data_diff tool through reladiff engine for deterministic data validation
Adds the full pipeline: TypeScript tool → Bridge → Python orchestrator → Rust engine. - `data-diff-run.ts`: TypeScript tool wrapping `Bridge.call("data_diff.run")` - `data_diff.py`: Python orchestrator driving the cooperative state machine loop via `altimate_core.ReladiffSession` (start → execute SQL → step → repeat) - `server.py`: Added `data_diff.run` dispatch to JSON-RPC bridge - `protocol.ts`: `DataDiffRunParams`/`DataDiffRunResult` interfaces + bridge method - `registry.ts`: Registered `DataDiffRunTool` in tool registry - `agent.ts`: Added `data_diff: "allow"` to data-diff agent permissions - `data-diff.txt`: Rewrote prompt to use `data_diff` tool as primary approach, with manual SQL as fallback Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0cf3477 commit c5773ed

7 files changed

Lines changed: 439 additions & 76 deletions

File tree

packages/altimate-engine/src/altimate_engine/server.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,24 @@ def dispatch(request: JsonRpcRequest) -> JsonRpcResponse:
953953
target_dialect=p.target_dialect,
954954
)
955955
result = LocalTestResult(**raw)
956+
elif method == "data_diff.run":
957+
from altimate_engine.sql.data_diff import run_data_diff
958+
959+
raw = run_data_diff(
960+
source_table=params.get("source_table", ""),
961+
target_table=params.get("target_table", ""),
962+
source_warehouse=params.get("source_warehouse", ""),
963+
target_warehouse=params.get("target_warehouse"),
964+
key_columns=params.get("key_columns", []),
965+
extra_columns=params.get("extra_columns"),
966+
algorithm=params.get("algorithm", "auto"),
967+
where_clause=params.get("where_clause"),
968+
source_database=params.get("source_database"),
969+
source_schema=params.get("source_schema"),
970+
target_database=params.get("target_database"),
971+
target_schema=params.get("target_schema"),
972+
)
973+
return JsonRpcResponse(result=raw, id=request.id)
956974
elif method == "ping":
957975
return JsonRpcResponse(result={"status": "ok"}, id=request.id)
958976
else:
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
"""Deterministic data diff engine using altimate-core's reladiff state machine.
2+
3+
Orchestrates the cooperative Rust state machine: creates a session, loops
4+
start() → execute SQL → step() until Done or Error. All SQL execution goes
5+
through the existing ConnectionRegistry.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import json
11+
import logging
12+
from typing import Any
13+
14+
from altimate_engine.connections import ConnectionRegistry
15+
from altimate_engine.models import SqlExecuteParams
16+
from altimate_engine.sql.executor import execute_sql
17+
18+
logger = logging.getLogger(__name__)
19+
20+
# altimate_core ships the Rust reladiff engine. Treat it as an optional
# dependency: if the wheel is absent the module still imports, and
# run_data_diff reports a structured error instead of crashing at import time.
try:
    import altimate_core

    RELADIFF_AVAILABLE = True
except ImportError:
    RELADIFF_AVAILABLE = False

# Map TableSide enum values to warehouse names
# NOTE(review): _SIDE_MAP is not referenced anywhere in this module —
# run_data_diff builds its own side→warehouse map inline. Candidate for
# removal; confirm no external importer relies on it first.
_SIDE_MAP = {"Table1": "source", "Table2": "target"}
31+
def _resolve_dialect(warehouse_name: str) -> str:
32+
"""Infer SQL dialect from connection type."""
33+
try:
34+
conn = ConnectionRegistry.get(warehouse_name)
35+
conn_type = getattr(conn, "type", "").lower()
36+
dialect_map = {
37+
"snowflake": "snowflake",
38+
"duckdb": "duckdb",
39+
"postgres": "postgres",
40+
"postgresql": "postgres",
41+
"bigquery": "bigquery",
42+
"mysql": "mysql",
43+
"clickhouse": "clickhouse",
44+
"databricks": "databricks",
45+
"redshift": "redshift",
46+
}
47+
return dialect_map.get(conn_type, "generic")
48+
except Exception:
49+
return "generic"
50+
51+
52+
def _execute_task(task: dict, warehouse: str) -> dict:
    """Execute a single SQL task against the given warehouse.

    Args:
        task: Engine-issued task dict; reads the ``sql`` and ``id`` keys.
        warehouse: Name of the registered connection to run the SQL on.

    Returns:
        A response dict ``{"id": ..., "rows": ...}`` in the shape expected by
        ``ReladiffSession.step()``: every cell stringified, SQL NULLs kept
        as ``None``.
    """
    result = execute_sql(
        SqlExecuteParams(sql=task["sql"], warehouse=warehouse, limit=100_000)
    )

    # Convert SqlExecuteResult rows to the format expected by
    # ReladiffSession.step(): all values as strings, None preserved.
    rows: list[list[str | None]] = [
        [str(value) if value is not None else None for value in row]
        for row in result.rows
    ]

    return {"id": task["id"], "rows": rows}
def run_data_diff(
    *,
    source_table: str,
    target_table: str,
    source_warehouse: str,
    target_warehouse: str | None = None,
    key_columns: list[str],
    extra_columns: list[str] | None = None,
    algorithm: str = "auto",
    where_clause: str | None = None,
    source_database: str | None = None,
    source_schema: str | None = None,
    target_database: str | None = None,
    target_schema: str | None = None,
) -> dict[str, Any]:
    """Run a deterministic data diff using the Rust reladiff engine.

    Drives the cooperative state machine: ``start()`` yields SQL tasks, we
    execute them against the registered connections, then feed the rows back
    through ``step()`` until the engine reports ``Done`` or ``Error``.

    Args:
        source_table: Source-side table name.
        target_table: Target-side table name.
        source_warehouse: Registered connection name for the source side.
        target_warehouse: Connection for the target side; defaults to
            ``source_warehouse`` for same-warehouse comparisons.
        key_columns: Key columns the engine uses to align rows.
        extra_columns: Additional columns to compare; defaults to none.
        algorithm: Engine algorithm selector (default ``"auto"``).
        where_clause: Optional row filter forwarded to the engine config.
        source_database: Optional database qualifier for the source table.
        source_schema: Optional schema qualifier for the source table.
        target_database: Optional database qualifier for the target table.
        target_schema: Optional schema qualifier for the target table.

    Returns:
        A dict that always contains ``success``. On success it also carries
        ``status``, ``steps`` and the engine ``outcome``; on failure an
        ``error`` message (plus ``failed_sql`` when a SQL task caused it).
        All failure modes are reported as result dicts — nothing raises.
    """
    if not RELADIFF_AVAILABLE:
        return {
            "success": False,
            "error": "altimate-core not installed. ReladiffSession unavailable.",
        }

    target_warehouse = target_warehouse or source_warehouse

    # Resolve dialects from connection types so the engine emits valid SQL
    # for each side independently.
    dialect1 = _resolve_dialect(source_warehouse)
    dialect2 = _resolve_dialect(target_warehouse)

    # Build session spec; database/schema qualifiers are optional.
    table1: dict[str, Any] = {"table": source_table}
    if source_database:
        table1["database"] = source_database
    if source_schema:
        table1["schema"] = source_schema

    table2: dict[str, Any] = {"table": target_table}
    if target_database:
        table2["database"] = target_database
    if target_schema:
        table2["schema"] = target_schema

    spec = {
        "table1": table1,
        "table2": table2,
        "dialect1": dialect1,
        "dialect2": dialect2,
        "config": {
            "algorithm": algorithm,
            "key_columns": key_columns,
            "extra_columns": extra_columns or [],
        },
    }

    if where_clause:
        spec["config"]["where_clause"] = where_clause

    logger.info("Starting reladiff session: %s", json.dumps(spec, indent=2))

    # Create session and run the state machine loop.
    try:
        session = altimate_core.ReladiffSession(json.dumps(spec))
    except Exception as e:
        return {"success": False, "error": f"Failed to create session: {e}"}

    # Map table sides to warehouses.
    warehouse_map = {"Table1": source_warehouse, "Table2": target_warehouse}

    # FIX: start() was previously unguarded — an engine failure here raised
    # out of this function instead of returning a structured error like
    # session creation and SQL execution do.
    try:
        action = session.start()
    except Exception as e:
        return {"success": False, "error": f"Engine start failed: {e}", "steps": 0}

    step_count = 0
    max_steps = 100  # Safety limit against a non-terminating engine loop

    while step_count < max_steps:
        step_count += 1
        action_type = action.get("type")

        if action_type == "Done":
            outcome = action.get("outcome", {})
            return {
                "success": True,
                "status": "completed",
                "steps": step_count,
                "outcome": outcome,
            }

        if action_type == "Error":
            return {
                "success": False,
                "error": action.get("message", "Unknown engine error"),
                "steps": step_count,
            }

        if action_type != "ExecuteSql":
            return {
                "success": False,
                "error": f"Unexpected action type: {action_type}",
                "steps": step_count,
            }

        # Execute all SQL tasks the engine issued for this step.
        tasks = action.get("tasks", [])
        responses = []

        for task in tasks:
            side = task.get("table_side", "Table1")
            wh = warehouse_map.get(side, source_warehouse)

            logger.info(
                "Step %d: Executing [%s] on %s: %s",
                step_count,
                side,
                wh,
                task["sql"][:120],
            )

            try:
                resp = _execute_task(task, wh)
                responses.append(resp)
            except Exception as e:
                return {
                    "success": False,
                    "error": f"SQL execution failed on {wh}: {e}",
                    "steps": step_count,
                    "failed_sql": task["sql"],
                }

        # Feed responses back to the engine.
        # FIX: step() guarded for the same reason as start() above.
        try:
            action = session.step(json.dumps(responses))
        except Exception as e:
            return {
                "success": False,
                "error": f"Engine step failed: {e}",
                "steps": step_count,
            }

    return {
        "success": False,
        "error": f"State machine did not converge after {max_steps} steps",
        "steps": step_count,
    }

packages/opencode/src/agent/agent.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ export namespace Agent {
244244
altimate_core_validate: "allow", altimate_core_lint: "allow",
245245
altimate_core_safety: "allow", altimate_core_transpile: "allow",
246246
altimate_core_check: "allow",
247+
data_diff: "allow",
247248
read: "allow", write: "allow", edit: "allow",
248249
grep: "allow", glob: "allow", bash: "allow",
249250
question: "allow",

packages/opencode/src/altimate/bridge/protocol.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,31 @@ export interface AltimateCoreIsSafeParams {
876876
sql: string
877877
}
878878

879+
// --- data diff (reladiff) ---

/**
 * Parameters for the `data_diff.run` bridge method. Field names mirror the
 * keyword arguments of the Python orchestrator's `run_data_diff`.
 */
export interface DataDiffRunParams {
  /** Source-side table name. */
  source_table: string
  /** Target-side table name. */
  target_table: string
  /** Registered connection name for the source side. */
  source_warehouse: string
  /** Target-side connection; the orchestrator falls back to `source_warehouse` when omitted. */
  target_warehouse?: string
  /** Key columns used to align rows between the two tables. */
  key_columns: string[]
  /** Additional columns to compare beyond the keys. */
  extra_columns?: string[]
  /** Diff algorithm selector; the orchestrator defaults to "auto". */
  algorithm?: "auto" | "hashdiff" | "joindiff" | "profile" | "recon" | "cascade"
  /** Optional row filter applied via the engine config. */
  where_clause?: string
  /** Optional database qualifier for the source table. */
  source_database?: string
  /** Optional schema qualifier for the source table. */
  source_schema?: string
  /** Optional database qualifier for the target table. */
  target_database?: string
  /** Optional schema qualifier for the target table. */
  target_schema?: string
}
895+
/** Result of the `data_diff.run` bridge method. */
export interface DataDiffRunResult {
  /** False when the engine is unavailable, errored, or SQL execution failed. */
  success: boolean
  /** Set to "completed" when the state machine reached Done. */
  status?: string
  /** Failure message; present only when `success` is false. */
  error?: string
  /** Number of state-machine steps executed before returning. */
  steps?: number
  /** Engine-reported diff outcome payload on completion. */
  outcome?: Record<string, unknown>
  /** The SQL statement that failed, when a SQL task caused the error. */
  failed_sql?: string
}
879904
// --- dbt Lineage ---
880905

881906
export interface DbtLineageParams {
@@ -1032,6 +1057,7 @@ export const BridgeMethods = {
10321057
"altimate_core.introspection_sql": {} as { params: AltimateCoreIntrospectionSqlParams; result: AltimateCoreResult },
10331058
"altimate_core.parse_dbt": {} as { params: AltimateCoreParseDbtParams; result: AltimateCoreResult },
10341059
"altimate_core.is_safe": {} as { params: AltimateCoreIsSafeParams; result: AltimateCoreResult },
1060+
"data_diff.run": {} as { params: DataDiffRunParams; result: DataDiffRunResult },
10351061
ping: {} as { params: Record<string, never>; result: { status: string } },
10361062
} as const
10371063

0 commit comments

Comments
 (0)