diff --git a/packages/cubejs-backend-native/src/bridge_test_exports.rs b/packages/cubejs-backend-native/src/bridge_test_exports.rs index eb1cb4b15aa4b..f569174040f89 100644 --- a/packages/cubejs-backend-native/src/bridge_test_exports.rs +++ b/packages/cubejs-backend-native/src/bridge_test_exports.rs @@ -69,6 +69,9 @@ use cubesqlplanner::cube_bridge::{ multi_stage_filter::{ multi_stage_filter_references_bridge_fields_meta, NativeMultiStageFilterReferences, }, + multi_stage_grain::{ + multi_stage_grain_references_bridge_fields_meta, NativeMultiStageGrainReferences, + }, pre_aggregation_description::{ pre_aggregation_description_bridge_fields_meta, NativePreAggregationDescription, PreAggregationDescription, @@ -450,6 +453,7 @@ bridge_registry! { "memberExpressionDefinition" => NativeMemberExpressionDefinition, member_expression_definition_bridge_fields_meta, invoke_member_expression_definition; "memberOrderBy" => NativeMemberOrderBy, member_order_by_bridge_fields_meta, invoke_member_order_by; "multiStageFilter" => NativeMultiStageFilterReferences, multi_stage_filter_references_bridge_fields_meta, invoke_multi_stage_filter; + "multiStageGrain" => NativeMultiStageGrainReferences, multi_stage_grain_references_bridge_fields_meta, invoke_multi_stage_grain; "preAggregationDescription" => NativePreAggregationDescription, pre_aggregation_description_bridge_fields_meta, invoke_pre_aggregation_description; "preAggregationObj" => NativePreAggregationObj, pre_aggregation_obj_bridge_fields_meta, invoke_pre_aggregation_obj; "preAggregationTimeDimension" => NativePreAggregationTimeDimension, pre_aggregation_time_dimension_bridge_fields_meta, invoke_pre_aggregation_time_dimension; @@ -748,6 +752,7 @@ fn invoke_measure_definition(b: &NativeMeasureDefinition) -> r.record("case", b.case()); r.record("filters", b.filters()); r.record("filter", b.filter()); + r.record("grain", b.grain()); r.record("drill_filters", b.drill_filters()); r.record("order_by", b.order_by()); r.record("mask_sql", b.mask_sql()); @@ -764,6 +769,13 @@ fn invoke_multi_stage_filter( InvokeResult::new() } +fn invoke_multi_stage_grain( + _b: &NativeMultiStageGrainReferences, +) -> InvokeResult { + // Static-only bridge — same shape as `invoke_multi_stage_filter`. + InvokeResult::new() +} + fn invoke_expression_struct(b: &NativeExpressionStruct) -> InvokeResult { let mut r = InvokeResult::new(); r.record("add_filters", b.add_filters()); diff --git a/packages/cubejs-backend-native/test/bridge/bridge-fixtures.ts b/packages/cubejs-backend-native/test/bridge/bridge-fixtures.ts index 609170b026e90..67fb875c3af11 100644 --- a/packages/cubejs-backend-native/test/bridge/bridge-fixtures.ts +++ b/packages/cubejs-backend-native/test/bridge/bridge-fixtures.ts @@ -83,6 +83,17 @@ export const multiStageFilterFixture = (): unknown => ({ include: [{ member: 'orders.amount', operator: 'gt', values: ['0'] }], }); +// MultiStageGrainReferences mirrors the filter bridge — static-only. The +// bridge is structurally permissive (both `excludeReferences` and +// `keepOnlyReferences` could deserialize at once); the `.nand` lives on the +// JS Joi validator. The fixture populates only one of them to match the +// schema contract. `include` is a plain reference list, not the structured +// filter items the filter bridge uses. +export const multiStageGrainFixture = (): unknown => ({ + excludeReferences: ['orders.region'], + includeReferences: ['orders.category'], +}); + export const memberDefinitionFixture = (): unknown => ({ type: 'dimension', // sql is optional @@ -284,6 +295,7 @@ export const FIXTURES: Record = { memberExpressionDefinition: memberExpressionDefinitionFixture, memberOrderBy: memberOrderByFixture, multiStageFilter: multiStageFilterFixture, + multiStageGrain: multiStageGrainFixture, preAggregationDescription: preAggregationDescriptionFixture, preAggregationObj: preAggregationObjFixture, preAggregationTimeDimension: preAggregationTimeDimensionFixture, diff --git a/packages/cubejs-backend-native/test/bridge/object-bridges-coverage.test.ts b/packages/cubejs-backend-native/test/bridge/object-bridges-coverage.test.ts index dbe371322c323..629ae33d5945a 100644 --- a/packages/cubejs-backend-native/test/bridge/object-bridges-coverage.test.ts +++ b/packages/cubejs-backend-native/test/bridge/object-bridges-coverage.test.ts @@ -181,6 +181,7 @@ const BRIDGES: BridgeSpec[] = [ 'drill_filters', 'filter', 'filters', + 'grain', 'group_by_references', 'mask_sql', 'measure_type', @@ -203,6 +204,10 @@ const BRIDGES: BridgeSpec[] = [ name: 'multiStageFilter', expected: ['exclude', 'include', 'keep_only', 'mode'], }, + { + name: 'multiStageGrain', + expected: ['exclude', 'include', 'keep_only'], + }, { name: 'preAggregationDescription', expected: [ diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index 67db3b77e4097..b86c36102d24b 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -54,6 +54,16 @@ export type MultiStageFilterDirective = { keepOnlyReferences?: string[]; }; +export type MultiStageGrainDirective = { + exclude?: (...args: Array) => Array; + keepOnly?: (...args: Array) => Array; + include?: (...args: Array) => Array; + // Resolved sibling fields populated by `evaluateMultiStageReferences`. + excludeReferences?: string[]; + keepOnlyReferences?: string[]; + includeReferences?: string[]; +}; + export type DimensionDefinition = { type: string; sql(): string; @@ -98,6 +108,7 @@ export type MeasureDefinition = { groupBy?: (...args: Array) => Array; reduceBy?: (...args: Array) => Array; addGroupBy?: (...args: Array) => Array; + grain?: MultiStageGrainDirective; timeShift?: TimeShiftDefinition[]; groupByReferences?: string[]; reduceByReferences?: string[]; @@ -626,6 +637,17 @@ export class CubeEvaluator extends CubeSymbols { member.filter.keepOnlyReferences = this.evaluateReferences(cubeName, member.filter.keepOnly); } } + if (member.grain) { + if (typeof member.grain.exclude === 'function') { + member.grain.excludeReferences = this.evaluateReferences(cubeName, member.grain.exclude); + } + if (typeof member.grain.keepOnly === 'function') { + member.grain.keepOnlyReferences = this.evaluateReferences(cubeName, member.grain.keepOnly); + } + if (typeof member.grain.include === 'function') { + member.grain.includeReferences = this.evaluateReferences(cubeName, member.grain.include); + } + } } } } diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts index d01c5dd1e5b77..ccb4aa918168a 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeValidator.ts @@ -868,6 +868,12 @@ const MultiStageFilter = Joi.object().keys({ ), }).nand('exclude', 'keepOnly'); +const MultiStageGrain = Joi.object().keys({ + exclude: Joi.func(), + keepOnly: Joi.func(), + include: Joi.func(), +}).nand('exclude', 'keepOnly'); + const CaseSchema = Joi.object().keys({ when: Joi.array().items(Joi.object().keys({ sql: Joi.func().required(), @@ -916,6 +922,7 @@ const MeasuresSchema = Joi.object().pattern(identifierRegex, Joi.alternatives(). reduceBy: Joi.func(), addGroupBy: Joi.func(), filter: MultiStageFilter, + grain: MultiStageGrain, timeShift: Joi.alternatives().conditional(Joi.array().length(1), { then: Joi.array().items(timeShiftItemOptional), otherwise: Joi.array().items(timeShiftItemRequired) diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts index 5cc43fd99b91a..b7eaed26cd680 100644 --- a/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts +++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts @@ -20,6 +20,7 @@ export const transpiledFieldsPatterns: Array = [ /^measures\.[_a-zA-Z][_a-zA-Z0-9]*\.(timeShift|time_shift)\.[0-9]+\.(timeDimension|time_dimension)$/, /^measures\.[_a-zA-Z][_a-zA-Z0-9]*\.(reduceBy|reduce_by|groupBy|group_by|addGroupBy|add_group_by)$/, /^(measures|dimensions)\.[_a-zA-Z][_a-zA-Z0-9]*\.filter\.(exclude|keepOnly|keep_only)$/, + /^measures\.[_a-zA-Z][_a-zA-Z0-9]*\.grain\.(exclude|keepOnly|keep_only|include)$/, /^(measures|dimensions)\.[_a-zA-Z][_a-zA-Z0-9]*\.case\.switch$/, /^dimensions\.[_a-zA-Z][_a-zA-Z0-9]*\.(reduceBy|reduce_by|groupBy|group_by|addGroupBy|add_group_by|key)$/, /^(preAggregations|pre_aggregations)\.[_a-zA-Z][_a-zA-Z0-9]*\.indexes\.[_a-zA-Z][_a-zA-Z0-9]*\.columns$/, diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/multi-stage-grain.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/multi-stage-grain.test.ts new file mode 100644 index 0000000000000..d7a44751903c4 --- /dev/null +++ b/packages/cubejs-schema-compiler/test/integration/postgres/multi-stage-grain.test.ts @@ -0,0 +1,271 @@ +import { getEnv } from '@cubejs-backend/shared'; +import { prepareYamlCompiler } from '../../unit/PrepareCompiler'; +import { dbRunner } from './PostgresDBRunner'; + +// Smoke tests for the multi-stage `grain:` directive going through the +// JS schema compiler end-to-end (YAML → Joi → transpiler → Tesseract → SQL). +// The Rust planner has the exhaustive coverage; this file just confirms the +// JS pipeline accepts each grain shape and produces correct results. +// +// Inline data (6 rows; (completed, books) spans two rows so that +// `keep_only: [status, category]` resolves to a value distinct from +// `total_amount`): +// id status category amount created_at +// 1 completed books 100 2024-01-10 +// 2 completed electronics 200 2024-01-15 +// 3 pending books 50 2024-02-12 +// 4 pending electronics 75 2024-02-18 +// 5 cancelled books 30 2024-03-10 +// 6 completed books 70 2024-01-20 +// +// by status: completed=370, pending=125, cancelled=30 +// by category: books=250, electronics=275 +// by (status,category): (completed,books)=170, (completed,electronics)=200, +// (pending,books)=50, (pending,electronics)=75, +// (cancelled,books)=30 +// total: 525 +describe('Multi-Stage grain directive', () => { + jest.setTimeout(200000); + + const { compiler, joinGraph, cubeEvaluator } = prepareYamlCompiler(` +cubes: + - name: orders + sql: > + SELECT 1 as ID, 'completed' as STATUS, 'books' as CATEGORY, 100 as AMOUNT, '2024-01-10T00:00:00.000Z'::timestamptz as CREATED_AT + union all + SELECT 2 as ID, 'completed' as STATUS, 'electronics' as CATEGORY, 200 as AMOUNT, '2024-01-15T00:00:00.000Z'::timestamptz as CREATED_AT + union all + SELECT 3 as ID, 'pending' as STATUS, 'books' as CATEGORY, 50 as AMOUNT, '2024-02-12T00:00:00.000Z'::timestamptz as CREATED_AT + union all + SELECT 4 as ID, 'pending' as STATUS, 'electronics' as CATEGORY, 75 as AMOUNT, '2024-02-18T00:00:00.000Z'::timestamptz as CREATED_AT + union all + SELECT 5 as ID, 'cancelled' as STATUS, 'books' as CATEGORY, 30 as AMOUNT, '2024-03-10T00:00:00.000Z'::timestamptz as CREATED_AT + union all + SELECT 6 as ID, 'completed' as STATUS, 'books' as CATEGORY, 70 as AMOUNT, '2024-01-20T00:00:00.000Z'::timestamptz as CREATED_AT + + dimensions: + - name: id + sql: ID + type: number + primary_key: true + + - name: status + sql: STATUS + type: string + + - name: category + sql: CATEGORY + type: string + + - name: created_at + sql: CREATED_AT + type: time + + measures: + - name: total_amount + sql: AMOUNT + type: sum + + # ── single-element variants ──────────────────────────────────── + - name: amount_grain_exclude_status + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + exclude: + - orders.status + + - name: amount_grain_keep_only_status + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + keep_only: + - orders.status + + - name: amount_grain_include_status + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + include: + - orders.status + + # ── two-element arrays ──────────────────────────────────────── + - name: amount_grain_exclude_status_id + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + exclude: + - orders.status + - orders.id + + - name: amount_grain_keep_only_status_category + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + keep_only: + - orders.status + - orders.category + + - name: amount_grain_include_status_id + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + include: + - orders.status + - orders.id + + # ── keep_only + include combination ────────────────────────── + # keep_only shrinks the inherited partition to [status]; include + # then extends the leaf grain with [id]. Outer re-aggregates over + # id back to the query grain — per-status total broadcast across + # categories. + - name: amount_grain_keep_status_include_id + sql: "{CUBE.total_amount}" + type: sum + multi_stage: true + grain: + keep_only: + - orders.status + include: + - orders.id + `); + + if (getEnv('nativeSqlPlanner')) { + // ── single-element variants ────────────────────────────────── + it('exclude: drops a dim from the partition', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_exclude_status'], + dimensions: ['orders.status', 'orders.category'], + order: [{ id: 'orders.status' }, { id: 'orders.category' }], + timezone: 'UTC', + }, [ + // exclude_status removes status from the window's PARTITION BY → each + // row gets the per-category total regardless of its status. + { orders__status: 'cancelled', orders__category: 'books', orders__total_amount: '30', orders__amount_grain_exclude_status: '250' }, + { orders__status: 'completed', orders__category: 'books', orders__total_amount: '170', orders__amount_grain_exclude_status: '250' }, + { orders__status: 'completed', orders__category: 'electronics', orders__total_amount: '200', orders__amount_grain_exclude_status: '275' }, + { orders__status: 'pending', orders__category: 'books', orders__total_amount: '50', orders__amount_grain_exclude_status: '250' }, + { orders__status: 'pending', orders__category: 'electronics', orders__total_amount: '75', orders__amount_grain_exclude_status: '275' }, + ], { joinGraph, cubeEvaluator, compiler })); + + it('keep_only: shrinks the partition to the listed dim', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_keep_only_status'], + dimensions: ['orders.status', 'orders.category'], + order: [{ id: 'orders.status' }, { id: 'orders.category' }], + timezone: 'UTC', + }, [ + // keep_only [status] reduces the partition to status only → each row + // gets the per-status total regardless of its category. + { orders__status: 'cancelled', orders__category: 'books', orders__total_amount: '30', orders__amount_grain_keep_only_status: '30' }, + { orders__status: 'completed', orders__category: 'books', orders__total_amount: '170', orders__amount_grain_keep_only_status: '370' }, + { orders__status: 'completed', orders__category: 'electronics', orders__total_amount: '200', orders__amount_grain_keep_only_status: '370' }, + { orders__status: 'pending', orders__category: 'books', orders__total_amount: '50', orders__amount_grain_keep_only_status: '125' }, + { orders__status: 'pending', orders__category: 'electronics', orders__total_amount: '75', orders__amount_grain_keep_only_status: '125' }, + ], { joinGraph, cubeEvaluator, compiler })); + + it('keep_only: dim absent from query collapses to grand total', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_keep_only_status'], + dimensions: ['orders.category'], + order: [{ id: 'orders.category' }], + timezone: 'UTC', + }, [ + // status not in the query → keep_only intersection with the query dims + // is empty → the measure collapses to a grand total (525) per row. + { orders__category: 'books', orders__total_amount: '250', orders__amount_grain_keep_only_status: '525' }, + { orders__category: 'electronics', orders__total_amount: '275', orders__amount_grain_keep_only_status: '525' }, + ], { joinGraph, cubeEvaluator, compiler })); + + it('include: extends the leaf grain, outer re-aggregates', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_include_status'], + dimensions: ['orders.category'], + order: [{ id: 'orders.category' }], + timezone: 'UTC', + }, [ + // Inner CTE groups by (category, status); outer SUMs back over status. + // SUM is associative so the per-category result equals total_amount — + // the test verifies pipeline acceptance, not a math divergence. + { orders__category: 'books', orders__total_amount: '250', orders__amount_grain_include_status: '250' }, + { orders__category: 'electronics', orders__total_amount: '275', orders__amount_grain_include_status: '275' }, + ], { joinGraph, cubeEvaluator, compiler })); + + // ── two-element arrays ─────────────────────────────────────── + it('exclude: two-element array drops both dims', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_exclude_status_id'], + dimensions: ['orders.status', 'orders.category', 'orders.id'], + order: [{ id: 'orders.status' }, { id: 'orders.category' }, { id: 'orders.id' }], + timezone: 'UTC', + }, [ + // exclude [status, id] removes both from the partition → partition is + // [category] alone → each row gets the per-category total. + { orders__status: 'cancelled', orders__category: 'books', orders__id: 5, orders__total_amount: '30', orders__amount_grain_exclude_status_id: '250' }, + { orders__status: 'completed', orders__category: 'books', orders__id: 1, orders__total_amount: '100', orders__amount_grain_exclude_status_id: '250' }, + { orders__status: 'completed', orders__category: 'books', orders__id: 6, orders__total_amount: '70', orders__amount_grain_exclude_status_id: '250' }, + { orders__status: 'completed', orders__category: 'electronics', orders__id: 2, orders__total_amount: '200', orders__amount_grain_exclude_status_id: '275' }, + { orders__status: 'pending', orders__category: 'books', orders__id: 3, orders__total_amount: '50', orders__amount_grain_exclude_status_id: '250' }, + { orders__status: 'pending', orders__category: 'electronics', orders__id: 4, orders__total_amount: '75', orders__amount_grain_exclude_status_id: '275' }, + ], { joinGraph, cubeEvaluator, compiler })); + + it('keep_only: two-element array narrows to the (status, category) cell', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_keep_only_status_category'], + dimensions: ['orders.status', 'orders.category', 'orders.id'], + order: [{ id: 'orders.status' }, { id: 'orders.category' }, { id: 'orders.id' }], + timezone: 'UTC', + }, [ + // keep_only [status, category] reduces the partition to that pair → + // each row gets the per-(status,category) total. id rows within the + // same (status, category) share the value (e.g. id=1 and id=6 both + // get 170 for (completed, books)). + { orders__status: 'cancelled', orders__category: 'books', orders__id: 5, orders__total_amount: '30', orders__amount_grain_keep_only_status_category: '30' }, + { orders__status: 'completed', orders__category: 'books', orders__id: 1, orders__total_amount: '100', orders__amount_grain_keep_only_status_category: '170' }, + { orders__status: 'completed', orders__category: 'books', orders__id: 6, orders__total_amount: '70', orders__amount_grain_keep_only_status_category: '170' }, + { orders__status: 'completed', orders__category: 'electronics', orders__id: 2, orders__total_amount: '200', orders__amount_grain_keep_only_status_category: '200' }, + { orders__status: 'pending', orders__category: 'books', orders__id: 3, orders__total_amount: '50', orders__amount_grain_keep_only_status_category: '50' }, + { orders__status: 'pending', orders__category: 'electronics', orders__id: 4, orders__total_amount: '75', orders__amount_grain_keep_only_status_category: '75' }, + ], { joinGraph, cubeEvaluator, compiler })); + + it('include: two-element array extends the leaf grain', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_include_status_id'], + dimensions: ['orders.category'], + order: [{ id: 'orders.category' }], + timezone: 'UTC', + }, [ + // Leaf grain extends to (category, status, id). Outer SUMs back to + // the per-category total — same numbers as total_amount, the test + // verifies that a two-element include list threads through. + { orders__category: 'books', orders__total_amount: '250', orders__amount_grain_include_status_id: '250' }, + { orders__category: 'electronics', orders__total_amount: '275', orders__amount_grain_include_status_id: '275' }, + ], { joinGraph, cubeEvaluator, compiler })); + + // ── keep_only + include combination ────────────────────────── + it('keep_only + include: keep narrows, include extends', async () => dbRunner.runQueryTest({ + measures: ['orders.total_amount', 'orders.amount_grain_keep_status_include_id'], + dimensions: ['orders.status', 'orders.category'], + order: [{ id: 'orders.status' }, { id: 'orders.category' }], + timezone: 'UTC', + }, [ + // keep_only [status] narrows parent grain to [status]; include [id] + // adds id to the leaf. Outer re-aggregates by (status, category) → + // per-status total broadcast across categories (completed=370 to both + // books and electronics; pending=125 to both; cancelled=30 to books). + { orders__status: 'cancelled', orders__category: 'books', orders__total_amount: '30', orders__amount_grain_keep_status_include_id: '30' }, + { orders__status: 'completed', orders__category: 'books', orders__total_amount: '170', orders__amount_grain_keep_status_include_id: '370' }, + { orders__status: 'completed', orders__category: 'electronics', orders__total_amount: '200', orders__amount_grain_keep_status_include_id: '370' }, + { orders__status: 'pending', orders__category: 'books', orders__total_amount: '50', orders__amount_grain_keep_status_include_id: '125' }, + { orders__status: 'pending', orders__category: 'electronics', orders__total_amount: '75', orders__amount_grain_keep_status_include_id: '125' }, + ], { joinGraph, cubeEvaluator, compiler })); + } else { + // These tests rely on Tesseract; v1 planner does not implement the directive. + test.skip('exclude: drops a dim from the partition', () => { expect(1).toBe(1); }); + test.skip('keep_only: shrinks the partition to the listed dim', () => { expect(1).toBe(1); }); + test.skip('keep_only: dim absent from query collapses to grand total', () => { expect(1).toBe(1); }); + test.skip('include: extends the leaf grain, outer re-aggregates', () => { expect(1).toBe(1); }); + test.skip('exclude: two-element array drops both dims', () => { expect(1).toBe(1); }); + test.skip('keep_only: two-element array narrows to the (status, category) cell', () => { expect(1).toBe(1); }); + test.skip('include: two-element array extends the leaf grain', () => { expect(1).toBe(1); }); + test.skip('keep_only + include: keep narrows, include extends', () => { expect(1).toBe(1); }); + } +}); diff --git a/packages/cubejs-schema-compiler/test/unit/cube-validator.test.ts b/packages/cubejs-schema-compiler/test/unit/cube-validator.test.ts index 77bb51c8a6028..183b44abe4fcc 100644 --- a/packages/cubejs-schema-compiler/test/unit/cube-validator.test.ts +++ b/packages/cubejs-schema-compiler/test/unit/cube-validator.test.ts @@ -762,6 +762,178 @@ describe('Cube Validation', () => { expect(validationResult.error).toBeFalsy(); }); + + it('multi-stage grain — full directive accepted', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + measures: { + measure_with_grain: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { + exclude: () => [], + include: () => [], + } + } + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + } + } as any); + + expect(validationResult.error).toBeFalsy(); + }); + + it('multi-stage grain — partial directives accepted', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + measures: { + only_exclude: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { exclude: () => [] } + }, + only_keep_only: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { keepOnly: () => [] } + }, + only_include: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { include: () => [] } + }, + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + } + } as any); + + expect(validationResult.error).toBeFalsy(); + }); + + it('multi-stage grain — exclude and keepOnly are mutually exclusive', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + measures: { + both_set: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { + exclude: () => [], + keepOnly: () => [], + } + } + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + expect(message).toContain('exclude'); + expect(message).toContain('keepOnly'); + } + } as any); + + expect(validationResult.error).toBeTruthy(); + }); + + it('multi-stage grain — exclude must be a function', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + measures: { + bad_exclude: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { exclude: ['some_dim'] } + } + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + expect(message).toContain('measures.bad_exclude.grain.exclude'); + } + } as any); + + expect(validationResult.error).toBeTruthy(); + }); + + it('multi-stage grain — include must be a function', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + measures: { + bad_include: { + multiStage: true, + type: 'sum', + sql: () => '', + grain: { include: ['some_dim'] } + } + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + expect(message).toContain('measures.bad_include.grain.include'); + } + } as any); + + expect(validationResult.error).toBeTruthy(); + }); + + it('multi-stage grain — rejected on multi-stage dimension', async () => { + const cubeValidator = new CubeValidator(new CubeSymbols()); + const cube = { + name: 'name', + sql: () => '', + fileName: 'fileName', + dimensions: { + dim_with_grain: { + multiStage: true, + type: 'string', + sql: () => '', + grain: { include: () => [] } + } + } + }; + + const validationResult = cubeValidator.validate(cube, { + error: (message: any, _e: any) => { + console.log(message); + } + } as any); + + expect(validationResult.error).toBeTruthy(); + }); }); it('OriginalSqlSchema', async () => { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs index 5e96209f192eb..b9b54f3b5a5ef 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs @@ -2,6 +2,7 @@ use super::case_variant::CaseVariant; use super::member_order_by::{MemberOrderBy, NativeMemberOrderBy}; use super::member_sql::{MemberSql, NativeMemberSql}; use super::multi_stage_filter::{MultiStageFilterReferences, NativeMultiStageFilterReferences}; +use super::multi_stage_grain::{MultiStageGrainReferences, NativeMultiStageGrainReferences}; use super::struct_with_sql_member::{NativeStructWithSqlMember, StructWithSqlMember}; use cubenativeutils::wrappers::serializer::{ NativeDeserialize, NativeDeserializer, NativeSerialize, @@ -68,6 +69,9 @@ pub trait MeasureDefinition { #[nbridge(field, optional)] fn filter(&self) -> Result>, CubeError>; + #[nbridge(field, optional)] + fn grain(&self) -> Result>, CubeError>; + #[nbridge(field, optional, vec)] fn drill_filters(&self) -> Result>>, CubeError>; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs index a6d22a6c9f8d3..19759f1796dcd 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs @@ -34,6 +34,7 @@ pub mod member_expression; pub mod member_order_by; pub mod member_sql; pub mod multi_stage_filter; +pub mod multi_stage_grain; pub mod options_member; pub mod pre_aggregation_description; pub mod pre_aggregation_obj; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/multi_stage_grain.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/multi_stage_grain.rs new file mode 100644 index 0000000000000..514a087696114 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/cube_bridge/multi_stage_grain.rs @@ -0,0 +1,20 @@ +use cubenativeutils::wrappers::serializer::{NativeDeserialize, NativeSerialize}; +use cubenativeutils::wrappers::NativeContextHolder; +use cubenativeutils::wrappers::NativeObjectHandle; +use cubenativeutils::CubeError; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::rc::Rc; + +#[derive(Serialize, Deserialize, Debug, Clone, nativebridge::NativeBridgeStatic)] +pub struct MultiStageGrainReferencesStatic { + #[serde(rename = "excludeReferences")] + pub exclude: Option>, + #[serde(rename = "keepOnlyReferences")] + pub keep_only: Option>, + #[serde(rename = "includeReferences")] + pub include: Option>, +} + +#[nativebridge::native_bridge(MultiStageGrainReferencesStatic, with_static_meta)] +pub trait MultiStageGrainReferences {} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs index 6b6089eb47880..ba2a6217ad3c2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/dimension.rs @@ -48,7 +48,10 @@ impl MultiStageDimensionCalculation { pub fn join_dimensions(&self) -> Result>, CubeError> { let mut result = if let Ok(dimension) = self.multi_stage_dimension.as_dimension() { - dimension.add_group_by().cloned().unwrap_or_default() + dimension + .multi_stage() + .and_then(|m| m.grain.include.clone()) + .unwrap_or_default() } else { vec![] }; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/collectors/join_hints_collector.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/collectors/join_hints_collector.rs index e93dc9a872af5..121fcd4adaeef 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/collectors/join_hints_collector.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/collectors/join_hints_collector.rs @@ -28,8 +28,8 @@ impl TraversalVisitor for JoinHintsCollector { ) -> Result, CubeError> { if node.is_multi_stage() { if let Ok(dim) = node.as_dimension() { - if let Some(add_group_by) = dim.add_group_by() { - for item in add_group_by.iter() { + if let Some(include) = dim.multi_stage().and_then(|m| m.grain.include.as_ref()) { + for item in include.iter() { self.apply(item, &())?; } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs index 0f0d652e7952f..9a429eecdc96b 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs @@ -1,4 +1,4 @@ -use crate::planner::{MeasureTimeShifts, MemberSymbol}; +use crate::planner::{MeasureTimeShifts, MemberSymbol, MultiStageGrain}; use std::rc::Rc; /// Description of the time-series CTE driving a rolling-window @@ -113,15 +113,12 @@ pub enum MultiStageInodeMemberType { /// Non-leaf node in a multi-stage tree. Bundles the semantic /// `inode_type` (Rank / Aggregate / Calculate / Dimension / -/// RollingWindow) with the partition-shaping flags driven by the -/// measure's data-model directives: `reduce_by`, `add_group_by`, -/// `group_by`, `time_shift`. +/// RollingWindow) with the partition-shaping `grain` carried over from +/// the measure's data-model directives and an optional `time_shift`. #[derive(Clone)] pub struct MultiStageInodeMember { inode_type: MultiStageInodeMemberType, - reduce_by: Vec>, - add_group_by: Vec>, - group_by: Option>>, + grain: MultiStageGrain, time_shift: Option, /// Optimisation flag: this Aggregate inode is a safe candidate for /// the `window`-based render — single measure dep, additive identity @@ -134,16 +131,12 @@ pub struct MultiStageInodeMember { impl MultiStageInodeMember { pub fn new( inode_type: MultiStageInodeMemberType, - reduce_by: Vec>, - add_group_by: Vec>, - group_by: Option>>, + grain: MultiStageGrain, time_shift: Option, ) -> Self { Self { inode_type, - reduce_by, - add_group_by, - group_by, + grain, time_shift, use_window_path: false, } @@ -162,30 +155,8 @@ impl MultiStageInodeMember { &self.inode_type } - pub fn reduce_by(&self) -> Vec { - self.reduce_by.iter().map(|s| s.full_name()).collect() - } - - pub fn add_group_by(&self) -> Vec { - self.add_group_by.iter().map(|s| s.full_name()).collect() - } - - pub fn reduce_by_symbols(&self) -> &Vec> { - &self.reduce_by - } - - pub fn add_group_by_symbols(&self) -> &Vec> { - &self.add_group_by - } - - pub fn group_by(&self) -> Option> { - self.group_by - .as_ref() - .map(|g| g.iter().map(|s| s.full_name()).collect()) - } - - pub fn group_by_symbols(&self) -> &Option>> { - &self.group_by + pub fn grain(&self) -> &MultiStageGrain { + &self.grain } pub fn time_shift(&self) -> &Option { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs index bbb86fa306745..f43d5170bbf8d 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs @@ -7,6 +7,7 @@ use crate::planner::planners::{multi_stage::RollingWindowType, QueryPlanner, Sim use crate::planner::query_tools::QueryTools; use crate::planner::GranularityHelper; use crate::planner::MemberSymbol; +use crate::planner::MultiStageGrain; use crate::planner::{OrderByItem, QueryProperties}; use cubenativeutils::CubeError; @@ -203,10 +204,7 @@ impl MultiStageMemberQueryPlanner { &self, multi_stage_member: &MultiStageInodeMember, ) -> Result, CubeError> { - let partition_by = self.member_partition_by_logical( - &multi_stage_member.reduce_by_symbols(), - &multi_stage_member.group_by_symbols(), - ); + let partition_by = self.member_partition_by_logical(multi_stage_member.grain()); // Rank always uses a window function. Aggregate inodes are // routed through `FullKeyAggregate` by default; only the narrow @@ -520,24 +518,20 @@ impl MultiStageMemberQueryPlanner { .collect_vec() } - fn member_partition_by_logical( - &self, - reduce_by: &Vec>, - group_by: &Option>>, - ) -> Vec> { + fn member_partition_by_logical(&self, grain: &MultiStageGrain) -> Vec> { let dimensions = self.all_dimensions(); - let dimensions = if !reduce_by.is_empty() { + let dimensions = if let Some(exclude) = &grain.exclude { dimensions .into_iter() - .filter(|d| !reduce_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .filter(|d| !exclude.iter().any(|m| d.has_member_in_reference_chain(m))) .collect_vec() } else { dimensions }; - let dimensions = if let Some(group_by) = group_by { + let dimensions = if let Some(keep_only) = &grain.keep_only { dimensions .into_iter() - .filter(|d| group_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .filter(|d| keep_only.iter().any(|m| d.has_member_in_reference_chain(m))) .collect_vec() } else { dimensions diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs index ff2aa64584056..8ce7f3895d6eb 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs @@ -22,6 +22,7 @@ use crate::planner::MeasureKind; use crate::planner::MemberSymbol; use crate::planner::MultiStageFilter; use crate::planner::MultiStageFilterMode; +use crate::planner::MultiStageGrain; use crate::planner::QueryProperties; use cubenativeutils::CubeError; use indexmap::IndexMap; @@ -141,10 +142,9 @@ impl MultiStageQueryPlanner { /// Classifies `base_member` into a `MultiStageInodeMember` — picks /// the inode kind (Rank / Aggregate / Calculate for a measure, - /// Dimension for a dimension) and pulls the partition-shaping - /// flags (`reduce_by`, `add_group_by`, `group_by`, `time_shift`) - /// out of the data-model definition. Returns the inode together - /// with the leaf's `is_ungrupped` flag. + /// Dimension for a dimension) and carries over the partition-shaping + /// `grain` and optional `time_shift` from the data-model definition. + /// Returns the inode together with the leaf's `is_ungrupped` flag. fn create_multi_stage_inode_member( &self, base_member: Rc, @@ -164,39 +164,35 @@ impl MultiStageQueryPlanner { _ => self.query_properties.ungrouped(), }; - let reduce_by = measure.reduce_by().cloned().unwrap_or_default(); - let add_group_by = measure.add_group_by().cloned().unwrap_or_default(); - let group_by = measure.group_by().cloned(); + let grain = measure + .multi_stage() + .map(|ms| ms.grain.clone()) + .unwrap_or_default(); + // Window-path eligibility intentionally checks only `include`: + // `exclude` and `keep_only` are realised through the window's + // PARTITION BY at render time, so they don't disqualify the + // path. `include` extends the leaf grain, which the JOIN-model + // is required for. Revisit if window-path expands to cases + // where exclude/keep_only affect render correctness. + let has_include = grain.include.as_ref().is_some_and(|v| !v.is_empty()); let use_window_path = matches!(member_type, MultiStageInodeMemberType::Aggregate) - && add_group_by.is_empty() + && !has_include && Self::is_window_path_eligible(&base_member); ( - MultiStageInodeMember::new( - member_type, - reduce_by, - add_group_by, - group_by, - time_shift, - ) - .with_use_window_path(use_window_path), + MultiStageInodeMember::new(member_type, grain, time_shift) + .with_use_window_path(use_window_path), is_ungrupped, ) } else { - let add_group_by = if let Ok(dimension) = base_member.as_dimension() { - dimension.add_group_by().cloned().unwrap_or_default() - } else { - vec![] - }; + let grain = base_member + .as_dimension() + .ok() + .and_then(|d| d.multi_stage().map(|ms| ms.grain.clone())) + .unwrap_or_default(); resolved_multi_stage_dimensions .insert(base_member.clone().resolve_reference_chain().full_name()); ( - MultiStageInodeMember::new( - MultiStageInodeMemberType::Dimension, - vec![], - add_group_by, - None, - None, - ), + MultiStageInodeMember::new(MultiStageInodeMemberType::Dimension, grain, None), false, ) }; @@ -278,31 +274,29 @@ impl MultiStageQueryPlanner { } } - /// Mirror of `MultiStageMemberQueryPlanner::member_partition_by_logical`: - /// drops `reduce_by` dims and (when `group_by` is set) keeps only the - /// dims explicitly listed. Used at planning time to decide whether - /// reduce_by / group_by actually shrinks the partition vs the leaf - /// grain. + /// Applies the partition-shaping part of `grain` to a parent-state + /// dimension list: `exclude` removes matching dims, then `keep_only` + /// intersects what's left. `include` is appended outside this helper + /// via `add_dimensions`. /// /// FIXME: merge with `MultiStageMemberQueryPlanner::member_partition_by_logical` - /// — both apply the same reduce_by/group_by reshape on different inputs; - /// keeping two copies invites silent drift when only one is updated. + /// — both apply the same grain reshape on different inputs; keeping two + /// copies invites silent drift when only one is updated. fn partition_filter( dims: &Vec>, - reduce_by: &Vec>, - group_by: &Option>>, + grain: &MultiStageGrain, ) -> Vec> { - let dims: Vec> = if !reduce_by.is_empty() { + let dims: Vec> = if let Some(exclude) = &grain.exclude { dims.iter() - .filter(|d| !reduce_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .filter(|d| !exclude.iter().any(|m| d.has_member_in_reference_chain(m))) .cloned() .collect() } else { dims.clone() }; - if let Some(group_by) = group_by { + if let Some(keep_only) = &grain.keep_only { dims.into_iter() - .filter(|d| group_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .filter(|d| keep_only.iter().any(|m| d.has_member_in_reference_chain(m))) .collect() } else { dims @@ -453,9 +447,8 @@ impl MultiStageQueryPlanner { /// already-built descriptions, tries a rolling-window path /// (`try_plan_rolling_window`), and otherwise returns either a /// leaf `Measure` or an inode description whose children come - /// from `make_childs`. Adjusts the state for inodes with any - /// `add_group_by`, time-shift or per-member filter changes the - /// inode demands. + /// from `make_childs`. Adjusts the state for any grain reshape, + /// time-shift or per-member filter changes the inode demands. fn make_queries_descriptions( &self, member: Rc, @@ -511,7 +504,11 @@ impl MultiStageQueryPlanner { let (multi_stage_member, is_ungrupped) = self .create_multi_stage_inode_member(member.clone(), resolved_multi_stage_dimensions)?; - let mut dimensions_to_add = multi_stage_member.add_group_by_symbols().clone(); + let mut dimensions_to_add = multi_stage_member + .grain() + .include + .clone() + .unwrap_or_default(); if let Some(case) = member.case() { if let Some(switch_dim) = case.case_switch_dimension() { @@ -526,16 +523,16 @@ impl MultiStageQueryPlanner { // 1. filter directive — pick `state` (Relative/None) or // `root_state` (Fixed) as the base and apply exclude / // keep_only / include against it. - // 2. reduce_by / group_by — shrink parent grain to the - // partition grain implied by directives. - // 3. add_group_by — extend the result with extra leaf dims. + // 2. grain.exclude / grain.keep_only — shrink parent grain to + // the partition grain implied by the directive. + // 3. grain.include — extend the result with extra leaf dims. // 4. time_shift / filter cleanup. - // Step 2 must precede step 3: `group_by` is a keep-only filter - // and would silently drop dims that step 3 needs to introduce. + // Step 2 must precede step 3: `keep_only` is an intersection and + // would silently drop dims that step 3 needs to introduce. // // The window-path Aggregate inode skips step 2: the leaf stays - // at the parent state plus any add_group_by extension, and the - // window function does the reduce_by collapse at outer level. + // at the parent state plus any `include` extension, and the + // window function does the `exclude` collapse at outer level. let use_window_path = multi_stage_member.use_window_path(); let new_state = { let mut new_state = match directive_filter.as_ref().map(|f| &f.mode) { @@ -553,12 +550,9 @@ impl MultiStageQueryPlanner { MultiStageInodeMemberType::Aggregate ) { - let reduce_by = multi_stage_member.reduce_by_symbols().clone(); - let group_by = multi_stage_member.group_by_symbols().clone(); - let dims = - Self::partition_filter(new_state.dimensions(), &reduce_by, &group_by); - let time_dims = - Self::partition_filter(new_state.time_dimensions(), &reduce_by, &group_by); + let grain = multi_stage_member.grain(); + let dims = Self::partition_filter(new_state.dimensions(), grain); + let time_dims = Self::partition_filter(new_state.time_dimensions(), grain); new_state.set_dimensions(dims); new_state.set_time_dimensions(time_dims); } @@ -589,9 +583,9 @@ impl MultiStageQueryPlanner { // build keys-side descriptions per child on the parent state // so the FullKeyAggregate can broadcast measure values back // to the full query grain. Window-path Aggregate inodes - // (sum-of-sum / sum-of-count without add_group_by) handle - // broadcast via the window expression instead and don't need - // keys_input. + // (sum-of-sum / sum-of-count with no leaf-extending `include`) + // handle broadcast via the window expression instead and don't + // need keys_input. let mut keys_input: Vec> = vec![]; if !use_window_path { let new_state_has = |sym: &Rc| { @@ -788,9 +782,7 @@ impl MultiStageQueryPlanner { let inode_member = MultiStageInodeMember::new( MultiStageInodeMemberType::RollingWindow(rolling_window_descr), - vec![], - vec![], - None, + MultiStageGrain::default(), None, ); diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs index c2314f8539063..a391c1c7916a2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs @@ -16,7 +16,7 @@ pub struct MultiStageQueryDescription { input: Vec>, /// Dim-grid sources for the JOIN-based assembly. Empty for the /// window-based path. Populated by `make_queries_descriptions` when - /// reduce_by / group_by actually shrinks the partition grain vs the + /// the grain reshape actually shrinks the partition grain vs the /// leaf grain — in that case `input` is rebuilt at partition grain /// and the original full-grain inputs move here as keys. keys_input: Vec>, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/common/multi_stage.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/common/multi_stage.rs index e632244c5a666..4f64597c10ee0 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/common/multi_stage.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/common/multi_stage.rs @@ -1,7 +1,8 @@ use super::super::measure_symbol::MeasureTimeShifts; use super::super::MemberSymbol; use crate::cube_bridge::dimension_definition::DimensionDefinition; -use crate::cube_bridge::measure_definition::MeasureDefinition; +use crate::cube_bridge::measure_definition::{MeasureDefinition, MeasureDefinitionStatic}; +use crate::cube_bridge::multi_stage_grain::MultiStageGrainReferences; use crate::planner::filter::compiler::FilterCompiler; use crate::planner::filter::FilterItem; use crate::planner::Compiler; @@ -51,12 +52,21 @@ pub struct MultiStageFilter { pub include_measure: Vec, } +/// Set operation on the inherited grain context of a multi-stage member. +/// +/// The three lists mutate the parent grain — `exclude` removes, +/// `keep_only` intersects, `include` adds. +#[derive(Clone, Default)] +pub struct MultiStageGrain { + pub exclude: Option>>, + pub keep_only: Option>>, + pub include: Option>>, +} + #[derive(Clone)] pub struct MultiStageProperties { - pub add_group_by: Option>>, + pub grain: MultiStageGrain, pub filter: Option, - pub reduce_by: Option>>, - pub group_by: Option>>, pub time_shift: Option, } @@ -71,17 +81,16 @@ impl MultiStageProperties { return Ok(None); } - let static_data = definition.static_data(); - let reduce_by = resolve_reference_paths(&static_data.reduce_by_references, compiler)?; - let add_group_by = resolve_reference_paths(&static_data.add_group_by_references, compiler)?; - let group_by = resolve_reference_paths(&static_data.group_by_references, compiler)?; + let grain = match definition.grain()? { + Some(g) => build_grain_from_directive(g, compiler)?, + None => build_grain_from_legacy(&definition.static_data(), compiler)?, + }; + let filter = build_filter(cube_name, definition.filter()?, compiler)?; Ok(Some(Self { - add_group_by, + grain, filter, - reduce_by, - group_by, time_shift, })) } @@ -95,15 +104,16 @@ impl MultiStageProperties { return Ok(None); } - let add_group_by = + let include = resolve_reference_paths(&definition.static_data().add_group_by_references, compiler)?; let filter = build_filter(cube_name, definition.filter()?, compiler)?; Ok(Some(Self { - add_group_by, + grain: MultiStageGrain { + include, + ..Default::default() + }, filter, - reduce_by: None, - group_by: None, time_shift: None, })) } @@ -134,11 +144,15 @@ impl MultiStageProperties { None => None, }; + let grain = MultiStageGrain { + exclude: map_refs(&self.grain.exclude)?, + keep_only: map_refs(&self.grain.keep_only)?, + include: map_refs(&self.grain.include)?, + }; + Ok(Self { - add_group_by: map_refs(&self.add_group_by)?, + grain, filter, - reduce_by: map_refs(&self.reduce_by)?, - group_by: map_refs(&self.group_by)?, time_shift: self.time_shift.clone(), }) } @@ -160,6 +174,34 @@ fn resolve_reference_paths( } } +fn build_grain_from_directive( + grain: Rc, + compiler: &mut Compiler, +) -> Result { + let static_data = grain.static_data(); + if static_data.exclude.is_some() && static_data.keep_only.is_some() { + return Err(CubeError::user( + "Multi-stage grain cannot specify both `exclude` and `keep_only` — they are mutually exclusive ways of restricting the inherited context.".to_string(), + )); + } + Ok(MultiStageGrain { + exclude: resolve_reference_paths(&static_data.exclude, compiler)?, + keep_only: resolve_reference_paths(&static_data.keep_only, compiler)?, + include: resolve_reference_paths(&static_data.include, compiler)?, + }) +} + +fn build_grain_from_legacy( + static_data: &MeasureDefinitionStatic, + compiler: &mut Compiler, +) -> Result { + Ok(MultiStageGrain { + exclude: resolve_reference_paths(&static_data.reduce_by_references, compiler)?, + keep_only: resolve_reference_paths(&static_data.group_by_references, compiler)?, + include: resolve_reference_paths(&static_data.add_group_by_references, compiler)?, + }) +} + fn build_filter( _cube_name: &String, filter: Option>, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs index 70530edee2edd..9db8ddcad6f79 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs @@ -176,12 +176,6 @@ impl DimensionSymbol { &self.mask_sql } - pub fn add_group_by(&self) -> Option<&Vec>> { - self.multi_stage - .as_ref() - .and_then(|m| m.add_group_by.as_ref()) - } - pub fn dimension_type(&self) -> &str { self.kind.dimension_type_str() } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/measure_symbol.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/measure_symbol.rs index 79b19c94149d2..c10feac1c58d4 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/measure_symbol.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/measure_symbol.rs @@ -460,20 +460,6 @@ impl MeasureSymbol { &self.measure_order_by } - pub fn reduce_by(&self) -> Option<&Vec>> { - self.multi_stage.as_ref().and_then(|m| m.reduce_by.as_ref()) - } - - pub fn add_group_by(&self) -> Option<&Vec>> { - self.multi_stage - .as_ref() - .and_then(|m| m.add_group_by.as_ref()) - } - - pub fn group_by(&self) -> Option<&Vec>> { - self.multi_stage.as_ref().and_then(|m| m.group_by.as_ref()) - } - pub fn multi_stage(&self) -> Option<&MultiStageProperties> { self.multi_stage.as_ref() } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_measure_definition.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_measure_definition.rs index bf8b11c92efbb..5e9ef880c22bc 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_measure_definition.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_measure_definition.rs @@ -5,11 +5,13 @@ use crate::cube_bridge::measure_definition::{ use crate::cube_bridge::member_order_by::MemberOrderBy; use crate::cube_bridge::member_sql::MemberSql; use crate::cube_bridge::multi_stage_filter::MultiStageFilterReferences; +use crate::cube_bridge::multi_stage_grain::MultiStageGrainReferences; use crate::cube_bridge::struct_with_sql_member::StructWithSqlMember; use crate::impl_static_data; use crate::test_fixtures::cube_bridge::yaml::measure::YamlMeasureDefinition; use crate::test_fixtures::cube_bridge::{ - MockMemberOrderBy, MockMemberSql, MockMultiStageFilterReferences, MockStructWithSqlMember, + MockMemberOrderBy, MockMemberSql, MockMultiStageFilterReferences, + MockMultiStageGrainReferences, MockStructWithSqlMember, }; use cubenativeutils::CubeError; use std::any::Any; @@ -45,6 +47,8 @@ pub struct MockMeasureDefinition { #[builder(default)] filter: Option>, #[builder(default)] + grain: Option>, + #[builder(default)] order_by: Option>>, #[builder(default, setter(strip_option(fallback = resolved_mask_sql_opt)))] resolved_mask_sql: Option, @@ -141,6 +145,17 @@ impl MeasureDefinition for MockMeasureDefinition { .map(|f| f.clone() as Rc)) } + fn has_grain(&self) -> Result { + Ok(self.grain.is_some()) + } + + fn grain(&self) -> Result>, CubeError> { + Ok(self + .grain + .as_ref() + .map(|g| g.clone() as Rc)) + } + fn has_order_by(&self) -> Result { Ok(self.order_by.is_some()) } @@ -343,6 +358,76 @@ mod tests { ); } + #[test] + fn test_from_yaml_with_grain() { + let yaml = indoc! {" + type: sum + sql: \"{CUBE.amount}\" + multi_stage: true + grain: + include: + - region + - category + "}; + + let measure = MockMeasureDefinition::from_yaml(yaml).unwrap(); + + assert!(measure.has_grain().unwrap()); + let grain = measure.grain().unwrap().expect("grain present"); + let static_data = grain.static_data(); + assert_eq!( + static_data.include, + Some(vec!["region".to_string(), "category".to_string()]) + ); + assert_eq!(static_data.exclude, None); + assert_eq!(static_data.keep_only, None); + } + + #[test] + fn test_from_yaml_with_grain_partial() { + let yaml = indoc! {" + type: sum + sql: \"{CUBE.amount}\" + multi_stage: true + grain: + exclude: + - region + "}; + + let measure = MockMeasureDefinition::from_yaml(yaml).unwrap(); + let grain = measure.grain().unwrap().expect("grain present"); + let static_data = grain.static_data(); + + assert_eq!(static_data.exclude, Some(vec!["region".to_string()])); + assert_eq!(static_data.keep_only, None); + assert_eq!(static_data.include, None); + } + + #[test] + fn test_from_yaml_with_grain_qualifies_references() { + let yaml = indoc! {" + type: sum + sql: \"{CUBE.amount}\" + multi_stage: true + grain: + include: + - region + - other_cube.dim + "}; + + let yaml_def: YamlMeasureDefinition = serde_yaml::from_str(yaml).unwrap(); + let measure = yaml_def.build_with_cube_name(Some("orders")); + let grain = measure.grain().unwrap().expect("grain present"); + + assert_eq!( + grain.static_data().include, + Some(vec![ + "orders.region".to_string(), + "other_cube.dim".to_string() + ]) + ); + } + #[test] fn test_from_yaml_with_case() { let yaml = indoc! {" diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_multi_stage_grain.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_multi_stage_grain.rs new file mode 100644 index 0000000000000..f04081b2687a5 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_multi_stage_grain.rs @@ -0,0 +1,33 @@ +use crate::cube_bridge::multi_stage_grain::{ + MultiStageGrainReferences, MultiStageGrainReferencesStatic, +}; +use crate::impl_static_data; +use std::any::Any; +use std::rc::Rc; +use typed_builder::TypedBuilder; + +#[derive(TypedBuilder)] +pub struct MockMultiStageGrainReferences { + #[builder(default)] + exclude: Option>, + #[builder(default)] + keep_only: Option>, + #[builder(default)] + include: Option>, +} + +impl_static_data!( + MockMultiStageGrainReferences, + MultiStageGrainReferencesStatic, + exclude, + keep_only, + include +); + +impl MultiStageGrainReferences for MockMultiStageGrainReferences { + crate::impl_static_data_method!(MultiStageGrainReferencesStatic); + + fn as_any(self: Rc) -> Rc { + self + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mod.rs index d3d6b74cb2eae..e2fca83a923d6 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mod.rs @@ -27,6 +27,7 @@ mod mock_member_expression_definition; mod mock_member_order_by; mod mock_member_sql; mod mock_multi_stage_filter; +mod mock_multi_stage_grain; mod mock_pre_aggregation_description; mod mock_pre_aggregation_obj; mod mock_pre_aggregation_time_dimension; @@ -64,6 +65,7 @@ pub use mock_member_expression_definition::MockMemberExpressionDefinition; pub use mock_member_order_by::MockMemberOrderBy; pub use mock_member_sql::MockMemberSql; pub use mock_multi_stage_filter::MockMultiStageFilterReferences; +pub use mock_multi_stage_grain::MockMultiStageGrainReferences; pub use mock_pre_aggregation_description::MockPreAggregationDescription; pub use mock_pre_aggregation_time_dimension::MockPreAggregationTimeDimension; pub use mock_schema::{MockSchema, MockSchemaBuilder}; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/measure.rs index d7775c848d8fe..428dc4716d031 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/measure.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/measure.rs @@ -5,7 +5,7 @@ use crate::test_fixtures::cube_bridge::yaml::case::YamlCaseVariant; use crate::test_fixtures::cube_bridge::yaml::mask::YamlMask; use crate::test_fixtures::cube_bridge::{ MockMeasureDefinition, MockMemberOrderBy, MockMultiStageFilterReferences, - MockStructWithSqlMember, + MockMultiStageGrainReferences, MockStructWithSqlMember, }; use serde::Deserialize; use std::rc::Rc; @@ -29,6 +29,8 @@ pub struct YamlMeasureDefinition { #[serde(default)] filter: Option, #[serde(default)] + grain: Option, + #[serde(default)] sql: Option, #[serde(default)] case: Option, @@ -122,6 +124,28 @@ impl YamlMultiStageFilter { } } +#[derive(Debug, Deserialize)] +pub struct YamlMultiStageGrain { + #[serde(default)] + exclude: Option>, + #[serde(default)] + keep_only: Option>, + #[serde(default)] + include: Option>, +} + +impl YamlMultiStageGrain { + pub(super) fn build(self, cube_name: Option<&str>) -> Rc { + Rc::new( + MockMultiStageGrainReferences::builder() + .exclude(qualify_references(self.exclude, cube_name)) + .keep_only(qualify_references(self.keep_only, cube_name)) + .include(qualify_references(self.include, cube_name)) + .build(), + ) + } +} + #[derive(Debug, Deserialize)] struct YamlFilter { sql: String, @@ -197,6 +221,7 @@ impl YamlMeasureDefinition { }; let filter = self.filter.map(|f| f.build(cube_name)); + let grain = self.grain.map(|g| g.build(cube_name)); Rc::new( MockMeasureDefinition::builder() @@ -215,6 +240,7 @@ impl YamlMeasureDefinition { .filters(filters) .drill_filters(drill_filters) .filter(filter) + .grain(grain) .order_by(order_by) .resolved_mask_sql_opt(self.mask.map(|m| m.to_sql_string())) .build(), diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml index d3bdb54bea748..30d8ace1d9ad9 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml @@ -221,6 +221,14 @@ cubes: - orders.status - orders.created_at.month + - name: amount_grain_keep_only_status + type: sum + sql: "{CUBE.total_amount}" + multi_stage: true + grain: + keep_only: + - orders.status + - name: amount_prev_month type: number sql: "{CUBE.total_amount}" diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_filter.yaml b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_filter.yaml index 60f61e6fa2bdf..8de1de5e0ec3b 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_filter.yaml +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_filter.yaml @@ -66,3 +66,14 @@ cubes: - member: orders.status operator: equals values: [completed] + - name: revenue_with_grain + type: number + sql: "{CUBE.revenue}" + multi_stage: true + add_group_by: + - status + grain: + exclude: + - status + include: + - city diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/group_by.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/group_by.rs index fdf8880297dd6..96e4d12145419 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/group_by.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/group_by.rs @@ -98,3 +98,26 @@ async fn test_group_by_equals_query_dims() { insta::assert_snapshot!(result); } } + +// `grain.keep_only: [status]` narrows the partition to `[status]`; the +// query has only `category`, so the measure value is the per-status total +// broadcast across categories. +#[tokio::test(flavor = "multi_thread")] +async fn test_grain_keep_only_status_top_level() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.amount_grain_keep_only_status + dimensions: + - orders.category + order: + - id: orders.category + "#}; + + ctx.build_sql(query).unwrap(); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__group_by__grain_keep_only_status_top_level.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__group_by__grain_keep_only_status_top_level.snap new file mode 100644 index 0000000000000..5b34525bb0089 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__group_by__grain_keep_only_status_top_level.snap @@ -0,0 +1,10 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/group_by.rs +assertion_line: 121 +expression: result +--- +orders__category | orders__amount_grain_keep_only_status +-----------------+-------------------------------------- +books | 2250.00 +clothing | 2250.00 +electronics | 2250.00 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/measure_symbol.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/measure_symbol.rs index 594d78d17327a..c390cc4fb8fd2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/measure_symbol.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/measure_symbol.rs @@ -490,13 +490,15 @@ mod multi_stage { assert!(measure.is_multi_stage()); let ms = measure.multi_stage().expect("multi_stage present"); - let reduce_by = ms.reduce_by.as_ref().expect("reduce_by"); - assert_eq!(reduce_by.len(), 1); - assert_eq!(reduce_by[0].full_name(), "orders.status"); + let exclude = ms.grain.exclude.as_ref().expect("exclude"); + assert_eq!(exclude.len(), 1); + assert_eq!(exclude[0].full_name(), "orders.status"); + + let include = ms.grain.include.as_ref().expect("include"); + assert_eq!(include.len(), 1); + assert_eq!(include[0].full_name(), "orders.city"); - let add_group_by = ms.add_group_by.as_ref().expect("add_group_by"); - assert_eq!(add_group_by.len(), 1); - assert_eq!(add_group_by[0].full_name(), "orders.city"); + assert!(ms.grain.keep_only.is_none()); } #[test] @@ -531,9 +533,9 @@ mod multi_stage { assert!(dim.is_multi_stage()); let ms = dim.multi_stage().expect("multi_stage present"); - let add_group_by = ms.add_group_by.as_ref().expect("add_group_by"); - assert_eq!(add_group_by.len(), 1); - assert_eq!(add_group_by[0].full_name(), "orders.status"); + let include = ms.grain.include.as_ref().expect("include"); + assert_eq!(include.len(), 1); + assert_eq!(include[0].full_name(), "orders.status"); let filter = ms.filter.as_ref().expect("filter present"); assert_eq!(filter.mode, MultiStageFilterMode::Relative); @@ -544,8 +546,8 @@ mod multi_stage { assert_eq!(filter.include_dimension.len(), 1); assert!(filter.include_measure.is_empty()); assert!(filter.include_time_dimension.is_empty()); - assert!(ms.reduce_by.is_none()); - assert!(ms.group_by.is_none()); + assert!(ms.grain.exclude.is_none()); + assert!(ms.grain.keep_only.is_none()); } #[test] @@ -556,8 +558,6 @@ mod multi_stage { assert!(!measure.is_multi_stage()); assert!(measure.multi_stage().is_none()); - assert!(measure.reduce_by().is_none()); - assert!(measure.add_group_by().is_none()); } #[test] @@ -571,6 +571,38 @@ mod multi_stage { assert_eq!(filter.mode, MultiStageFilterMode::Relative); } + #[test] + fn legacy_fields_populate_grain() { + let ctx = ctx(); + let m = ctx.create_measure("orders.revenue_filtered").unwrap(); + let measure = m.as_measure().unwrap(); + let ms = measure.multi_stage().expect("multi_stage present"); + + let include = ms.grain.include.as_ref().expect("include"); + assert_eq!(include[0].full_name(), "orders.city"); + let exclude = ms.grain.exclude.as_ref().expect("exclude"); + assert_eq!(exclude[0].full_name(), "orders.status"); + assert!(ms.grain.keep_only.is_none()); + } + + #[test] + fn grain_directive_overrides_legacy_fields() { + let ctx = ctx(); + let m = ctx.create_measure("orders.revenue_with_grain").unwrap(); + let measure = m.as_measure().unwrap(); + let ms = measure.multi_stage().expect("multi_stage present"); + + let exclude = ms.grain.exclude.as_ref().expect("exclude"); + assert_eq!(exclude.len(), 1); + assert_eq!(exclude[0].full_name(), "orders.status"); + + let include = ms.grain.include.as_ref().expect("include"); + assert_eq!(include.len(), 1); + assert_eq!(include[0].full_name(), "orders.city"); + + assert!(ms.grain.keep_only.is_none()); + } + #[test] fn measure_filter_keep_only_and_exclude_mutually_exclusive() { let schema = MockSchema::from_yaml_file("common/multi_stage_filter_invalid.yaml");