Skip to content

Commit ba9882f

Browse files
authored
fix: sql_pre_validation telemetry review followups (#651)
* fix: address consensus review findings for sql_pre_validation

  Applies fixes from multi-model code review (Claude + GPT 5.4 + Gemini 3.1 Pro).

  Critical bugs (would corrupt shadow telemetry):
  - C1: validationResult.success was ignored — dispatcher failures with empty data evaluated as isValid=true, recording engine crashes as 'passed/valid'. Now checks .success first and emits outcome='error', reason='dispatcher_failed' on failure.
  - C2: database name was dropped from schema keys — multi-database warehouses collapsed DB1.PUBLIC.USERS and DB2.PUBLIC.USERS into one entry, and qualified queries got false-positive 'blocked' events. Keys now include database.schema.table; columns deduped per table to defend against residual collisions.

  Major fixes:
  - M1 (event-loop blocking): preValidateSql now yields via setImmediate before the synchronous bun:sqlite scan, so concurrent work isn't blocked while listColumns runs on large warehouses.
  - M2 (correlation fields): sql_pre_validation event now carries warehouse_type, query_type, and masked_sql_hash so shadow outcomes can be joined to sql_execute_failure events during analysis.
  - m5 (stale test list): sql_pre_validation added to ALL_EVENT_TYPES completeness check in telemetry.test.ts; count bumped 42 → 43.

  Minor fixes:
  - m3 (fragile matching): widened structural-error regex to include relation / identifier variants with word boundaries.
  - m4 (dead code): removed errorOutput construction — caller discards the result in shadow mode. Comment documents where to rebuild it when blocking mode is enabled.

* fix: include 'view' in structural-error matching

  Follow-up from late Kimi K2.5 review: warehouses report view-not-found errors with 'view' keyword (e.g., 'View X does not exist'), which the regex previously missed — recorded as non_structural instead of blocked.
1 parent f59ef40 commit ba9882f

3 files changed

Lines changed: 70 additions & 34 deletions

File tree

packages/opencode/src/altimate/telemetry/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,8 +644,14 @@ export namespace Telemetry {
644644
session_id: string
645645
/** skipped = no cache or stale, passed = valid SQL, blocked = invalid SQL caught, error = validation itself failed */
646646
outcome: "skipped" | "passed" | "blocked" | "error"
647-
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, validation_exception */
647+
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, dispatcher_failed, validation_exception */
648648
reason: string
649+
/** warehouse driver type (postgres, snowflake, bigquery, ...) — enables per-warehouse catch-rate analysis */
650+
warehouse_type: string
651+
/** read / write / unknown — enables per-query-type analysis */
652+
query_type: string
653+
/** SHA-256 prefix of masked SQL — join key to sql_execute_failure events for same query */
654+
masked_sql_hash: string
649655
schema_columns: number
650656
/** true when schema scan hit the column-scan cap — flags samples biased by large-warehouse truncation */
651657
schema_truncated: boolean

packages/opencode/src/altimate/tools/sql-execute.ts

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
4343
// but does NOT block execution. Used to measure catch rate before deciding
4444
// whether to enable blocking in a future release. Fire-and-forget so it
4545
// doesn't add latency to the sql_execute hot path.
46-
preValidateSql(args.query, args.warehouse).catch(() => {})
46+
preValidateSql(args.query, args.warehouse, queryType).catch(() => {})
4747
// altimate_change end
4848

4949
try {
@@ -115,22 +115,39 @@ interface PreValidationResult {
115115
error?: string
116116
}
117117

118-
async function preValidateSql(sql: string, warehouse?: string): Promise<PreValidationResult> {
118+
async function preValidateSql(sql: string, warehouse: string | undefined, queryType: string): Promise<PreValidationResult> {
119119
const startTime = Date.now()
120+
// Yield the event loop before heavy synchronous SQLite work so concurrent
121+
// tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch
122+
// hundreds of thousands of rows for large warehouses.
123+
await new Promise<void>((resolve) => setImmediate(resolve))
124+
125+
// Precompute correlation fields used in every telemetry event this function emits.
126+
const maskedSqlHash = Telemetry.hashError(Telemetry.maskString(sql))
127+
120128
try {
121129
// Resolve the warehouse the same way sql.execute's fallback path does:
122130
// when caller omits `warehouse`, sql.execute uses Registry.list()[0].
123131
// Matching that here keeps the shadow validation aligned with actual
124132
// execution (dbt-routed queries are a known gap — they short-circuit
125133
// before this fallback, so validation may use a different warehouse
126134
// than the one dbt selects).
135+
const registered = Registry.list().warehouses
127136
let warehouseName = warehouse
128137
if (!warehouseName) {
129-
const registered = Registry.list().warehouses
130138
warehouseName = registered[0]?.name
131139
}
140+
const warehouseInfo = registered.find((w) => w.name === warehouseName)
141+
const warehouseType = warehouseInfo?.type ?? "unknown"
142+
143+
const ctx: TrackCtx = {
144+
warehouse_type: warehouseType,
145+
query_type: queryType,
146+
masked_sql_hash: maskedSqlHash,
147+
}
148+
132149
if (!warehouseName) {
133-
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
150+
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
134151
return { blocked: false }
135152
}
136153

@@ -139,31 +156,39 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
139156

140157
const warehouseStatus = status.warehouses.find((w) => w.name === warehouseName)
141158
if (!warehouseStatus?.last_indexed) {
142-
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
159+
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
143160
return { blocked: false }
144161
}
145162

146163
// Check cache freshness
147164
const cacheAge = Date.now() - new Date(warehouseStatus.last_indexed).getTime()
148165
if (cacheAge > CACHE_TTL_MS) {
149-
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false)
166+
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false, ctx)
150167
return { blocked: false }
151168
}
152169

153170
// Build schema context from cached columns
154171
const columns = cache.listColumns(warehouseName, COLUMN_SCAN_LIMIT)
155172
const schemaTruncated = columns.length >= COLUMN_SCAN_LIMIT
156173
if (columns.length === 0) {
157-
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false)
174+
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false, ctx)
158175
return { blocked: false }
159176
}
160177

161-
const schemaContext: Record<string, any> = {}
178+
// Build schema context keyed by fully-qualified name (database.schema.table)
179+
// so multi-database warehouses don't collide on schema+table alone.
180+
// Dedupe columns per table to defend against residual collisions.
181+
const schemaContext: Record<string, { name: string; type: string; nullable: boolean }[]> = {}
182+
const seenColumns: Record<string, Set<string>> = {}
162183
for (const col of columns) {
163-
const tableName = col.schema_name ? `${col.schema_name}.${col.table}` : col.table
184+
const tableName = [col.database, col.schema_name, col.table].filter(Boolean).join(".")
185+
if (!tableName) continue
164186
if (!schemaContext[tableName]) {
165187
schemaContext[tableName] = []
188+
seenColumns[tableName] = new Set()
166189
}
190+
if (seenColumns[tableName].has(col.name)) continue
191+
seenColumns[tableName].add(col.name)
167192
schemaContext[tableName].push({
168193
name: col.name,
169194
type: col.data_type || "VARCHAR",
@@ -178,60 +203,61 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
178203
schema_context: schemaContext,
179204
})
180205

206+
// If the dispatcher itself failed, don't treat missing data as "valid".
207+
if (!validationResult.success) {
208+
const errMsg = typeof validationResult.error === "string" ? validationResult.error : undefined
209+
trackPreValidation("error", "dispatcher_failed", 0, Date.now() - startTime, false, ctx, errMsg)
210+
return { blocked: false }
211+
}
212+
181213
const data = (validationResult.data ?? {}) as Record<string, any>
182214
const errors = Array.isArray(data.errors) ? data.errors : []
183215
const isValid = data.valid !== false && errors.length === 0
184216

185217
if (isValid) {
186-
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated)
218+
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated, ctx)
187219
return { blocked: false }
188220
}
189221

190222
// Only block on high-confidence structural errors
191223
const structuralErrors = errors.filter((e: any) => {
192224
const msg = (e.message ?? "").toLowerCase()
193-
return msg.includes("column") || msg.includes("table") || msg.includes("not found") || msg.includes("does not exist")
225+
return /\b(column|table|view|relation|identifier|not found|does not exist)\b/.test(msg)
194226
})
195227

196228
if (structuralErrors.length === 0) {
197229
// Non-structural errors (ambiguous cases) — let them through
198-
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated)
230+
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated, ctx)
199231
return { blocked: false }
200232
}
201233

202-
// Build helpful error with available columns
203234
const errorMsgs = structuralErrors.map((e: any) => e.message).join("\n")
204-
const referencedTables = Object.keys(schemaContext).slice(0, 10)
205-
const availableColumns = referencedTables
206-
.map((t) => `${t}: ${schemaContext[t].map((c: any) => c.name).join(", ")}`)
207-
.join("\n")
208-
209-
const errorOutput = [
210-
`Pre-execution validation failed (validated against cached schema):`,
211-
``,
212-
errorMsgs,
213-
``,
214-
`Available tables and columns:`,
215-
availableColumns,
216-
``,
217-
`Fix the query and retry. If the schema cache is outdated, run schema_index to refresh it.`,
218-
].join("\n")
219-
220-
trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, errorMsgs)
221-
return { blocked: true, error: errorOutput }
235+
trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, ctx, errorMsgs)
236+
// Shadow mode: caller discards the result. When blocking is enabled in the
237+
// future, build errorOutput here with the structural errors and
238+
// schemaContext keys for user-facing guidance.
239+
return { blocked: false }
222240
} catch {
223241
// Validation failure should never block execution
224-
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false)
242+
const ctx: TrackCtx = { warehouse_type: "unknown", query_type: queryType, masked_sql_hash: maskedSqlHash }
243+
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false, ctx)
225244
return { blocked: false }
226245
}
227246
}
228247

248+
interface TrackCtx {
249+
warehouse_type: string
250+
query_type: string
251+
masked_sql_hash: string
252+
}
253+
229254
function trackPreValidation(
230255
outcome: "skipped" | "passed" | "blocked" | "error",
231256
reason: string,
232257
schema_columns: number,
233258
duration_ms: number,
234259
schema_truncated: boolean,
260+
ctx: TrackCtx,
235261
error_message?: string,
236262
) {
237263
// Mask schema identifiers (table / column names, paths, user IDs) from the
@@ -244,6 +270,9 @@ function trackPreValidation(
244270
session_id: Telemetry.getContext().sessionId,
245271
outcome,
246272
reason,
273+
warehouse_type: ctx.warehouse_type,
274+
query_type: ctx.query_type,
275+
masked_sql_hash: ctx.masked_sql_hash,
247276
schema_columns,
248277
schema_truncated,
249278
duration_ms,

packages/opencode/test/telemetry/telemetry.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,12 @@ const ALL_EVENT_TYPES: Telemetry.Event["type"][] = [
245245
"sql_execute_failure",
246246
"feature_suggestion",
247247
"core_failure",
248+
"sql_pre_validation",
248249
]
249250

250251
describe("telemetry.event-types", () => {
251252
test("all event types are valid", () => {
252-
expect(ALL_EVENT_TYPES.length).toBe(42)
253+
expect(ALL_EVENT_TYPES.length).toBe(43)
253254
})
254255
})
255256

0 commit comments

Comments
 (0)