Skip to content

Commit 63769f4

Browse files
suryaiyer95 and claude committed
fix: side-aware CTE injection for cross-warehouse data_diff SQL-query mode
When `source` and `target` are both SQL queries, `resolveTableSources` wraps them as `__diff_source` / `__diff_target` CTEs and the executor prepends the combined `WITH …` block to every engine-emitted task. T-SQL and Fabric parse-bind every CTE body even when unreferenced, so a task routed to the source warehouse failed to resolve the target-only base table referenced inside the unused `__diff_target` CTE (and vice versa), producing `Invalid object name` errors from the wrong warehouse.

Return side-specific prefixes from `resolveTableSources` alongside the combined one, and have the executor loop in `runDataDiff` pick the source or target prefix per task when `source_warehouse !== target_warehouse`. Same-warehouse behaviour is unchanged.

Adds `data-diff-cte.test.ts` covering plain-name passthrough, both-query wrapping, side-specific CTE isolation, and CTE merging with engine-emitted `WITH` clauses (10 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 173d32f commit 63769f4

2 files changed

Lines changed: 179 additions & 11 deletions

File tree

packages/opencode/src/altimate/native/connections/data-diff.ts

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,41 @@ function isQuery(input: string): boolean {
5353
* If either source or target is an arbitrary query, wrap them in CTEs so the
5454
* DataParity engine can treat them as tables named `__diff_source` / `__diff_target`.
5555
*
56-
* Returns `{ table1Name, table2Name, ctePrefix | null }`.
56+
* Returns both a combined prefix (used for same-warehouse tasks where a JOIN
57+
* might reference both CTEs) and side-specific prefixes (used for cross-warehouse
58+
* tasks where each warehouse only has access to its own base tables).
5759
*
58-
* When a CTE prefix is returned, it must be prepended to every SQL task emitted
59-
* by the engine before execution.
60+
* **Why side-specific prefixes matter:** T-SQL / Fabric parse-bind every CTE body
61+
* at parse time, even unreferenced ones. Sending a combined `WITH __diff_source
62+
* AS (... FROM mssql_only_table), __diff_target AS (... FROM fabric_only_table)`
63+
* to MSSQL fails because MSSQL can't resolve the Fabric-only table referenced in
64+
* the unused `__diff_target` CTE.
65+
*
66+
* Callers must prepend the appropriate prefix to every SQL task emitted by the
67+
* engine before execution.
6068
*/
6169
export function resolveTableSources(
6270
source: string,
6371
target: string,
64-
): { table1Name: string; table2Name: string; ctePrefix: string | null } {
72+
): {
73+
table1Name: string
74+
table2Name: string
75+
ctePrefix: string | null
76+
sourceCtePrefix: string | null
77+
targetCtePrefix: string | null
78+
} {
6579
const source_is_query = isQuery(source)
6680
const target_is_query = isQuery(target)
6781

6882
if (!source_is_query && !target_is_query) {
6983
// Both are plain table names — pass through unchanged
70-
return { table1Name: source, table2Name: target, ctePrefix: null }
84+
return {
85+
table1Name: source,
86+
table2Name: target,
87+
ctePrefix: null,
88+
sourceCtePrefix: null,
89+
targetCtePrefix: null,
90+
}
7191
}
7292

7393
// At least one is a query — wrap both in CTEs
@@ -81,11 +101,15 @@ export function resolveTableSources(
81101
const srcExpr = source_is_query ? source : `SELECT * FROM ${quoteIdent(source)}`
82102
const tgtExpr = target_is_query ? target : `SELECT * FROM ${quoteIdent(target)}`
83103

104+
const sourceCtePrefix = `WITH __diff_source AS (\n${srcExpr}\n)`
105+
const targetCtePrefix = `WITH __diff_target AS (\n${tgtExpr}\n)`
84106
const ctePrefix = `WITH __diff_source AS (\n${srcExpr}\n), __diff_target AS (\n${tgtExpr}\n)`
85107
return {
86108
table1Name: "__diff_source",
87109
table2Name: "__diff_target",
88110
ctePrefix,
111+
sourceCtePrefix,
112+
targetCtePrefix,
89113
}
90114
}
91115

@@ -784,10 +808,15 @@ export async function runDataDiff(params: DataDiffParams): Promise<DataDiffResul
784808
}
785809

786810
// Resolve sources (plain table names vs arbitrary queries)
787-
const { table1Name, table2Name, ctePrefix } = resolveTableSources(
788-
params.source,
789-
params.target,
790-
)
811+
const { table1Name, table2Name, ctePrefix, sourceCtePrefix, targetCtePrefix } =
812+
resolveTableSources(params.source, params.target)
813+
814+
// Cross-warehouse mode requires side-specific CTE injection: T-SQL / Fabric
815+
// parse-bind every CTE body even when unreferenced, so sending the combined
816+
// prefix to a warehouse that lacks the other side's base table fails at parse.
817+
const sourceWarehouse = params.source_warehouse
818+
const targetWarehouse = params.target_warehouse ?? params.source_warehouse
819+
const crossWarehouse = sourceWarehouse !== targetWarehouse
791820

792821
// Parse optional qualified names: "db.schema.table" → { database, schema, table }
793822
const parseQualified = (name: string) => {
@@ -911,8 +940,18 @@ export async function runDataDiff(params: DataDiffParams): Promise<DataDiffResul
911940
const taskResults = await Promise.all(
912941
tasks.map(async (task) => {
913942
const warehouse = warehouseFor(task.table_side)
914-
// Inject CTE definitions if we're in query-comparison mode
915-
const sql = ctePrefix ? injectCte(task.sql, ctePrefix) : task.sql
943+
// Inject CTE definitions if we're in query-comparison mode. In
944+
// cross-warehouse mode each task only gets the CTE for its own side —
945+
// the other side's base tables aren't bindable on this warehouse.
946+
let prefix: string | null = null
947+
if (ctePrefix) {
948+
if (crossWarehouse) {
949+
prefix = task.table_side === "Table2" ? targetCtePrefix : sourceCtePrefix
950+
} else {
951+
prefix = ctePrefix
952+
}
953+
}
954+
const sql = prefix ? injectCte(task.sql, prefix) : task.sql
916955
try {
917956
const rows = await executeQuery(sql, warehouse)
918957
return { id: task.id, rows, error: null }
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Tests for CTE wrapping and injection in SQL-query mode.
3+
*
4+
* The tricky case is cross-warehouse comparison where source and target are both
5+
* SQL queries referencing tables that only exist on their own side. The combined
6+
* CTE prefix cannot be sent to both warehouses because T-SQL / Fabric parse-bind
7+
* every CTE body even when unreferenced — the "other side" CTE would fail to
8+
* resolve its base table.
9+
*/
10+
import { describe, test, expect } from "bun:test"
11+
12+
import { resolveTableSources, injectCte } from "../../src/altimate/native/connections/data-diff"
13+
14+
describe("resolveTableSources", () => {
15+
test("plain table names pass through without wrapping", () => {
16+
const r = resolveTableSources("orders", "orders_v2")
17+
expect(r.table1Name).toBe("orders")
18+
expect(r.table2Name).toBe("orders_v2")
19+
expect(r.ctePrefix).toBeNull()
20+
expect(r.sourceCtePrefix).toBeNull()
21+
expect(r.targetCtePrefix).toBeNull()
22+
})
23+
24+
test("schema-qualified plain names pass through", () => {
25+
const r = resolveTableSources("gold.dim_customer", "TRANSFORMED.DimCustomer")
26+
expect(r.table1Name).toBe("gold.dim_customer")
27+
expect(r.table2Name).toBe("TRANSFORMED.DimCustomer")
28+
expect(r.ctePrefix).toBeNull()
29+
})
30+
31+
test("both queries are wrapped in CTEs with aliases", () => {
32+
const r = resolveTableSources(
33+
"SELECT id, val FROM [TRANSFORMED].[DimCustomer]",
34+
"SELECT id, val FROM [gold].[dim_customer]",
35+
)
36+
expect(r.table1Name).toBe("__diff_source")
37+
expect(r.table2Name).toBe("__diff_target")
38+
expect(r.ctePrefix).toContain("__diff_source AS (")
39+
expect(r.ctePrefix).toContain("__diff_target AS (")
40+
expect(r.ctePrefix).toContain("[TRANSFORMED].[DimCustomer]")
41+
expect(r.ctePrefix).toContain("[gold].[dim_customer]")
42+
})
43+
44+
test("side-specific prefixes contain only the relevant CTE", () => {
45+
const r = resolveTableSources(
46+
"SELECT id FROM [TRANSFORMED].[DimCustomer]",
47+
"SELECT id FROM [gold].[dim_customer]",
48+
)
49+
// Source prefix has source table only — must not leak target table ref
50+
expect(r.sourceCtePrefix).toContain("__diff_source AS (")
51+
expect(r.sourceCtePrefix).toContain("[TRANSFORMED].[DimCustomer]")
52+
expect(r.sourceCtePrefix).not.toContain("__diff_target")
53+
expect(r.sourceCtePrefix).not.toContain("[gold].[dim_customer]")
54+
55+
// Target prefix has target table only — must not leak source table ref
56+
expect(r.targetCtePrefix).toContain("__diff_target AS (")
57+
expect(r.targetCtePrefix).toContain("[gold].[dim_customer]")
58+
expect(r.targetCtePrefix).not.toContain("__diff_source")
59+
expect(r.targetCtePrefix).not.toContain("[TRANSFORMED].[DimCustomer]")
60+
})
61+
62+
test("mixed: plain source + query target still wraps both sides", () => {
63+
const r = resolveTableSources(
64+
"orders",
65+
"SELECT * FROM other.orders WHERE region = 'EU'",
66+
)
67+
expect(r.table1Name).toBe("__diff_source")
68+
expect(r.table2Name).toBe("__diff_target")
69+
// Plain table wrapped with ANSI double-quoted identifiers
70+
expect(r.sourceCtePrefix).toContain('SELECT * FROM "orders"')
71+
expect(r.targetCtePrefix).toContain("other.orders")
72+
})
73+
74+
test("query detection requires both keyword AND whitespace", () => {
75+
// A table literally named "select" should NOT be treated as a query
76+
const r = resolveTableSources("select", "with")
77+
expect(r.table1Name).toBe("select")
78+
expect(r.table2Name).toBe("with")
79+
expect(r.ctePrefix).toBeNull()
80+
})
81+
})
82+
83+
describe("injectCte", () => {
84+
test("prepends CTE prefix to a plain SELECT", () => {
85+
const prefix = "WITH __diff_source AS (\nSELECT 1 AS id\n)"
86+
const sql = "SELECT COUNT(*) FROM __diff_source"
87+
const out = injectCte(sql, prefix)
88+
expect(out.startsWith(prefix)).toBe(true)
89+
expect(out).toContain("SELECT COUNT(*) FROM __diff_source")
90+
})
91+
92+
test("merges with an engine-emitted WITH clause", () => {
93+
const prefix = "WITH __diff_source AS (\nSELECT * FROM base\n)"
94+
const engineSql = "WITH engine_cte AS (SELECT id FROM __diff_source) SELECT * FROM engine_cte"
95+
const out = injectCte(engineSql, prefix)
96+
// Must start with a single WITH, with our CTE first, then engine's
97+
expect(out.match(/^WITH /)).not.toBeNull()
98+
expect((out.match(/\bWITH\b/g) ?? []).length).toBe(1)
99+
expect(out.indexOf("__diff_source AS")).toBeLessThan(out.indexOf("engine_cte AS"))
100+
})
101+
102+
test("side-specific injection: source prefix does not leak target refs", () => {
103+
// Simulates cross-warehouse fp1_1 task going to MSSQL. It must not see any
104+
// reference to the Fabric-only target table, since MSSQL parse-binds every
105+
// CTE body.
106+
const r = resolveTableSources(
107+
"SELECT id FROM [TRANSFORMED].[DimCustomer]",
108+
"SELECT id FROM [gold].[dim_customer]",
109+
)
110+
const engineFp1Sql =
111+
"SELECT COUNT(*), SUM(CAST(...HASHBYTES('MD5', CONCAT(CAST([id] AS NVARCHAR(MAX))))...)) FROM [__diff_source]"
112+
const sqlForMssql = injectCte(engineFp1Sql, r.sourceCtePrefix!)
113+
expect(sqlForMssql).toContain("[TRANSFORMED].[DimCustomer]")
114+
expect(sqlForMssql).not.toContain("[gold].[dim_customer]")
115+
expect(sqlForMssql).not.toContain("__diff_target")
116+
})
117+
118+
test("side-specific injection: target prefix does not leak source refs", () => {
119+
const r = resolveTableSources(
120+
"SELECT id FROM [TRANSFORMED].[DimCustomer]",
121+
"SELECT id FROM [gold].[dim_customer]",
122+
)
123+
const engineFp2Sql = "SELECT COUNT(*) FROM [__diff_target]"
124+
const sqlForFabric = injectCte(engineFp2Sql, r.targetCtePrefix!)
125+
expect(sqlForFabric).toContain("[gold].[dim_customer]")
126+
expect(sqlForFabric).not.toContain("[TRANSFORMED].[DimCustomer]")
127+
expect(sqlForFabric).not.toContain("__diff_source")
128+
})
129+
})

0 commit comments

Comments
 (0)