|
7 | 7 | * This file is the bridge between that engine and altimate-code's drivers. |
8 | 8 | */ |
9 | 9 |
|
10 | | -import type { DataDiffParams, DataDiffResult } from "../types" |
| 10 | +import type { DataDiffParams, DataDiffResult, PartitionDiffResult } from "../types" |
11 | 11 | import * as Registry from "./registry" |
12 | 12 |
|
13 | 13 | // --------------------------------------------------------------------------- |
@@ -119,7 +119,238 @@ async function executeQuery(sql: string, warehouseName: string | undefined): Pro |
119 | 119 |
|
120 | 120 | const MAX_STEPS = 200 |
121 | 121 |
|
| 122 | +// --------------------------------------------------------------------------- |
| 123 | +// Partition support |
| 124 | +// --------------------------------------------------------------------------- |
| 125 | + |
| 126 | +/** |
| 127 | + * Build a DATE_TRUNC expression appropriate for the warehouse dialect. |
| 128 | + */ |
| 129 | +function dateTruncExpr(granularity: string, column: string, dialect: string): string { |
| 130 | + const g = granularity.toLowerCase() |
| 131 | + switch (dialect) { |
| 132 | + case "bigquery": |
| 133 | + return `DATE_TRUNC(${column}, ${g.toUpperCase()})` |
| 134 | + case "clickhouse": |
| 135 | + return `toStartOf${g.charAt(0).toUpperCase() + g.slice(1)}(${column})` |
| 136 | + case "mysql": |
| 137 | + case "mariadb": { |
| 138 | + const fmt = { day: "%Y-%m-%d", week: "%Y-%u", month: "%Y-%m-01", year: "%Y-01-01" }[g] ?? "%Y-%m-01" |
| 139 | + return `DATE_FORMAT(${column}, '${fmt}')` |
| 140 | + } |
| 141 | + default: |
| 142 | + // Postgres, Snowflake, Redshift, DuckDB, etc. |
| 143 | + return `DATE_TRUNC('${g}', ${column})` |
| 144 | + } |
| 145 | +} |
| 146 | + |
| 147 | +/** |
| 148 | + * Build SQL to discover distinct partition values from the source table. |
| 149 | + */ |
| 150 | +function buildPartitionDiscoverySQL( |
| 151 | + table: string, |
| 152 | + partitionColumn: string, |
| 153 | + granularity: string | undefined, |
| 154 | + bucketSize: number | undefined, |
| 155 | + dialect: string, |
| 156 | + whereClause?: string, |
| 157 | +): string { |
| 158 | + const isNumeric = bucketSize != null |
| 159 | + |
| 160 | + let expr: string |
| 161 | + if (isNumeric) { |
| 162 | + expr = `FLOOR(${partitionColumn} / ${bucketSize}) * ${bucketSize}` |
| 163 | + } else { |
| 164 | + expr = dateTruncExpr(granularity ?? "month", partitionColumn, dialect) |
| 165 | + } |
| 166 | + |
| 167 | + const where = whereClause ? `WHERE ${whereClause}` : "" |
| 168 | + return `SELECT DISTINCT ${expr} AS _p FROM ${table} ${where} ORDER BY _p` |
| 169 | +} |
| 170 | + |
| 171 | +/** |
| 172 | + * Build a WHERE clause that scopes to a single partition. |
| 173 | + */ |
| 174 | +function buildPartitionWhereClause( |
| 175 | + partitionColumn: string, |
| 176 | + partitionValue: string, |
| 177 | + granularity: string | undefined, |
| 178 | + bucketSize: number | undefined, |
| 179 | + dialect: string, |
| 180 | +): string { |
| 181 | + if (bucketSize != null) { |
| 182 | + const lo = Number(partitionValue) |
| 183 | + const hi = lo + bucketSize |
| 184 | + return `${partitionColumn} >= ${lo} AND ${partitionColumn} < ${hi}` |
| 185 | + } |
| 186 | + |
| 187 | + const expr = dateTruncExpr(granularity ?? "month", partitionColumn, dialect) |
| 188 | + |
| 189 | + // Cast the literal appropriately per dialect |
| 190 | + switch (dialect) { |
| 191 | + case "bigquery": |
| 192 | + return `${expr} = '${partitionValue}'` |
| 193 | + case "clickhouse": |
| 194 | + return `${expr} = toDate('${partitionValue}')` |
| 195 | + case "mysql": |
| 196 | + case "mariadb": |
| 197 | + return `${expr} = '${partitionValue}'` |
| 198 | + default: |
| 199 | + return `${expr} = '${partitionValue}'` |
| 200 | + } |
| 201 | +} |
| 202 | + |
| 203 | +/** |
| 204 | + * Extract DiffStats from a successful outcome (if present). |
| 205 | + */ |
| 206 | +function extractStats(outcome: unknown): { |
| 207 | + rows_source: number |
| 208 | + rows_target: number |
| 209 | + differences: number |
| 210 | + status: "identical" | "differ" |
| 211 | +} { |
| 212 | + const o = outcome as any |
| 213 | + if (!o) return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } |
| 214 | + |
| 215 | + if (o.Match) { |
| 216 | + return { |
| 217 | + rows_source: o.Match.row_count ?? 0, |
| 218 | + rows_target: o.Match.row_count ?? 0, |
| 219 | + differences: 0, |
| 220 | + status: "identical", |
| 221 | + } |
| 222 | + } |
| 223 | + |
| 224 | + if (o.Diff) { |
| 225 | + const d = o.Diff |
| 226 | + return { |
| 227 | + rows_source: d.total_source_rows ?? 0, |
| 228 | + rows_target: d.total_target_rows ?? 0, |
| 229 | + differences: (d.rows_only_in_source ?? 0) + (d.rows_only_in_target ?? 0) + (d.rows_updated ?? 0), |
| 230 | + status: "differ", |
| 231 | + } |
| 232 | + } |
| 233 | + |
| 234 | + return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } |
| 235 | +} |
| 236 | + |
| 237 | +/** |
| 238 | + * Merge two Diff outcomes into one aggregated Diff outcome. |
| 239 | + */ |
| 240 | +function mergeOutcomes(accumulated: unknown, next: unknown): unknown { |
| 241 | + const a = accumulated as any |
| 242 | + const n = next as any |
| 243 | + |
| 244 | + const aD = a?.Diff ?? (a?.Match ? { total_source_rows: a.Match.row_count, total_target_rows: a.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: a.Match.row_count, sample_diffs: [] } : null) |
| 245 | + const nD = n?.Diff ?? (n?.Match ? { total_source_rows: n.Match.row_count, total_target_rows: n.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: n.Match.row_count, sample_diffs: [] } : null) |
| 246 | + |
| 247 | + if (!aD && !nD) return { Match: { row_count: 0 } } |
| 248 | + if (!aD) return next |
| 249 | + if (!nD) return accumulated |
| 250 | + |
| 251 | + const merged = { |
| 252 | + total_source_rows: (aD.total_source_rows ?? 0) + (nD.total_source_rows ?? 0), |
| 253 | + total_target_rows: (aD.total_target_rows ?? 0) + (nD.total_target_rows ?? 0), |
| 254 | + rows_only_in_source: (aD.rows_only_in_source ?? 0) + (nD.rows_only_in_source ?? 0), |
| 255 | + rows_only_in_target: (aD.rows_only_in_target ?? 0) + (nD.rows_only_in_target ?? 0), |
| 256 | + rows_updated: (aD.rows_updated ?? 0) + (nD.rows_updated ?? 0), |
| 257 | + rows_identical: (aD.rows_identical ?? 0) + (nD.rows_identical ?? 0), |
| 258 | + sample_diffs: [...(aD.sample_diffs ?? []), ...(nD.sample_diffs ?? [])].slice(0, 20), |
| 259 | + } |
| 260 | + |
| 261 | + const totalDiff = merged.rows_only_in_source + merged.rows_only_in_target + merged.rows_updated |
| 262 | + if (totalDiff === 0) { |
| 263 | + return { Match: { row_count: merged.total_source_rows, algorithm: "partitioned" } } |
| 264 | + } |
| 265 | + return { Diff: merged } |
| 266 | +} |
| 267 | + |
| 268 | +/** |
| 269 | + * Run a partitioned diff: discover partition values, diff each partition independently, |
| 270 | + * then aggregate results. |
| 271 | + */ |
| 272 | +async function runPartitionedDiff(params: DataDiffParams): Promise<DataDiffResult> { |
| 273 | + const resolveDialect = (warehouse: string | undefined): string => { |
| 274 | + if (warehouse) { |
| 275 | + const cfg = Registry.getConfig(warehouse) |
| 276 | + return cfg?.type ?? "generic" |
| 277 | + } |
| 278 | + const warehouses = Registry.list().warehouses |
| 279 | + return warehouses[0]?.type ?? "generic" |
| 280 | + } |
| 281 | + |
| 282 | + const sourceDialect = resolveDialect(params.source_warehouse) |
| 283 | + const { table1Name } = resolveTableSources(params.source, params.target) |
| 284 | + |
| 285 | + // Discover partition values from source |
| 286 | + const discoverySql = buildPartitionDiscoverySQL( |
| 287 | + table1Name, |
| 288 | + params.partition_column!, |
| 289 | + params.partition_granularity, |
| 290 | + params.partition_bucket_size, |
| 291 | + sourceDialect, |
| 292 | + params.where_clause, |
| 293 | + ) |
| 294 | + |
| 295 | + let partitionValues: string[] |
| 296 | + try { |
| 297 | + const rows = await executeQuery(discoverySql, params.source_warehouse) |
| 298 | + partitionValues = rows.map((r) => String(r[0] ?? "")).filter(Boolean) |
| 299 | + } catch (e) { |
| 300 | + return { success: false, error: `Partition discovery failed: ${e}`, steps: 0 } |
| 301 | + } |
| 302 | + |
| 303 | + if (partitionValues.length === 0) { |
| 304 | + return { success: true, steps: 1, outcome: { Match: { row_count: 0, algorithm: "partitioned" } }, partition_results: [] } |
| 305 | + } |
| 306 | + |
| 307 | + // Diff each partition |
| 308 | + const partitionResults: PartitionDiffResult[] = [] |
| 309 | + let aggregatedOutcome: unknown = null |
| 310 | + let totalSteps = 1 |
| 311 | + |
| 312 | + for (const pVal of partitionValues) { |
| 313 | + const partWhere = buildPartitionWhereClause( |
| 314 | + params.partition_column!, |
| 315 | + pVal, |
| 316 | + params.partition_granularity, |
| 317 | + params.partition_bucket_size, |
| 318 | + sourceDialect, |
| 319 | + ) |
| 320 | + const fullWhere = params.where_clause ? `(${params.where_clause}) AND (${partWhere})` : partWhere |
| 321 | + |
| 322 | + const result = await runDataDiff({ |
| 323 | + ...params, |
| 324 | + where_clause: fullWhere, |
| 325 | + partition_column: undefined, // prevent recursion |
| 326 | + }) |
| 327 | + |
| 328 | + totalSteps += result.steps |
| 329 | + |
| 330 | + if (!result.success) { |
| 331 | + partitionResults.push({ partition: pVal, rows_source: 0, rows_target: 0, differences: 0, status: "error", error: result.error }) |
| 332 | + continue |
| 333 | + } |
| 334 | + |
| 335 | + const stats = extractStats(result.outcome) |
| 336 | + partitionResults.push({ partition: pVal, ...stats }) |
| 337 | + aggregatedOutcome = aggregatedOutcome == null ? result.outcome : mergeOutcomes(aggregatedOutcome, result.outcome) |
| 338 | + } |
| 339 | + |
| 340 | + return { |
| 341 | + success: true, |
| 342 | + steps: totalSteps, |
| 343 | + outcome: aggregatedOutcome ?? { Match: { row_count: 0, algorithm: "partitioned" } }, |
| 344 | + partition_results: partitionResults, |
| 345 | + } |
| 346 | +} |
| 347 | + |
122 | 348 | export async function runDataDiff(params: DataDiffParams): Promise<DataDiffResult> { |
| 349 | + // Dispatch to partitioned diff if partition_column is set |
| 350 | + if (params.partition_column) { |
| 351 | + return runPartitionedDiff(params) |
| 352 | + } |
| 353 | + |
123 | 354 | // Dynamically import NAPI module (not available in test environments without the binary) |
124 | 355 | let DataParitySession: new (specJson: string) => { |
125 | 356 | start(): string |
|
0 commit comments