Skip to content

Commit e6ae16e

Browse files
committed
feat(tsql,webapp): reject cross-queue merges of per-queue counter states
deltaSumTimestamp states are kept per queue, and merging them across queues silently returns wrong totals, on the dashboard and the public query API alike. Columns can now declare a mergeGroupKey, and the compiler rejects queries that merge such a column without grouping by that key or pinning it to a single value. The error names the column, explains the failure, and includes a corrected example query.
1 parent 9750ceb commit e6ae16e

4 files changed

Lines changed: 235 additions & 0 deletions

File tree

apps/webapp/app/v3/querySchemas.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,7 @@ export const queueMetricsSchema: TableSchema = {
668668
// invalid (mixes unrelated odometers): totals must GROUP BY queue, then sum outside.
669669
enqueue_delta: {
670670
name: "enqueue_delta",
671+
mergeGroupKey: "queue",
671672
...column("String", {
672673
description:
673674
"Runs enqueued (cumulative-counter delta). Read with deltaSumTimestampMerge(enqueue_delta) grouped by queue. For totals across queues, sum the per-queue results in an outer query, never merge across queues.",
@@ -678,6 +679,7 @@ export const queueMetricsSchema: TableSchema = {
678679
},
679680
started_delta: {
680681
name: "started_delta",
682+
mergeGroupKey: "queue",
681683
...column("String", {
682684
description:
683685
"Runs dequeued/started (throughput). Read with deltaSumTimestampMerge(started_delta) grouped by queue. For totals across queues, sum the per-queue results in an outer query, never merge across queues.",
@@ -689,6 +691,7 @@ export const queueMetricsSchema: TableSchema = {
689691
},
690692
ack_delta: {
691693
name: "ack_delta",
694+
mergeGroupKey: "queue",
692695
...column("String", {
693696
description:
694697
"Runs acked (completed). Read with deltaSumTimestampMerge(ack_delta) grouped by queue; sum per-queue results for totals.",
@@ -699,6 +702,7 @@ export const queueMetricsSchema: TableSchema = {
699702
},
700703
nack_delta: {
701704
name: "nack_delta",
705+
mergeGroupKey: "queue",
702706
...column("String", {
703707
description:
704708
"Runs nacked. Read with deltaSumTimestampMerge(nack_delta) grouped by queue; sum per-queue results for totals.",
@@ -709,6 +713,7 @@ export const queueMetricsSchema: TableSchema = {
709713
},
710714
dlq_delta: {
711715
name: "dlq_delta",
716+
mergeGroupKey: "queue",
712717
...column("String", {
713718
description:
714719
"Runs dead-lettered. Read with deltaSumTimestampMerge(dlq_delta) grouped by queue; sum per-queue results for totals.",

internal-packages/tsql/src/query/printer.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3995,6 +3995,7 @@ describe("cross-queue counter totals via subquery (env-wide throughput shape)",
39953995
queue_name: { name: "queue_name", ...column("String") },
39963996
started_delta: {
39973997
name: "started_delta",
3998+
mergeGroupKey: "queue_name",
39983999
...column("String"),
39994000
groupable: false,
40004001
sortable: false,
@@ -4047,3 +4048,96 @@ describe("cross-queue counter totals via subquery (env-wide throughput shape)",
40474048
expect(Object.values(params)).toContain("org_test123");
40484049
});
40494050
});
4051+
4052+
describe("mergeGroupKey validation", () => {
  // Text every accepted query is expected to contain in the printed SQL.
  const MERGE_CALL = "deltaSumTimestampMerge(started_delta)";
  // Fragment of the compiler's rejection message shared by the shorter rejection tests.
  const SCOPE_ERROR = /only combine correctly within one queue/;

  // `started_delta` declares `queue` as its merge key; `queue` itself is stored under
  // the ClickHouse column name `queue_name`.
  const metricsSchema: TableSchema = {
    name: "metrics",
    clickhouseName: "trigger_dev.queue_metrics_v1",
    timeConstraint: "bucket_at",
    columns: {
      bucket_at: { name: "bucket_at", ...column("DateTime64") },
      queue: { name: "queue", clickhouseName: "queue_name", ...column("String") },
      started_delta: {
        name: "started_delta",
        mergeGroupKey: "queue",
        ...column("String"),
        groupable: false,
        sortable: false,
        filterable: false,
      },
      organization_id: { name: "organization_id", ...column("String") },
    },
    tenantColumns: { organizationId: "organization_id" },
  };

  // Default enforced clause: tenant filter only, built fresh for every call.
  const tenantOnly = (): Record<string, unknown> => ({
    organization_id: { op: "eq", value: "org_x" },
  });

  // Compiles a TSQL query against `metricsSchema` over a fixed one-week time range.
  const compile = (query: string, enforced: Record<string, unknown> = tenantOnly()) => {
    const context = createPrinterContext({
      schema: createSchemaRegistry([metricsSchema]),
      enforcedWhereClause: enforced as never,
      timeRange: {
        from: new Date("2024-01-01T00:00:00Z"),
        to: new Date("2024-01-08T00:00:00Z"),
      },
    });
    const ast = parseTSQLSelect(query);
    return printToClickHouse(ast, context);
  };

  // Asserts that the query compiles and that the printed SQL contains `fragment`.
  const expectPrinted = (
    query: string,
    fragment: string,
    enforced?: Record<string, unknown>
  ): void => {
    const { sql } = compile(query, enforced);
    expect(sql).toContain(fragment);
  };

  // Asserts that compiling the query throws an error matching `pattern`.
  const expectRejected = (
    query: string,
    pattern: RegExp,
    enforced?: Record<string, unknown>
  ): void => {
    expect(() => compile(query, enforced)).toThrowError(pattern);
  };

  it("rejects an ungrouped, unpinned merge with an actionable message", () => {
    expectRejected(
      "SELECT timeBucket() AS t, deltaSumTimestampMerge(started_delta) AS started FROM metrics GROUP BY t",
      /Merging 'started_delta' across every queue[\s\S]*GROUP BY queue\)[\s\S]*WHERE queue = 'my-queue'[\s\S]*inner GROUP BY t, queue and outer GROUP BY t/
    );
  });

  it("allows the merge when queue is in the GROUP BY", () => {
    expectPrinted(
      "SELECT timeBucket() AS t, queue, deltaSumTimestampMerge(started_delta) AS started FROM metrics GROUP BY t, queue",
      MERGE_CALL
    );
  });

  it("allows the merge when queue is pinned by an equality filter", () => {
    expectPrinted(
      "SELECT deltaSumTimestampMerge(started_delta) AS started FROM metrics WHERE queue = 'emails'",
      MERGE_CALL
    );
  });

  it("allows the merge when the enforced clause pins queue to one value", () => {
    expectPrinted(
      "SELECT deltaSumTimestampMerge(started_delta) AS started FROM metrics",
      MERGE_CALL,
      { organization_id: { op: "eq", value: "org_x" }, queue: { op: "in", values: ["emails"] } }
    );
  });

  it("rejects the merge when the enforced clause spans several queues", () => {
    expectRejected(
      "SELECT deltaSumTimestampMerge(started_delta) AS started FROM metrics",
      SCOPE_ERROR,
      {
        organization_id: { op: "eq", value: "org_x" },
        queue: { op: "in", values: ["emails", "webhooks"] },
      }
    );
  });

  it("allows a grouped inner merge summed by the outer query", () => {
    // The inner GROUP BY is printed with the key's ClickHouse column name.
    expectPrinted(
      "SELECT t, sum(started) AS started FROM (SELECT timeBucket() AS t, queue, deltaSumTimestampMerge(started_delta) AS started FROM metrics GROUP BY t, queue) GROUP BY t ORDER BY t",
      "GROUP BY t, queue_name"
    );
  });

  it("rejects an ungrouped merge inside a subquery", () => {
    expectRejected(
      "SELECT t, sum(started) AS started FROM (SELECT timeBucket() AS t, deltaSumTimestampMerge(started_delta) AS started FROM metrics GROUP BY t) GROUP BY t",
      SCOPE_ERROR
    );
  });
});

internal-packages/tsql/src/query/printer.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,8 @@ export class ClickHousePrinter {
385385
nextJoin = nextJoin.next_join;
386386
}
387387

388+
this.validateMergeScopedColumns(node);
389+
388390
// Extract SELECT column aliases BEFORE visiting columns
389391
// This allows ORDER BY/HAVING to reference aliased columns
390392
const savedAliases = this.selectAliases;
@@ -1883,6 +1885,134 @@ export class ClickHousePrinter {
18831885
// Note: projectId and environmentId are optional - no validation needed
18841886
}
18851887

1888+
/**
 * Guards against merging a `mergeGroupKey` aggregate-state column across different
 * values of its key column, which silently returns wrong numbers.
 *
 * For every table schema in `tableContexts`, each column that declares a
 * `mergeGroupKey` and is referenced in this SELECT scope must satisfy one of:
 * the key appears in the scope's GROUP BY, the scope's WHERE pins the key, or the
 * enforced clause pins the key. Runs per SELECT scope; subqueries validate themselves.
 *
 * @param node - The SELECT scope to validate.
 * @throws QueryError naming the offending column and showing corrected query shapes.
 */
private validateMergeScopedColumns(node: SelectQuery): void {
  for (const tableSchema of this.tableContexts.values()) {
    for (const column of Object.values(tableSchema.columns)) {
      const key = column.mergeGroupKey;
      // Only keyed columns that this scope actually touches are subject to the rule.
      if (!key || !this.scopeReferencesColumn(node, column.name)) continue;

      const mergeStaysWithinOneKey =
        this.groupByIncludesColumn(node, key) ||
        this.wherePinsColumn(node.where, key) ||
        this.enforcedPinsColumn(tableSchema, key);
      if (mergeStaysWithinOneKey) continue;

      // The message is assembled sentence by sentence and joined with single spaces.
      const problem =
        `Merging '${column.name}' across every ${key} returns wrong totals: its aggregate ` +
        `states are kept per ${key} and only combine correctly within one ${key}.`;
      const groupedFix =
        `Either add '${key}' to the GROUP BY and sum the per-${key} results in an outer query, ` +
        `for example: SELECT sum(v) AS total FROM (SELECT ${key}, ` +
        `deltaSumTimestampMerge(${column.name}) AS v FROM ${tableSchema.name} ` +
        `GROUP BY ${key}).`;
      const pinnedFix = `Or filter to a single ${key}, for example: WHERE ${key} = 'my-${key}'.`;
      const timeSeriesHint =
        `For a time series, bucket both layers: ` +
        `inner GROUP BY t, ${key} and outer GROUP BY t.`;

      throw new QueryError([problem, groupedFix, pinnedFix, timeSeriesHint].join(" "));
    }
  }
}
1916+
1917+
/**
 * Reports whether any clause of this SELECT scope (select list, PREWHERE, WHERE,
 * GROUP BY, HAVING or ORDER BY) contains a field reference to `name`.
 */
private scopeReferencesColumn(node: SelectQuery, name: string): boolean {
  const { select, prewhere, where, group_by, having, order_by } = node;
  const clauses: unknown[] = [select, prewhere, where, group_by, having, order_by];
  for (const clause of clauses) {
    if (this.expressionReferencesColumn(clause, name)) return true;
  }
  return false;
}
1928+
1929+
/**
 * Depth-first search of an AST fragment for a `field` node whose last chain segment
 * equals `name`.
 *
 * @param expr - Any value reachable from the AST; non-objects never match.
 * @param name - Column name to look for.
 * @param seen - Objects already visited, so shared or cyclic references are walked once.
 * @returns true as soon as a matching field is found.
 */
private expressionReferencesColumn(
  expr: unknown,
  name: string,
  seen = new WeakSet<object>()
): boolean {
  if (typeof expr !== "object" || expr === null || seen.has(expr)) return false;
  seen.add(expr);

  if (Array.isArray(expr)) {
    for (const item of expr) {
      if (this.expressionReferencesColumn(item, name, seen)) return true;
    }
    return false;
  }

  const { expression_type: kind, chain } = expr as {
    expression_type?: string;
    chain?: unknown[];
  };

  // Nested queries are a separate scope: do not descend into them.
  if (kind === "select_query" || kind === "select_set_query") return false;

  if (kind === "field" && Array.isArray(chain) && chain[chain.length - 1] === name) {
    return true;
  }

  for (const [property, value] of Object.entries(expr)) {
    // The `type` and `parent` properties are never traversed.
    if (property === "type" || property === "parent") continue;
    if (this.expressionReferencesColumn(value, name, seen)) return true;
  }
  return false;
}
1961+
1962+
/**
 * Reports whether the scope's GROUP BY lists `name` directly as a field. Only bare
 * field entries are matched, by their last chain segment; a missing GROUP BY never matches.
 */
private groupByIncludesColumn(node: SelectQuery, name: string): boolean {
  for (const entry of node.group_by ?? []) {
    const { expression_type: kind, chain } = entry as Field;
    if (kind === "field" && Array.isArray(chain) && chain[chain.length - 1] === name) {
      return true;
    }
  }
  return false;
}
1972+
1973+
/**
 * Reports whether a WHERE expression restricts column `name` to a single value.
 *
 * Pins only count on the top-level AND chain: a pin inside an OR (or any other
 * expression) guarantees nothing. A conjunct is a pin when it is one of:
 *   - `name = <constant>`, in either operand order;
 *   - `name IN <constant>` or `name IN (<constant>)` with exactly one element
 *     (likewise GLOBAL IN), with the column on the left-hand side.
 *
 * Comparisons against a non-constant operand (`queue = other_column`, `queue = queue`)
 * are not pins: they leave the column free to take many values, so accepting them
 * would let a cross-key merge through.
 *
 * @param where - The WHERE expression, or a conjunct of it during recursion.
 * @param name - The key column that must be pinned.
 * @returns true when some top-level conjunct pins `name`.
 */
private wherePinsColumn(where: Expression | undefined, name: string): boolean {
  if (!where) return false;
  if (where.expression_type === "and") {
    return (where as And).exprs.some((expr) => this.wherePinsColumn(expr, name));
  }
  if (where.expression_type !== "compare_operation") return false;

  const cmp = where as CompareOperation;
  const isKeyField = (side: Expression): boolean => {
    const field = side as Field;
    return (
      field.expression_type === "field" &&
      Array.isArray(field.chain) &&
      field.chain[field.chain.length - 1] === name
    );
  };
  const isConstant = (side: Expression): boolean =>
    (side as Constant).expression_type === "constant";

  if (cmp.op === CompareOperationOp.Eq) {
    // Equality is symmetric, but the other operand must be a literal value.
    return (
      (isKeyField(cmp.left) && isConstant(cmp.right)) ||
      (isConstant(cmp.left) && isKeyField(cmp.right))
    );
  }

  if (cmp.op === CompareOperationOp.In || cmp.op === CompareOperationOp.GlobalIn) {
    // IN is not symmetric: only `name IN <values>` restricts the column.
    if (!isKeyField(cmp.left)) return false;
    if (isConstant(cmp.right)) return true;
    const tuple = cmp.right as Tuple;
    return (
      tuple.expression_type === "tuple" &&
      tuple.exprs.length === 1 &&
      tuple.exprs.every(isConstant)
    );
  }

  return false;
}
2000+
2001+
/**
 * Reports whether the enforced WHERE clause restricts `key` to a single value.
 *
 * The enforced clause is looked up under the column's name and, when the schema
 * declares one, under its `clickhouseName`. An entry pins the key when its `op` is
 * `"eq"`, or `"in"` with exactly one entry in `values`.
 */
private enforcedPinsColumn(tableSchema: TableSchema, key: string): boolean {
  const physicalName = tableSchema.columns[key]?.clickhouseName;
  const lookupNames = physicalName ? [key, physicalName] : [key];

  return lookupNames.some((name) => {
    const condition = this.context.enforcedWhereClause[name] as
      | { op?: string; values?: unknown[] }
      | undefined;
    if (!condition) return false;
    return condition.op === "eq" || (condition.op === "in" && condition.values?.length === 1);
  });
}
2015+
18862016
/**
18872017
* Format a Date as a ClickHouse-compatible DateTime64 string.
18882018
* ClickHouse expects format: 'YYYY-MM-DD HH:MM:SS.mmm' (in UTC)

internal-packages/tsql/src/query/schema.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ export interface ColumnSchema {
128128
* = counter (missing buckets get 0).
129129
*/
130130
fillMode?: "zero" | "carry";
131+
/**
132+
* Aggregate-state column whose states only merge correctly within one value of the
133+
* named column (e.g. per-queue counter states). Queries referencing it must GROUP BY
134+
* that column or pin it to a single value; other shapes fail to compile.
135+
*/
136+
mergeGroupKey?: string;
131137
/**
132138
* Example value for documentation purposes.
133139
*

0 commit comments

Comments
 (0)