Skip to content

Commit 46b7106

Browse files
authored
feat(tesseract): Support separate pre-aggregations for different multi-stage subqueries (cube-js#10684)
1 parent 02695af commit 46b7106

27 files changed

Lines changed: 984 additions & 134 deletions

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,57 @@ export class PreAggregationPartitionRangeLoader {
297297
.map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`)
298298
.join(' UNION ALL ');
299299

300+
const baseTargetTableName = allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`;
301+
302+
// Build per-usage target table names if usageMapping is present
303+
let usageTargetTableNames: Record<string, string> | undefined;
304+
if (this.preAggregation.usageMapping) {
305+
usageTargetTableNames = {};
306+
for (const [suffix, usageInfo] of Object.entries(this.preAggregation.usageMapping)) {
307+
if (usageInfo.dateRange && this.preAggregation.partitionGranularity) {
308+
// Load partition ranges specific to this usage's dateRange.
309+
// Use partitionRange (generated locally via timeSeries, always in DEFAULT_TS_FORMAT)
310+
// instead of buildRangeEnd (from DB, may include Z suffix depending on driver timestampFormat).
311+
const usageDateRange = PreAggregationPartitionRangeLoader.intersectDateRanges(
312+
[loadResults[0]?.partitionRange?.[0] || null, loadResults[loadResults.length - 1]?.partitionRange?.[1] || null] as QueryDateRange,
313+
usageInfo.dateRange as QueryDateRange,
314+
);
315+
if (usageDateRange) {
316+
const usagePartitions = loadResults.filter(r => {
317+
if (!r.partitionRange) return true;
318+
const [pStart, pEnd] = r.partitionRange;
319+
const [uStart, uEnd] = usageDateRange;
320+
return pEnd >= uStart && pStart <= uEnd;
321+
});
322+
const usageTableNames = usagePartitions.map(r => r.targetTableName);
323+
if (usageTableNames.length === 1) {
324+
[usageTargetTableNames[suffix]] = usageTableNames;
325+
} else if (usageTableNames.length > 0) {
326+
const usageUnion = usageTableNames
327+
.map(t => `SELECT * FROM ${t}`)
328+
.join(' UNION ALL ');
329+
usageTargetTableNames[suffix] = `(${usageUnion})`;
330+
} else {
331+
usageTargetTableNames[suffix] = baseTargetTableName;
332+
}
333+
} else {
334+
usageTargetTableNames[suffix] = baseTargetTableName;
335+
}
336+
} else {
337+
usageTargetTableNames[suffix] = baseTargetTableName;
338+
}
339+
}
340+
}
341+
300342
return {
301-
targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`,
343+
targetTableName: baseTargetTableName,
302344
refreshKeyValues: loadResults.map(t => t.refreshKeyValues),
303345
lastUpdatedAt,
304346
buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd,
305347
lambdaTable,
306348
rollupLambdaId: this.preAggregation.rollupLambdaId,
307349
isMultiTableUnion: allTableTargetNames.length > 1,
350+
usageTargetTableNames,
308351
};
309352
} else {
310353
return new PreAggregationLoader(

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ export type LoadPreAggregationResult = {
149149
rollupLambdaId?: string;
150150
partitionRange?: QueryDateRange;
151151
isMultiTableUnion?: boolean;
152+
usageTargetTableNames?: Record<string, string>;
152153
};
153154

154155
export type PreAggregationTableToTempTable = [string, LoadPreAggregationResult];
@@ -192,6 +193,7 @@ export type PreAggregationDescription = {
192193
sealAt?: string;
193194
rollupLambdaId?: string;
194195
lastRollupLambda?: boolean;
196+
usageMapping?: Record<string, { dateRange?: QueryDateRange }>;
195197
};
196198

197199
export const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy(
@@ -570,10 +572,22 @@ export class PreAggregations {
570572
);
571573
}
572574

573-
return [p.tableName, usedPreAggregation];
575+
return [p.tableName, usedPreAggregation] as PreAggregationTableToTempTable;
574576
};
575577

576-
return preAggregationPromise().then(res => preAggregationsTablesToTempTables.concat([res]));
578+
return preAggregationPromise().then(([tableName, result]) => {
579+
const { usageTargetTableNames } = result;
580+
if (usageTargetTableNames && Object.keys(usageTargetTableNames).length > 0) {
581+
const entries: PreAggregationTableToTempTable[] = Object.entries(usageTargetTableNames).map(
582+
([suffix, usageTarget]) => [
583+
`${tableName}${suffix}`,
584+
{ ...result, targetTableName: usageTarget, usageTargetTableNames: undefined },
585+
]
586+
);
587+
return preAggregationsTablesToTempTables.concat(entries);
588+
}
589+
return preAggregationsTablesToTempTables.concat([[tableName, result]]);
590+
});
577591
}).reduce((promise, fn) => promise.then(fn), Promise.resolve([]));
578592

579593
return preAggregationsTablesToTempTablesPromise.then(preAggregationsTablesToTempTables => ({

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,7 @@ export class QueryCache {
429429
? queryAndParams
430430
: [queryAndParams, []];
431431
const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce(
432-
(query, [tableName, { targetTableName }]) => (
433-
QueryCache.replaceAll(tableName, targetTableName, query)
434-
),
432+
(query, [tableName, { targetTableName }]) => QueryCache.replaceAll(tableName, targetTableName, query),
435433
keyQuery
436434
);
437435
return Array.isArray(queryAndParams)

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -959,11 +959,9 @@ export class BaseQuery {
959959
try {
960960
const buildResult = nativeBuildSqlAndParams(queryParams);
961961

962-
const [query, params, preAggregation] = buildResult;
962+
const [query, params, preAggResult] = buildResult;
963963
const paramsArray = [...params];
964-
if (preAggregation) {
965-
this.preAggregations.preAggregationForQuery = preAggregation;
966-
}
964+
this.applyNativePreAggResult(preAggResult);
967965
return [query, paramsArray];
968966
} catch (e) {
969967
if (e.name === 'TesseractUserError') {
@@ -1009,8 +1007,21 @@ export class BaseQuery {
10091007

10101008
const buildResult = nativeBuildSqlAndParams(queryParams);
10111009

1012-
const [, , preAggregation] = buildResult;
1013-
return preAggregation;
1010+
const [, , preAggResult] = buildResult;
1011+
this.applyNativePreAggResult(preAggResult);
1012+
return this.preAggregations.preAggregationForQuery;
1013+
}
1014+
1015+
applyNativePreAggResult(preAggResult) {
1016+
if (!preAggResult) return;
1017+
if (Array.isArray(preAggResult)) {
1018+
this.preAggregations.preAggregationUsageInfos = preAggResult;
1019+
const first = preAggResult[0];
1020+
this.preAggregations.preAggregationForQuery =
1021+
this.getPreAggregationByName(first.cubeName, first.preAggregationName);
1022+
} else {
1023+
this.preAggregations.preAggregationForQuery = preAggResult;
1024+
}
10141025
}
10151026

10161027
allCubeMembers(path) {
@@ -3496,7 +3507,7 @@ export class BaseQuery {
34963507
}
34973508

34983509
escapeStringLiteral(str) {
3499-
return `'${str.replace(/'/g, "''")}'`;
3510+
return `'${str.replace(/'/g, '\'\'')}'`;
35003511
}
35013512

35023513
autoPrefixAndEvaluateSql(cubeName, sql, isMemberExpr = false) {

packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ export type FullPreAggregationDescription = any;
9494
*/
9595
export type TransformedQuery = any;
9696

97+
export type UsageDateRangeInfo = {
98+
dateRange?: [string, string];
99+
};
100+
101+
export type PreAggregationUsageInfo = {
102+
cubeName: string;
103+
preAggregationName: string;
104+
external: boolean;
105+
usages: Record<string, UsageDateRangeInfo>;
106+
};
107+
97108
export class PreAggregations {
98109
private readonly query: BaseQuery;
99110

@@ -107,6 +118,8 @@ export class PreAggregations {
107118

108119
public preAggregationForQuery: PreAggregationForQuery | undefined = undefined;
109120

121+
public preAggregationUsageInfos: PreAggregationUsageInfo[] | undefined = undefined;
122+
110123
public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) {
111124
this.query = query;
112125
this.historyQueries = historyQueries;
@@ -137,6 +150,10 @@ export class PreAggregations {
137150
const isInPreAggregationQuery = this.query.options.preAggregationQuery;
138151
if (!isInPreAggregationQuery) {
139152
const preAggregationForQuery = this.findPreAggregationForQuery();
153+
// Check usageInfos after findPreAggregationForQuery (which may populate them)
154+
if (this.preAggregationUsageInfos && this.preAggregationUsageInfos.length > 0) {
155+
return this.preAggregationDescriptionsForUsageInfos(this.preAggregationUsageInfos);
156+
}
140157
if (preAggregationForQuery) {
141158
return this.preAggregationDescriptionsFor(preAggregationForQuery);
142159
}
@@ -165,6 +182,48 @@ export class PreAggregations {
165182
return join.joins.map(j => j.originalTo).concat([join.root]);
166183
}
167184

185+
private preAggregationDescriptionsForUsageInfos(usageInfos: PreAggregationUsageInfo[]): FullPreAggregationDescription[] {
186+
return usageInfos.flatMap(usageInfo => {
187+
const preAggObj = this.getRollupPreAggregationByName(usageInfo.cubeName, usageInfo.preAggregationName);
188+
if (!preAggObj || !('preAggregationName' in preAggObj)) {
189+
return [];
190+
}
191+
const foundPreAgg = preAggObj as PreAggregationForQuery;
192+
193+
// One description per physical pre-aggregation, with usageMapping attached
194+
const descriptions = this.preAggregationDescriptionsFor(foundPreAgg);
195+
196+
// Compute the union of all usage date ranges so that partitions cover
197+
// every usage (e.g. time_shift may require earlier partitions).
198+
const mergedDateRange = PreAggregations.mergeUsageDateRanges(usageInfo.usages);
199+
200+
return descriptions.map(desc => ({
201+
...desc,
202+
usageMapping: usageInfo.usages,
203+
...(mergedDateRange && desc.matchedTimeDimensionDateRange ? { matchedTimeDimensionDateRange: mergedDateRange } : {}),
204+
}));
205+
});
206+
}
207+
208+
private static mergeUsageDateRanges(usages: Record<string, UsageDateRangeInfo>): [string, string] | null {
209+
let minDate: string | null = null;
210+
let maxDate: string | null = null;
211+
212+
for (const usage of Object.values(usages)) {
213+
if (usage.dateRange) {
214+
const [from, to] = usage.dateRange;
215+
if (!minDate || from < minDate) {
216+
minDate = from;
217+
}
218+
if (!maxDate || to > maxDate) {
219+
maxDate = to;
220+
}
221+
}
222+
}
223+
224+
return minDate && maxDate ? [minDate, maxDate] : null;
225+
}
226+
168227
private preAggregationDescriptionsFor(foundPreAggregation: PreAggregationForQuery): FullPreAggregationDescription[] {
169228
let preAggregations: PreAggregationForQuery[] = [foundPreAggregation];
170229
if (foundPreAggregation.preAggregation.type === 'rollupJoin') {

0 commit comments

Comments
 (0)