diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs index 6d6a8d1fbaf57..9a21f8f37827d 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs @@ -39,6 +39,40 @@ impl PrettyPrint for MultiStageSubqueryRef { } } +/// Dim-grid source for the JOIN-based assembly: a set of CTE refs that +/// supply the keys grid. Each ref's own logical schema declares the +/// output (leaf) grain; the FKA strategy derives JOIN-keys against +/// `multi_stage_subquery_refs` from the intersection of schemas. +#[derive(Clone, TypedBuilder)] +pub struct FullKeyAggregateKeysInput { + #[builder(default)] + refs: Vec>, +} + +impl FullKeyAggregateKeysInput { + pub fn refs(&self) -> &Vec> { + &self.refs + } + + pub fn is_empty(&self) -> bool { + self.refs.is_empty() + } +} + +impl PrettyPrint for FullKeyAggregateKeysInput { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("FullKeyAggregateKeysInput:", state); + let inner = state.new_level(); + if !self.refs.is_empty() { + result.println("refs:", &inner); + let details = inner.new_level(); + for r in self.refs.iter() { + r.pretty_print(result, &details); + } + } + } +} + /// Top-level aggregating source that stitches together several /// multi-stage / multi-fact CTEs into one keyed result. The /// physical builder picks a join strategy from `multi_stage_subquery_refs` @@ -49,6 +83,12 @@ pub struct FullKeyAggregate { use_full_join_and_coalesce: bool, #[builder(default)] multi_stage_subquery_refs: Vec>, + /// Optional dim-grid input to LEFT JOIN measure-side refs against. + /// `None` when keys-side can be derived from the measure refs; + /// populated for the JOIN-based path (measure refs sit at partition + /// grain, dim grid lives at leaf grain). + #[builder(default)] + keys_input: Option>, } impl FullKeyAggregate { @@ -67,6 +107,10 @@ impl FullKeyAggregate { &self.multi_stage_subquery_refs } + pub fn keys_input(&self) -> Option<&Rc> { + self.keys_input.as_ref() + } + pub fn is_empty(&self) -> bool { self.multi_stage_subquery_refs.is_empty() } @@ -89,6 +133,7 @@ impl LogicalNode for FullKeyAggregate { .schema(self.schema().clone()) .use_full_join_and_coalesce(self.use_full_join_and_coalesce()) .multi_stage_subquery_refs(self.multi_stage_subquery_refs().clone()) + .keys_input(self.keys_input.clone()) .build(), )) } @@ -125,5 +170,9 @@ impl PrettyPrint for FullKeyAggregate { subquery_ref.pretty_print(result, &details_state); } } + if let Some(keys_input) = self.keys_input() { + result.println("keys_input:", &state); + keys_input.pretty_print(result, &details_state); + } } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs index 8f53c6f82b463..4374f24f1a13b 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs @@ -1,5 +1,5 @@ use super::FullKeyAggregateStrategy; -use crate::logical_plan::{FullKeyAggregate, LogicalJoin}; +use crate::logical_plan::{FullKeyAggregate, LogicalJoin, MultiStageSubqueryRef}; use crate::physical_plan::sql_nodes::SqlNodesFactory; use crate::physical_plan::ReferencesBuilder; use crate::physical_plan::{ @@ -8,9 +8,17 @@ use crate::physical_plan::{ }; use crate::physical_plan_builder::PhysicalPlanBuilder; use crate::physical_plan_builder::PushDownBuilderContext; +use crate::planner::MemberSymbol; use cubenativeutils::CubeError; use std::rc::Rc; +/// Keys-based assembly: build a `UNION DISTINCT` of dim projections over the +/// keys-side refs, then LEFT JOIN each measure-side ref by the dims its +/// schema actually carries. Two modes: +/// - keys-side comes from an explicit `keys_input` (JOIN-model: measure +/// refs live at partition grain, keys refs at leaf grain); +/// - keys-side derived from the measure refs themselves (chosen when the +/// dialect lacks FULL JOIN and the logical plan carries no `keys_input`). pub(super) struct KeysFullKeyAggregateStrategy<'a> { builder: &'a PhysicalPlanBuilder, } @@ -19,6 +27,16 @@ impl<'a> KeysFullKeyAggregateStrategy<'a> { pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc { Rc::new(Self { builder }) } + + fn dim_in_schema( + schema: &crate::logical_plan::LogicalSchema, + member: &Rc, + ) -> bool { + let target = member.clone().resolve_reference_chain().full_name(); + schema + .all_dimensions() + .any(|d| d.clone().resolve_reference_chain().full_name() == target) + } } impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> { @@ -28,11 +46,56 @@ impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> { context: &PushDownBuilderContext, ) -> Result, CubeError> { let query_tools = self.builder.query_tools(); - let mut keys_queries = vec![]; - let mut data_queries = vec![]; - let mut keys_context = context.clone(); - keys_context.dimensions_query = true; + // Decide the source for keys-side. Either an explicit set of refs + // from the logical plan (JOIN-model), or derive from measure refs + // themselves (no `keys_input` in the plan). + let (keys_refs, has_explicit_keys): (Vec>, bool) = + if let Some(keys_input) = full_key_aggregate.keys_input() { + (keys_input.refs().clone(), true) + } else { + ( + full_key_aggregate.multi_stage_subquery_refs().clone(), + false, + ) + }; + + // Dimensions projected on the keys-side. With explicit keys this is + // the leaf grain (the refs' own schemas); otherwise it's the FKA + // output grain (= query grain). + let key_dims: Vec> = if has_explicit_keys && !keys_refs.is_empty() { + keys_refs[0].schema().all_dimensions().cloned().collect() + } else { + full_key_aggregate + .schema() + .all_dimensions() + .cloned() + .collect() + }; + + // Build a DISTINCT projection of `key_dims` over each keys ref. + let mut keys_projections = vec![]; + for keys_ref in keys_refs.iter() { + let ref_schema = context.get_multi_stage_schema(keys_ref.name())?; + let ref_source = SingleAliasedSource::new_from_table_reference( + keys_ref.name().clone(), + ref_schema.clone(), + None, + ); + let mut select_builder = SelectBuilder::new(From::new(FromSource::Single(ref_source))); + for dim in key_dims.iter() { + let alias = ref_schema.resolve_member_alias(dim); + select_builder + .add_projection_member_reference(dim, QualifiedColumnName::new(None, alias)); + } + select_builder.set_distinct(); + keys_projections.push(Rc::new( + select_builder.build(query_tools.clone(), SqlNodesFactory::new()), + )); + } + + // Build data (measure) sub-selects for each measure ref. + let mut data_queries = vec![]; for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() { let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?; let multi_stage_source = SingleAliasedSource::new_from_table_reference( @@ -40,92 +103,89 @@ impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> { multi_stage_schema.clone(), None, ); - let mut keys_select_builder = - SelectBuilder::new(From::new(FromSource::Single(multi_stage_source.clone()))); - for dim in full_key_aggregate.schema().all_dimensions() { - let alias = multi_stage_schema.resolve_member_alias(dim); - let reference = QualifiedColumnName::new(None, alias); - keys_select_builder.add_projection_member_reference(dim, reference); - } - let sql_context = SqlNodesFactory::new(); - keys_select_builder.set_distinct(); - let keys_select = - Rc::new(keys_select_builder.build(query_tools.clone(), sql_context.clone())); - keys_queries.push(keys_select); - - let data_select_builder = - SelectBuilder::new(From::new(FromSource::Single(multi_stage_source))); - let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context)); - data_queries.push(data_select); + let data_select = Rc::new( + SelectBuilder::new(From::new(FromSource::Single(multi_stage_source))) + .build(query_tools.clone(), SqlNodesFactory::new()), + ); + data_queries.push((data_select, multi_stage_ref.schema().clone())); } + if data_queries.is_empty() { let empty_join = LogicalJoin::builder().build(); return self.builder.process_node(&empty_join, context); } - if data_queries.len() == 1 { - let select = data_queries[0].clone(); - let result = From::new_from_subselect(select, "fk_aggregate".to_string()); - return Ok(result); - } - - let keys_from = From::new_from_union( - Rc::new(Union::new_from_subselects(&keys_queries)), - "pk_aggregate_keys_source".to_string(), - ); - let references_builder = ReferencesBuilder::new(keys_from.clone()); - let mut keys_select_builder = SelectBuilder::new(keys_from); - - for member in full_key_aggregate.schema().all_dimensions() { - let alias = references_builder.resolve_alias_for_member(&member, &None); - if alias.is_none() { - return Err(CubeError::internal(format!( - "Source for {} not found in full key aggregate subqueries", - member.full_name() - ))); - } - let reference = QualifiedColumnName::new(None, alias.unwrap()); - keys_select_builder.add_projection_member_reference(member, reference); + // Fast path: single measure ref and no explicit keys side — the ref + // already has full key coverage on its own, no need to UNION keys. + if data_queries.len() == 1 && !has_explicit_keys { + let (select, _) = data_queries.into_iter().next().unwrap(); + return Ok(From::new_from_subselect(select, "fk_aggregate".to_string())); } - keys_select_builder.set_distinct(); - - let sql_context = SqlNodesFactory::new(); - let keys_select = Rc::new(keys_select_builder.build(query_tools.clone(), sql_context)); + // Combine keys projections via UNION (DISTINCT inside each plus + // UNION DISTINCT) and resolve canonical aliases. let keys_alias = "fk_aggregate_keys".to_string(); + let keys_select = if keys_projections.len() == 1 { + keys_projections.into_iter().next().unwrap() + } else { + let keys_from = From::new_from_union( + Rc::new(Union::new_from_subselects(&keys_projections)), + "fk_aggregate_keys_source".to_string(), + ); + let references_builder = ReferencesBuilder::new(keys_from.clone()); + let mut outer = SelectBuilder::new(keys_from); + for dim in key_dims.iter() { + let alias = references_builder + .resolve_alias_for_member(dim, &None) + .ok_or_else(|| { + CubeError::internal(format!( + "Source for {} not found in full key aggregate subqueries", + dim.full_name() + )) + })?; + outer.add_projection_member_reference(dim, QualifiedColumnName::new(None, alias)); + } + outer.set_distinct(); + Rc::new(outer.build(query_tools.clone(), SqlNodesFactory::new())) + }; let mut join_builder = JoinBuilder::new_from_subselect(keys_select.clone(), keys_alias.clone()); - for (i, query) in data_queries.into_iter().enumerate() { - let query_alias = format!("q_{}", i); - let conditions = full_key_aggregate - .schema() - .all_dimensions() + for (idx, (query, query_logical_schema)) in data_queries.into_iter().enumerate() { + let query_alias = format!("q_{}", idx); + // JOIN-keys are the dims present in both sides. With explicit + // keys-side this collapses to the measure ref's partition grain; + // without it the measure ref already has all key_dims. + let conditions = key_dims + .iter() + .filter(|d| Self::dim_in_schema(query_logical_schema.as_ref(), d)) .map(|dim| -> Result<_, CubeError> { - let alias_in_keys_query = keys_select.schema().resolve_member_alias(dim); - let keys_query_ref = Expr::Reference(QualifiedColumnName::new( + let alias_in_keys = keys_select.schema().resolve_member_alias(dim); + let keys_ref_expr = Expr::Reference(QualifiedColumnName::new( Some(keys_alias.clone()), - alias_in_keys_query, + alias_in_keys, )); - let alias_in_data_query = query.schema().resolve_member_alias(dim); - let data_query_ref = Expr::Reference(QualifiedColumnName::new( + let alias_in_data = query.schema().resolve_member_alias(dim); + let data_ref_expr = Expr::Reference(QualifiedColumnName::new( Some(query_alias.clone()), - alias_in_data_query, + alias_in_data, )); - - Ok(vec![(keys_query_ref, data_query_ref)]) + Ok(vec![(keys_ref_expr, data_ref_expr)]) }) .collect::, _>>()?; + // Null-safe dimension join when keys are derived from the + // measure refs (FULL JOIN-equivalent shape). For the JOIN-model + // path (explicit keys) it's a one-to-one match — null-safety is + // unnecessary but stays correct. join_builder.left_join_subselect( query, - query_alias.clone(), - JoinCondition::new_dimension_join(conditions, true), + query_alias, + JoinCondition::new_dimension_join(conditions, !has_explicit_keys), ); } - let result = join_builder.build(); - Ok(From::new_from_join(result)) + Ok(From::new_from_join(join_builder.build())) } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs index 2f7cd6863ae86..0683b3dca586f 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs @@ -36,7 +36,12 @@ impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcesso context: &PushDownBuilderContext, ) -> Result { let strategy: Rc = - if !full_key_aggregate.schema().has_dimensions() { + if full_key_aggregate.keys_input().is_some() { + // JOIN-model: keys side comes from the logical plan. + // `KeysFullKeyAggregateStrategy` handles both the explicit + // `keys_input` shape and the derived-from-measure-refs shape. + KeysFullKeyAggregateStrategy::new(self.builder) + } else if !full_key_aggregate.schema().has_dimensions() { InnerJoinFullKeyAggregateStrategy::new(self.builder) } else if self.builder.templates().supports_full_join() { FullJoinFullKeyAggregateStrategy::new(self.builder) diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs index e1b2f596c8851..0f0d652e7952f 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs @@ -123,6 +123,12 @@ pub struct MultiStageInodeMember { add_group_by: Vec>, group_by: Option>>, time_shift: Option, + /// Optimisation flag: this Aggregate inode is a safe candidate for + /// the `window`-based render — single measure dep, additive identity + /// rollup, no leaf-extending modifiers. When `true`, assembly skips + /// the JOIN-model and `member_query_planner` emits a window function. + /// Default `false`. + use_window_path: bool, } impl MultiStageInodeMember { @@ -139,9 +145,19 @@ impl MultiStageInodeMember { add_group_by, group_by, time_shift, + use_window_path: false, } } + pub fn with_use_window_path(mut self, value: bool) -> Self { + self.use_window_path = value; + self + } + + pub fn use_window_path(&self) -> bool { + self.use_window_path + } + pub fn inode_type(&self) -> &MultiStageInodeMemberType { &self.inode_type } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs index 017ada18774c4..bbb86fa306745 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs @@ -195,11 +195,10 @@ impl MultiStageMemberQueryPlanner { } /// Builds a measure-calculation CTE (Rank / Aggregate / - /// Calculate). Picks the partition-by from the inode's - /// `reduce_by` / `group_by` settings, chooses a window-function - /// flavour (Rank, Window, or None) when the partition is - /// narrower than the full dimension set, and wires the input - /// CTEs into a `FullKeyAggregate` source. + /// Calculate). Wires the input CTEs into a `FullKeyAggregate` + /// source; for the JOIN-based path (when the description carries + /// `keys_input`) also wires keys-side refs through + /// `FullKeyAggregateKeysInput`. fn plan_for_cte_query( &self, multi_stage_member: &MultiStageInodeMember, @@ -209,14 +208,19 @@ impl MultiStageMemberQueryPlanner { &multi_stage_member.group_by_symbols(), ); + // Rank always uses a window function. Aggregate inodes are + // routed through `FullKeyAggregate` by default; only the narrow + // optimisation-eligible subset (planner sets `use_window_path`) + // is emitted as a Window expression and additionally requires + // partition_by to be a strict subset of all dimensions — + // otherwise the window collapses into a plain group-by. let window_function_to_use = match multi_stage_member.inode_type() { MultiStageInodeMemberType::Rank => MultiStageCalculationWindowFunction::Rank, - MultiStageInodeMemberType::Aggregate => { - if partition_by.len() != self.all_dimensions().len() { - MultiStageCalculationWindowFunction::Window - } else { - MultiStageCalculationWindowFunction::None - } + MultiStageInodeMemberType::Aggregate + if multi_stage_member.use_window_path() + && partition_by.len() != self.all_dimensions().len() => + { + MultiStageCalculationWindowFunction::Window } _ => MultiStageCalculationWindowFunction::None, }; @@ -257,6 +261,34 @@ impl MultiStageMemberQueryPlanner { }) .collect_vec(); + let keys_input = if self.description.keys_input().is_empty() { + None + } else { + let refs = self + .description + .keys_input() + .iter() + .map(|d| { + let schema = LogicalSchema::default() + .set_time_dimensions(d.state().time_dimensions().clone()) + .set_dimensions(d.state().dimensions().clone()) + .set_measures(vec![d.member_node().clone()]) + .into_rc(); + Rc::new( + MultiStageSubqueryRef::builder() + .name(d.alias().clone()) + .symbols(vec![d.member_node().clone()]) + .schema(schema) + .build(), + ) + }) + .unique_by(|r| r.name().clone()) + .collect_vec(); + Some(Rc::new( + FullKeyAggregateKeysInput::builder().refs(refs).build(), + )) + }; + let full_key_aggregate_schema = self.input_schema(); let result = MultiStageMeasureCalculation::builder() .schema(schema) @@ -270,6 +302,7 @@ impl MultiStageMemberQueryPlanner { .schema(full_key_aggregate_schema) .use_full_join_and_coalesce(true) .multi_stage_subquery_refs(input_sources) + .keys_input(keys_input) .build(), )) .build(); diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs index 2a74aeb8e0506..1d9bae30a6335 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs @@ -13,6 +13,7 @@ use crate::planner::filter::BaseFilter; use crate::planner::filter::FilterItem; use crate::planner::filter::FilterOperator; use crate::planner::query_tools::QueryTools; +use crate::planner::symbols::AggregationType; use crate::planner::Case; use crate::planner::CaseSwitchDefinition; use crate::planner::CaseSwitchItem; @@ -143,6 +144,9 @@ impl MultiStageQueryPlanner { let reduce_by = measure.reduce_by().clone().unwrap_or_default(); let add_group_by = measure.add_group_by().clone().unwrap_or_default(); let group_by = measure.group_by().clone(); + let use_window_path = matches!(member_type, MultiStageInodeMemberType::Aggregate) + && add_group_by.is_empty() + && Self::is_window_path_eligible(&base_member); ( MultiStageInodeMember::new( member_type, @@ -150,7 +154,8 @@ impl MultiStageQueryPlanner { add_group_by, group_by, time_shift, - ), + ) + .with_use_window_path(use_window_path), is_ungrupped, ) } else { @@ -220,6 +225,67 @@ impl MultiStageQueryPlanner { } } + /// Aggregate inode is window-path eligible when it has exactly one + /// measure dep, the outer aggregation is `sum`, and the inner + /// aggregation rolls up as a sum (i.e. inner ∈ {sum, count}). This + /// is the narrow subset where `sum(sum(x)) OVER (...)` is a faithful + /// rollup — sum is associative and count rolls up as sum. + fn is_window_path_eligible(base_member: &Rc) -> bool { + let Ok(outer) = base_member.as_measure() else { + return false; + }; + let outer_is_sum = matches!( + outer.kind(), + MeasureKind::Aggregated(a) if a.agg_type() == AggregationType::Sum + ); + if !outer_is_sum { + return false; + } + let deps = base_member.get_dependencies(); + let [dep] = deps.as_slice() else { + return false; + }; + let Ok(inner) = dep.clone().resolve_reference_chain().as_measure() else { + return false; + }; + match inner.kind() { + MeasureKind::Count(_) => true, + MeasureKind::Aggregated(a) => a.agg_type() == AggregationType::Sum, + _ => false, + } + } + + /// Mirror of `MultiStageMemberQueryPlanner::member_partition_by_logical`: + /// drops `reduce_by` dims and (when `group_by` is set) keeps only the + /// dims explicitly listed. Used at planning time to decide whether + /// reduce_by / group_by actually shrinks the partition vs the leaf + /// grain. + /// + /// FIXME: merge with `MultiStageMemberQueryPlanner::member_partition_by_logical` + /// — both apply the same reduce_by/group_by reshape on different inputs; + /// keeping two copies invites silent drift when only one is updated. + fn partition_filter( + dims: &Vec>, + reduce_by: &Vec>, + group_by: &Option>>, + ) -> Vec> { + let dims: Vec> = if !reduce_by.is_empty() { + dims.iter() + .filter(|d| !reduce_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .cloned() + .collect() + } else { + dims.clone() + }; + if let Some(group_by) = group_by { + dims.into_iter() + .filter(|d| group_by.iter().any(|m| d.has_member_in_reference_chain(m))) + .collect() + } else { + dims + } + } + /// Default child-generation path: for each measure or /// multi-stage-dimension dependency, recurses into /// `make_queries_descriptions` and adds the result as an input @@ -265,6 +331,7 @@ impl MultiStageQueryPlanner { ), new_state.clone(), vec![], + vec![], alias, ); result.push(description.clone()); @@ -414,6 +481,7 @@ impl MultiStageQueryPlanner { ), state.clone(), vec![], + vec![], alias.clone(), ) } else { @@ -428,11 +496,36 @@ impl MultiStageQueryPlanner { } } - let new_state = if !dimensions_to_add.is_empty() - || multi_stage_member.time_shift().is_some() - || state.has_filters_for_member(&member_name) - { + // new_state is the leaf grain on which children are computed. + // For JOIN-model Aggregate inodes modifiers apply in this order: + // 1. reduce_by / group_by — shrink parent grain to the + // partition grain implied by directives. + // 2. add_group_by — extend the result with extra leaf dims. + // 3. time_shift / filter cleanup. + // Step 1 must precede step 2: `group_by` is a keep-only filter + // and would silently drop dims that step 2 needs to introduce. + // + // The window-path Aggregate inode skips step 1: the leaf stays + // at the parent state plus any add_group_by extension, and the + // window function does the reduce_by collapse at outer level. + let use_window_path = multi_stage_member.use_window_path(); + let new_state = { let mut new_state = state.as_ref().clone(); + if !use_window_path + && matches!( + multi_stage_member.inode_type(), + MultiStageInodeMemberType::Aggregate + ) + { + let reduce_by = multi_stage_member.reduce_by_symbols().clone(); + let group_by = multi_stage_member.group_by_symbols().clone(); + let dims = + Self::partition_filter(new_state.dimensions(), &reduce_by, &group_by); + let time_dims = + Self::partition_filter(new_state.time_dimensions(), &reduce_by, &group_by); + new_state.set_dimensions(dims); + new_state.set_time_dimensions(time_dims); + } if !dimensions_to_add.is_empty() { new_state.add_dimensions(dimensions_to_add.clone()); } @@ -443,20 +536,53 @@ impl MultiStageQueryPlanner { new_state.remove_filter_for_member(&member_name); } Rc::new(new_state) - } else { - state.clone() }; let mut input = vec![]; self.make_childs( member.clone(), - new_state, + new_state.clone(), &mut input, descriptions, resolved_multi_stage_dimensions, cte_state, )?; + // JOIN-model: when new_state misses any dim that was on the + // parent's `state`, this inode shrinks the parent grain. We + // build keys-side descriptions per child on the parent state + // so the FullKeyAggregate can broadcast measure values back + // to the full query grain. Window-path Aggregate inodes + // (sum-of-sum / sum-of-count without add_group_by) handle + // broadcast via the window expression instead and don't need + // keys_input. + let mut keys_input: Vec> = vec![]; + if !use_window_path { + let new_state_has = |sym: &Rc| { + let sym_name = sym.clone().resolve_reference_chain().full_name(); + new_state + .dimensions() + .iter() + .chain(new_state.time_dimensions().iter()) + .any(|d| d.clone().resolve_reference_chain().full_name() == sym_name) + }; + let any_missing = state + .dimensions() + .iter() + .chain(state.time_dimensions().iter()) + .any(|d| !new_state_has(d)); + if any_missing { + self.make_childs( + member.clone(), + state.clone(), + &mut keys_input, + descriptions, + resolved_multi_stage_dimensions, + cte_state, + )?; + } + } + let alias = cte_state.next_cte_name(); MultiStageQueryDescription::new( MultiStageMember::new( @@ -467,6 +593,7 @@ impl MultiStageQueryPlanner { ), state.clone(), input, + keys_input, alias.clone(), ) }; @@ -640,6 +767,7 @@ impl MultiStageQueryPlanner { ), state.clone(), input, + vec![], alias.clone(), ); descriptions.push(description.clone()); @@ -678,6 +806,7 @@ impl MultiStageQueryPlanner { ), state.clone(), vec![], + vec![], "time_series_get_range".to_string(), ); descriptions.push(time_series_get_range_node.clone()); @@ -728,6 +857,7 @@ impl MultiStageQueryPlanner { ), state.clone(), vec![], + vec![], "time_series".to_string(), ); descriptions.push(time_series_node.clone()); @@ -758,6 +888,7 @@ impl MultiStageQueryPlanner { ), state, vec![], + vec![], alias.clone(), ); descriptions.push(description.clone()); diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs index c412b93125648..c2314f8539063 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs @@ -14,6 +14,12 @@ pub struct MultiStageQueryDescription { member: Rc, state: Rc, input: Vec>, + /// Dim-grid sources for the JOIN-based assembly. Empty for the + /// window-based path. Populated by `make_queries_descriptions` when + /// reduce_by / group_by actually shrinks the partition grain vs the + /// leaf grain — in that case `input` is rebuilt at partition grain + /// and the original full-grain inputs move here as keys. + keys_input: Vec>, alias: String, } @@ -22,12 +28,14 @@ impl MultiStageQueryDescription { member: Rc, state: Rc, input: Vec>, + keys_input: Vec>, alias: String, ) -> Rc { Rc::new(Self { member, state, input, + keys_input, alias, }) } @@ -68,6 +76,10 @@ impl MultiStageQueryDescription { &self.input } + pub fn keys_input(&self) -> &Vec> { + &self.keys_input + } + pub fn is_leaf(&self) -> bool { self.input.is_empty() } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml index 3d915bafeb624..cc39af2fb78d2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml @@ -90,6 +90,63 @@ cubes: type: avg sql: amount + - name: unique_customers + type: count_distinct + sql: customer_id + + - name: unique_customers_reduce_category + type: sum + sql: "{CUBE.unique_customers}" + multi_stage: true + reduce_by: + - orders.category + + - name: avg_amount_reduce_category + type: avg + sql: "{CUBE.avg_amount}" + multi_stage: true + reduce_by: + - orders.category + + - name: count_reduce_category + type: sum + sql: "{CUBE.count}" + multi_stage: true + reduce_by: + - orders.category + + - name: max_total_amount_reduce_category + type: max + sql: "{CUBE.total_amount}" + multi_stage: true + reduce_by: + - orders.category + + - name: amount_plus_customers_reduce_category + type: number + sql: "{CUBE.amount_reduce_category} + {CUBE.unique_customers_reduce_category}" + multi_stage: true + + - name: max_amount + type: max + sql: amount + + - name: max_sum_reduce_category + type: sum + sql: "{CUBE.max_amount}" + multi_stage: true + reduce_by: + - orders.category + + - name: total_by_customer_reduce_category + type: sum + sql: "{CUBE.total_amount}" + multi_stage: true + add_group_by: + - orders.customer_id + reduce_by: + - orders.category + - name: amount_by_id type: number sql: "{CUBE.total_amount}" diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/pg_service.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/pg_service.rs index 79357c86b271a..87cea88e905b8 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/pg_service.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/pg_service.rs @@ -25,7 +25,7 @@ static CLEANUP_CONTAINER_ID: OnceLock = OnceLock::new(); extern "C" fn cleanup_container() { if let Some(id) = CLEANUP_CONTAINER_ID.get() { let _ = std::process::Command::new("docker") - .args(["rm", "-f", id]) + .args(["rm", "-f", "-v", id]) .output(); } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs index 3cae1efd87fdf..b47565f070354 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs @@ -9,6 +9,140 @@ fn create_context() -> TestContext { const SEED: &str = "integration_multi_stage_tables.sql"; +fn assert_uses_window(sql: &str) { + assert!( + sql.contains("OVER (PARTITION BY"), + "expected SQL to use a window function (`... OVER (PARTITION BY ...)`),\n\ + got:\n{}", + sql, + ); +} + +fn assert_no_window(sql: &str) { + assert!( + !sql.contains("OVER (PARTITION BY"), + "expected SQL to assemble via JOIN-model (no window function),\n\ + got:\n{}", + sql, + ); +} + +// add_group_by + reduce_by together: leaf grain extends with customer_id +// while partition grain shrinks by removing category. Three distinct grains: +// leaf = (status, category, customer_id) ← per-customer sum(amount) +// query = (status, category) +// partition = (status,) ← reduce_by removes category +// Expected per JOIN-semantic (outer sum collapses add_group_by + reduce_by +// down to partition, broadcast to query grid): total sum(amount) per status. +// cancelled = 200, completed = 1400, pending = 650. +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_add_group_by_combo() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.total_by_customer_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// Inner base = max (idempotent), outer multi-stage = sum. JOIN-model +// computes overall max(amount) per status, broadcast across categories +// (100 / 400 / 250 on this seed). +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_sum_of_max() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.max_sum_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// Inner additive (sum), outer idempotent (max). JOIN-model computes overall +// sum(amount) per status broadcast across categories +// (200 / 1400 / 650 on this seed). +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_max_of_sum() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.max_total_amount_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// Calculated multi-stage measure summing two reduce_by Aggregate children. +// Each child renders through the JOIN-model path independently; the parent +// Calculate inode then stitches both via FullKeyAggregate on the query grain. +// Expected per status (broadcast across categories): +// amount_reduce_category: 200 / 1400 / 650 +// unique_customers_reduce_category: 3 / 3 / 3 +// sum: 203 / 1403 / 653 +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_calculated_over_two() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.amount_plus_customers_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + // Mixed: the sum-of-sum child renders through window-path, the + // count_distinct child through JOIN-model, and the parent Calculate + // stitches both via FullKeyAggregate. SQL contains at least one window. + let sql = ctx.build_sql(query).unwrap(); + assert_uses_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + #[tokio::test(flavor = "multi_thread")] async fn test_reduce_by_single_dim() { let ctx = create_context(); @@ -24,7 +158,8 @@ async fn test_reduce_by_single_dim() { - id: orders.category "#}; - ctx.build_sql(query).unwrap(); + let sql = ctx.build_sql(query).unwrap(); + assert_uses_window(&sql); if let Some(result) = ctx.try_execute_pg(query, SEED).await { insta::assert_snapshot!(result); @@ -46,7 +181,8 @@ async fn test_reduce_by_other_dim() { - id: orders.category "#}; - ctx.build_sql(query).unwrap(); + let sql = ctx.build_sql(query).unwrap(); + assert_uses_window(&sql); if let Some(result) = ctx.try_execute_pg(query, SEED).await { insta::assert_snapshot!(result); @@ -68,7 +204,10 @@ async fn test_reduce_by_multiple_dims() { - id: orders.category "#}; - ctx.build_sql(query).unwrap(); + // reduce_by drops every query dim → partition equals nothing; window-path + // is skipped as redundant and we fall back to plain group-by. + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); if let Some(result) = ctx.try_execute_pg(query, SEED).await { insta::assert_snapshot!(result); @@ -88,7 +227,90 @@ async fn test_reduce_by_dim_not_in_query() { - id: orders.status "#}; - ctx.build_sql(query).unwrap(); + // reduce_by targets a dim absent from the query → partition equals + // the query grain, window is redundant, falls back to plain group-by. + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// avg with reduce_by — JOIN-model computes overall avg(amount) per status +// (= 66.67 / 233.33 / 108.33 on this seed) instead of the window-path +// avg-of-bucket-avgs which would have diverged on uneven bucket sizes. +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_avg() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.avg_amount_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// Multi-stage measure has outer `type: sum` over base `count` — this is the +// correct user-level shape for "total count per partition" (count rolls up as +// sum). JOIN-model picks the partition-grain leaf and broadcasts to query. +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_count() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.count_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_uses_window(&sql); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +// Multi-stage measure has outer `type: sum` over base `count_distinct` — +// the correct shape for "rolled-up distinct count per partition". On this +// seed customers don't overlap across statuses, so sum of per-status +// count_distinct equals the true distinct count (3/3/3); when partitions +// overlap callers should use an HLL-based path instead. +#[tokio::test(flavor = "multi_thread")] +async fn test_reduce_by_count_distinct() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.unique_customers_reduce_category + dimensions: + - orders.status + - orders.category + order: + - id: orders.status + - id: orders.category + "#}; + + let sql = ctx.build_sql(query).unwrap(); + assert_no_window(&sql); if let Some(result) = ctx.try_execute_pg(query, SEED).await { insta::assert_snapshot!(result); @@ -114,7 +336,8 @@ async fn test_reduce_by_with_time() { - id: orders.category "#}; - ctx.build_sql(query).unwrap(); + let sql = ctx.build_sql(query).unwrap(); + assert_uses_window(&sql); if let Some(result) = ctx.try_execute_pg(query, SEED).await { insta::assert_snapshot!(result); diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_add_group_by_combo.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_add_group_by_combo.snap new file mode 100644 index 0000000000000..1a06d4d3062ee --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_add_group_by_combo.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 39 +expression: result +--- +orders__status | orders__category | orders__total_by_customer_reduce_category +---------------+------------------+------------------------------------------ +cancelled | books | 200.00 +cancelled | clothing | 200.00 +cancelled | electronics | 200.00 +completed | books | 1400.00 +completed | clothing | 1400.00 +completed | electronics | 1400.00 +pending | books | 650.00 +pending | clothing | 650.00 +pending | electronics | 650.00 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_avg.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_avg.snap new file mode 100644 index 0000000000000..eea5a39efa5c8 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_avg.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 239 +expression: result +--- +orders__status | orders__category | orders__avg_amount_reduce_category +---------------+------------------+----------------------------------- +cancelled | books | 66.6666666666666667 +cancelled | clothing | 66.6666666666666667 +cancelled | electronics | 66.6666666666666667 +completed | books | 233.3333333333333333 +completed | clothing | 233.3333333333333333 +completed | electronics | 233.3333333333333333 +pending | books | 108.3333333333333333 +pending | clothing | 108.3333333333333333 +pending | electronics | 108.3333333333333333 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_calculated_over_two.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_calculated_over_two.snap new file mode 100644 index 0000000000000..ddc43d16e582f --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_calculated_over_two.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 141 +expression: result +--- +orders__status | orders__category | orders__amount_plus_customers_reduce_category +---------------+------------------+---------------------------------------------- +cancelled | books | 203.00 +cancelled | clothing | 203.00 +cancelled | electronics | 203.00 +completed | books | 1403.00 +completed | clothing | 1403.00 +completed | electronics | 1403.00 +pending | books | 653.00 +pending | clothing | 653.00 +pending | electronics | 653.00 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count.snap new file mode 100644 index 0000000000000..82e62614a4e1d --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 248 +expression: result +--- +orders__status | orders__category | orders__count_reduce_category +---------------+------------------+------------------------------ +cancelled | books | 3 +cancelled | clothing | 3 +cancelled | electronics | 3 +completed | books | 6 +completed | clothing | 6 +completed | electronics | 6 +pending | books | 6 +pending | clothing | 6 +pending | electronics | 6 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count_distinct.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count_distinct.snap new file mode 100644 index 0000000000000..90b91050a127e --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_count_distinct.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 275 +expression: result +--- +orders__status | orders__category | orders__unique_customers_reduce_category +---------------+------------------+----------------------------------------- +cancelled | books | 3 +cancelled | clothing | 3 +cancelled | electronics | 3 +completed | books | 3 +completed | clothing | 3 +completed | electronics | 3 +pending | books | 3 +pending | clothing | 3 +pending | electronics | 3 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_max_of_sum.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_max_of_sum.snap new file mode 100644 index 0000000000000..f804a25bd04a2 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_max_of_sum.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 117 +expression: result +--- +orders__status | orders__category | orders__max_total_amount_reduce_category +---------------+------------------+----------------------------------------- +cancelled | books | 200.00 +cancelled | clothing | 200.00 +cancelled | electronics | 200.00 +completed | books | 1400.00 +completed | clothing | 1400.00 +completed | electronics | 1400.00 +pending | books | 650.00 +pending | clothing | 650.00 +pending | electronics | 650.00 diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_sum_of_max.snap b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_sum_of_max.snap new file mode 100644 index 0000000000000..a22ba5bd7c230 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__reduce_by__reduce_by_sum_of_max.snap @@ -0,0 +1,16 @@ +--- +source: cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/reduce_by.rs +assertion_line: 66 +expression: result +--- +orders__status | orders__category | orders__max_sum_reduce_category +---------------+------------------+-------------------------------- +cancelled | books | 100.00 +cancelled | clothing | 100.00 +cancelled | electronics | 100.00 +completed | books | 400.00 +completed | clothing | 400.00 +completed | electronics | 400.00 +pending | books | 250.00 +pending | clothing | 250.00 +pending | electronics | 250.00