Skip to content

Commit 822a92c

Browse files
authored
Merge branch 'main' into feat/databricks-provider
2 parents 4fd436c + ba9882f commit 822a92c

5 files changed

Lines changed: 329 additions & 1 deletion

File tree

packages/opencode/src/altimate/telemetry/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,28 @@ export namespace Telemetry {
637637
total_cost: number
638638
}
639639
// altimate_change end
640+
// altimate_change start — pre-execution SQL validation telemetry
641+
| {
642+
type: "sql_pre_validation"
643+
timestamp: number
644+
session_id: string
645+
/** skipped = no cache or stale, passed = valid SQL, blocked = invalid SQL caught, error = validation itself failed */
646+
outcome: "skipped" | "passed" | "blocked" | "error"
647+
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, dispatcher_failed, validation_exception */
648+
reason: string
649+
/** warehouse driver type (postgres, snowflake, bigquery, ...) — enables per-warehouse catch-rate analysis */
650+
warehouse_type: string
651+
/** read / write / unknown — enables per-query-type analysis */
652+
query_type: string
653+
/** SHA-256 prefix of masked SQL — join key to sql_execute_failure events for same query */
654+
masked_sql_hash: string
655+
schema_columns: number
656+
/** true when schema scan hit the column-scan cap — flags samples biased by large-warehouse truncation */
657+
schema_truncated: boolean
658+
duration_ms: number
659+
error_message?: string
660+
}
661+
// altimate_change end
640662

641663
/** SHA256 hash a masked error message for anonymous grouping. */
642664
export function hashError(maskedMessage: string): string {

packages/opencode/src/altimate/tools/sql-execute.ts

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import { Telemetry } from "../telemetry"
99
// altimate_change start — progressive disclosure suggestions
1010
import { PostConnectSuggestions } from "./post-connect-suggestions"
1111
// altimate_change end
12+
// altimate_change start — pre-execution SQL validation via cached schema
13+
import { getCache } from "../native/schema/cache"
14+
import * as Registry from "../native/connections/registry"
15+
// altimate_change end
1216

1317
export const SqlExecuteTool = Tool.define("sql_execute", {
1418
description: "Execute SQL against a connected data warehouse. Returns results as a formatted table.",
@@ -34,6 +38,14 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
3438
}
3539
// altimate_change end
3640

41+
// altimate_change start — shadow-mode pre-execution SQL validation
42+
// Runs validation against cached schema and emits sql_pre_validation telemetry,
43+
// but does NOT block execution. Used to measure catch rate before deciding
44+
// whether to enable blocking in a future release. Fire-and-forget so it
45+
// doesn't add latency to the sql_execute hot path.
46+
preValidateSql(args.query, args.warehouse, queryType).catch(() => {})
47+
// altimate_change end
48+
3749
try {
3850
const result = await Dispatcher.call("sql.execute", {
3951
sql: args.query,
@@ -91,6 +103,184 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
91103
},
92104
})
93105

106+
// altimate_change start — pre-execution SQL validation via cached schema
107+
const CACHE_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours
108+
// High ceiling so large warehouses aren't arbitrarily truncated; we emit
109+
// schema_truncated in telemetry when the cap is reached so the shadow sample
110+
// can be interpreted correctly.
111+
const COLUMN_SCAN_LIMIT = 500_000
112+
113+
interface PreValidationResult {
114+
blocked: boolean
115+
error?: string
116+
}
117+
118+
async function preValidateSql(sql: string, warehouse: string | undefined, queryType: string): Promise<PreValidationResult> {
119+
const startTime = Date.now()
120+
// Yield the event loop before heavy synchronous SQLite work so concurrent
121+
// tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch
122+
// hundreds of thousands of rows for large warehouses.
123+
await new Promise<void>((resolve) => setImmediate(resolve))
124+
125+
// Precompute correlation fields used in every telemetry event this function emits.
126+
const maskedSqlHash = Telemetry.hashError(Telemetry.maskString(sql))
127+
128+
try {
129+
// Resolve the warehouse the same way sql.execute's fallback path does:
130+
// when caller omits `warehouse`, sql.execute uses Registry.list()[0].
131+
// Matching that here keeps the shadow validation aligned with actual
132+
// execution (dbt-routed queries are a known gap — they short-circuit
133+
// before this fallback, so validation may use a different warehouse
134+
// than the one dbt selects).
135+
const registered = Registry.list().warehouses
136+
let warehouseName = warehouse
137+
if (!warehouseName) {
138+
warehouseName = registered[0]?.name
139+
}
140+
const warehouseInfo = registered.find((w) => w.name === warehouseName)
141+
const warehouseType = warehouseInfo?.type ?? "unknown"
142+
143+
const ctx: TrackCtx = {
144+
warehouse_type: warehouseType,
145+
query_type: queryType,
146+
masked_sql_hash: maskedSqlHash,
147+
}
148+
149+
if (!warehouseName) {
150+
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
151+
return { blocked: false }
152+
}
153+
154+
const cache = await getCache()
155+
const status = cache.cacheStatus()
156+
157+
const warehouseStatus = status.warehouses.find((w) => w.name === warehouseName)
158+
if (!warehouseStatus?.last_indexed) {
159+
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
160+
return { blocked: false }
161+
}
162+
163+
// Check cache freshness
164+
const cacheAge = Date.now() - new Date(warehouseStatus.last_indexed).getTime()
165+
if (cacheAge > CACHE_TTL_MS) {
166+
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false, ctx)
167+
return { blocked: false }
168+
}
169+
170+
// Build schema context from cached columns
171+
const columns = cache.listColumns(warehouseName, COLUMN_SCAN_LIMIT)
172+
const schemaTruncated = columns.length >= COLUMN_SCAN_LIMIT
173+
if (columns.length === 0) {
174+
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false, ctx)
175+
return { blocked: false }
176+
}
177+
178+
// Build schema context keyed by fully-qualified name (database.schema.table)
179+
// so multi-database warehouses don't collide on schema+table alone.
180+
// Dedupe columns per table to defend against residual collisions.
181+
const schemaContext: Record<string, { name: string; type: string; nullable: boolean }[]> = {}
182+
const seenColumns: Record<string, Set<string>> = {}
183+
for (const col of columns) {
184+
const tableName = [col.database, col.schema_name, col.table].filter(Boolean).join(".")
185+
if (!tableName) continue
186+
if (!schemaContext[tableName]) {
187+
schemaContext[tableName] = []
188+
seenColumns[tableName] = new Set()
189+
}
190+
if (seenColumns[tableName].has(col.name)) continue
191+
seenColumns[tableName].add(col.name)
192+
schemaContext[tableName].push({
193+
name: col.name,
194+
type: col.data_type || "VARCHAR",
195+
nullable: col.nullable,
196+
})
197+
}
198+
199+
// Validate SQL against cached schema
200+
const validationResult = await Dispatcher.call("altimate_core.validate", {
201+
sql,
202+
schema_path: "",
203+
schema_context: schemaContext,
204+
})
205+
206+
// If the dispatcher itself failed, don't treat missing data as "valid".
207+
if (!validationResult.success) {
208+
const errMsg = typeof validationResult.error === "string" ? validationResult.error : undefined
209+
trackPreValidation("error", "dispatcher_failed", 0, Date.now() - startTime, false, ctx, errMsg)
210+
return { blocked: false }
211+
}
212+
213+
const data = (validationResult.data ?? {}) as Record<string, any>
214+
const errors = Array.isArray(data.errors) ? data.errors : []
215+
const isValid = data.valid !== false && errors.length === 0
216+
217+
if (isValid) {
218+
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated, ctx)
219+
return { blocked: false }
220+
}
221+
222+
// Only block on high-confidence structural errors
223+
const structuralErrors = errors.filter((e: any) => {
224+
const msg = (e.message ?? "").toLowerCase()
225+
return /\b(column|table|view|relation|identifier|not found|does not exist)\b/.test(msg)
226+
})
227+
228+
if (structuralErrors.length === 0) {
229+
// Non-structural errors (ambiguous cases) — let them through
230+
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated, ctx)
231+
return { blocked: false }
232+
}
233+
234+
const errorMsgs = structuralErrors.map((e: any) => e.message).join("\n")
235+
trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, ctx, errorMsgs)
236+
// Shadow mode: caller discards the result. When blocking is enabled in the
237+
// future, build errorOutput here with the structural errors and
238+
// schemaContext keys for user-facing guidance.
239+
return { blocked: false }
240+
} catch {
241+
// Validation failure should never block execution
242+
const ctx: TrackCtx = { warehouse_type: "unknown", query_type: queryType, masked_sql_hash: maskedSqlHash }
243+
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false, ctx)
244+
return { blocked: false }
245+
}
246+
}
247+
248+
// Correlation fields shared by every telemetry event a single
// preValidateSql invocation emits.
interface TrackCtx {
  // Warehouse driver type (postgres, snowflake, ...); "unknown" when the
  // warehouse could not be resolved.
  warehouse_type: string
  // read / write / unknown — enables per-query-type analysis.
  query_type: string
  // SHA-256 prefix of the masked SQL — join key to sql_execute_failure
  // events for the same query.
  masked_sql_hash: string
}
253+
254+
function trackPreValidation(
255+
outcome: "skipped" | "passed" | "blocked" | "error",
256+
reason: string,
257+
schema_columns: number,
258+
duration_ms: number,
259+
schema_truncated: boolean,
260+
ctx: TrackCtx,
261+
error_message?: string,
262+
) {
263+
// Mask schema identifiers (table / column names, paths, user IDs) from the
264+
// validator error BEFORE it leaves the process — these are PII-adjacent and
265+
// must not land in App Insights as raw strings.
266+
const masked = error_message ? Telemetry.maskString(error_message).slice(0, 500) : undefined
267+
Telemetry.track({
268+
type: "sql_pre_validation",
269+
timestamp: Date.now(),
270+
session_id: Telemetry.getContext().sessionId,
271+
outcome,
272+
reason,
273+
warehouse_type: ctx.warehouse_type,
274+
query_type: ctx.query_type,
275+
masked_sql_hash: ctx.masked_sql_hash,
276+
schema_columns,
277+
schema_truncated,
278+
duration_ms,
279+
...(masked && { error_message: masked }),
280+
})
281+
}
282+
// altimate_change end
283+
94284
function formatResult(result: SqlExecuteResult): string {
95285
if (result.row_count === 0) return "(0 rows)"
96286

packages/opencode/test/altimate/connections.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,36 @@ describe("ConnectionRegistry", () => {
5151
)
5252
})
5353

54+
// Recognized-but-unshipped driver types should fail with actionable
// guidance rather than a bare "unsupported" message.
test("cassandra gives helpful hint instead of generic unsupported error", async () => {
  Registry.setConfigs({
    mydb: { type: "cassandra", host: "localhost" },
  })
  // The error must both say it's unsupported and point at the cqlsh CLI.
  await expect(Registry.get("mydb")).rejects.toThrow("not yet supported")
  await expect(Registry.get("mydb")).rejects.toThrow("cqlsh")
})

// cockroachdb is redirected to the postgres driver type in the error text.
test("cockroachdb suggests using postgres type", async () => {
  Registry.setConfigs({
    mydb: { type: "cockroachdb", host: "localhost" },
  })
  await expect(Registry.get("mydb")).rejects.toThrow("postgres")
})

// timescaledb gets the same postgres redirect.
test("timescaledb suggests using postgres type", async () => {
  Registry.setConfigs({
    mydb: { type: "timescaledb", host: "localhost" },
  })
  await expect(Registry.get("mydb")).rejects.toThrow("postgres")
})

// A type with no special-case hint falls back to the generic error, which
// must enumerate the supported types.
test("truly unknown type gives generic unsupported error with supported list", async () => {
  Registry.setConfigs({
    mydb: { type: "neo4j", host: "localhost" },
  })
  await expect(Registry.get("mydb")).rejects.toThrow("Unsupported database type")
  await expect(Registry.get("mydb")).rejects.toThrow("Supported:")
})
83+
5484
test("getConfig returns config for known connection", () => {
5585
Registry.setConfigs({
5686
mydb: { type: "postgres", host: "localhost" },
@@ -608,6 +638,44 @@ trino_project:
608638
fs.rmSync(tmpDir, { recursive: true })
609639
}
610640
})
641+
642+
test("clickhouse adapter maps correctly from dbt profiles", async () => {
  const fs = await import("fs")
  const os = await import("os")
  const path = await import("path")

  // Write a minimal dbt profiles.yml with a single clickhouse output into a
  // fresh temp dir so the test can't interact with a real ~/.dbt setup.
  // NOTE(review): YAML nesting below reconstructed with conventional 2-space
  // indentation — the original indentation was lost in extraction; confirm
  // against the source commit.
  const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "dbt-test-"))
  const profilesPath = path.join(tmpDir, "profiles.yml")

  fs.writeFileSync(
    profilesPath,
    `
ch_project:
  outputs:
    dev:
      type: clickhouse
      host: clickhouse.example.com
      port: 8443
      user: default
      password: secret
      database: analytics
      schema: default
`,
  )

  try {
    const connections = await parseDbtProfiles(profilesPath)
    // Exactly one connection, with the adapter type and each scalar config
    // field carried through unchanged.
    expect(connections).toHaveLength(1)
    expect(connections[0].type).toBe("clickhouse")
    expect(connections[0].config.type).toBe("clickhouse")
    expect(connections[0].config.host).toBe("clickhouse.example.com")
    expect(connections[0].config.port).toBe(8443)
    expect(connections[0].config.user).toBe("default")
    expect(connections[0].config.database).toBe("analytics")
  } finally {
    // Always remove the temp dir, even when assertions fail.
    fs.rmSync(tmpDir, { recursive: true })
  }
})
611679
})
612680

613681
// ---------------------------------------------------------------------------

packages/opencode/test/altimate/schema-finops-dbt.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,53 @@ describe("FinOps: SQL template generation", () => {
158158
const built = HistoryTemplates.buildHistoryQuery("databricks", 7, 50)
159159
expect(built?.sql).toContain("system.query.history")
160160
})
161+
162+
// ClickHouse query-history template: values are substituted as clamped
// integers directly into the SQL (no bind parameters).
test("builds ClickHouse history SQL with clamped integer days and limit", () => {
  const built = HistoryTemplates.buildHistoryQuery("clickhouse", 7, 100)
  expect(built).not.toBeNull()
  expect(built?.sql).toContain("system.query_log")
  expect(built?.sql).toContain("QueryFinish")
  // Days and limit should be integer-substituted, not bind params
  expect(built?.binds).toEqual([])
  // Verify the clamped values are in the SQL
  expect(built?.sql).toContain("today() - 7")
  expect(built?.sql).toContain("LIMIT 100")
})

test("ClickHouse buildHistoryQuery clamps extreme days and limit values", () => {
  // Days clamped to [1, 365]
  const extremeDays = HistoryTemplates.buildHistoryQuery("clickhouse", 9999, 50)
  expect(extremeDays?.sql).toContain("today() - 365")

  const zeroDays = HistoryTemplates.buildHistoryQuery("clickhouse", 0, 50)
  // Math.floor(0) || 30 = 30 (0 is falsy), then Math.max(1, Math.min(30, 365)) = 30
  expect(zeroDays?.sql).toContain("today() - 30")

  // Limit clamped to [1, 10000]
  const extremeLimit = HistoryTemplates.buildHistoryQuery("clickhouse", 7, 999999)
  expect(extremeLimit?.sql).toContain("LIMIT 10000")

  const zeroLimit = HistoryTemplates.buildHistoryQuery("clickhouse", 7, 0)
  // Math.floor(0) || 100 = 100 (0 is falsy), then Math.max(1, Math.min(100, 10000)) = 100
  expect(zeroLimit?.sql).toContain("LIMIT 100")
})

// Guard against SQL-injection-via-number: NaN and floats must never leak
// into the generated SQL text.
test("ClickHouse buildHistoryQuery handles NaN and float inputs safely", () => {
  // NaN days defaults to 30 via || 30 fallback
  const nanDays = HistoryTemplates.buildHistoryQuery("clickhouse", NaN, 50)
  expect(nanDays?.sql).toContain("today() - 30")
  expect(nanDays?.sql).not.toContain("NaN")

  // NaN limit defaults to 100 via || 100 fallback
  const nanLimit = HistoryTemplates.buildHistoryQuery("clickhouse", 7, NaN)
  expect(nanLimit?.sql).toContain("LIMIT 100")
  expect(nanLimit?.sql).not.toContain("NaN")

  // Float values should be floored
  const floatInputs = HistoryTemplates.buildHistoryQuery("clickhouse", 7.9, 50.5)
  expect(floatInputs?.sql).toContain("today() - 7")
  expect(floatInputs?.sql).toContain("LIMIT 50")
})
161208
})
162209

163210
describe("warehouse-advisor", () => {

packages/opencode/test/telemetry/telemetry.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,12 @@ const ALL_EVENT_TYPES: Telemetry.Event["type"][] = [
245245
"sql_execute_failure",
246246
"feature_suggestion",
247247
"core_failure",
248+
"sql_pre_validation",
248249
]
249250

250251
describe("telemetry.event-types", () => {
  // Count check so that adding an event type to the Telemetry.Event union
  // forces a matching update to ALL_EVENT_TYPES above (bumped 42 → 43 when
  // sql_pre_validation was added).
  test("all event types are valid", () => {
    expect(ALL_EVENT_TYPES.length).toBe(43)
  })
})
255256

0 commit comments

Comments
 (0)