-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathaltimate-core-column-lineage.ts
More file actions
61 lines (56 loc) · 2.66 KB
/
altimate-core-column-lineage.ts
File metadata and controls
61 lines (56 loc) · 2.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import z from "zod"
import { Tool } from "../../tool/tool"
import { Dispatcher } from "../native"
export const AltimateCoreColumnLineageTool = Tool.define("altimate_core_column_lineage", {
description:
"Trace schema-aware column lineage. Maps how columns flow through a query from source tables to output. Requires altimate_core.init() with API key. Provide schema_context or schema_path for accurate table/column resolution.",
parameters: z.object({
sql: z.string().describe("SQL query to trace lineage for"),
dialect: z.string().optional().describe("SQL dialect (e.g. snowflake, bigquery)"),
schema_path: z.string().optional().describe("Path to YAML/JSON schema file"),
schema_context: z.record(z.string(), z.any()).optional().describe("Inline schema definition"),
}),
async execute(args, ctx) {
try {
const result = await Dispatcher.call("altimate_core.column_lineage", {
sql: args.sql,
dialect: args.dialect ?? "",
schema_path: args.schema_path ?? "",
schema_context: args.schema_context,
})
const data = (result.data ?? {}) as Record<string, any>
const edgeCount = data.column_lineage?.length ?? 0
const error = result.error ?? data.error
return {
title: `Column Lineage: ${edgeCount} edge(s)`,
metadata: { success: result.success, edge_count: edgeCount, ...(error && { error }) },
output: formatColumnLineage(data),
}
} catch (e) {
const msg = e instanceof Error ? e.message : String(e)
return { title: "Column Lineage: ERROR", metadata: { success: false, edge_count: 0, error: msg }, output: `Failed: ${msg}` }
}
},
})
function formatColumnLineage(data: Record<string, any>): string {
if (data.error) return `Error: ${data.error}`
if (!data.column_lineage?.length && !data.column_dict) return "No column lineage edges found."
const lines: string[] = []
// column_dict: output columns -> source columns mapping
if (data.column_dict && Object.keys(data.column_dict).length > 0) {
lines.push("Column Mappings:")
for (const [target, sources] of Object.entries(data.column_dict)) {
const srcList = Array.isArray(sources) ? (sources as string[]).join(", ") : JSON.stringify(sources)
lines.push(` ${target} ← ${srcList}`)
}
lines.push("")
}
if (data.column_lineage?.length) {
lines.push("Lineage Edges:")
for (const edge of data.column_lineage) {
const transform = edge.lens_type ?? edge.transform_type ?? edge.transform ?? ""
lines.push(` ${edge.source} → ${edge.target}${transform ? ` (${transform})` : ""}`)
}
}
return lines.length ? lines.join("\n") : "No column lineage edges found."
}