From 2fb0e036659c933d405e19892af0c48323ed6848 Mon Sep 17 00:00:00 2001 From: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> Date: Thu, 21 May 2026 21:46:55 +0400 Subject: [PATCH] fix(tesseract): Prevent raw cube refs leaking into multi-stage pre-agg queries Signed-off-by: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> --- .../aggregate_multiplied_subquery.rs | 81 ++++++++++++------- .../optimizers/pre_aggregation/optimizer.rs | 13 ++- 2 files changed, 66 insertions(+), 28 deletions(-) diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs index 85ef84e7b2c05..fbef1a3f16785 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs @@ -39,14 +39,32 @@ impl LogicalNode for AggregateMultipliedSubquery { pre_aggregation_override, } = AggregateMultipliedSubqueryInputUnPacker::new(&self, &inputs)?; + // When pre_aggregation_override is set, the override is the only + // rendered branch and `inputs()` packs only it — `keys_subquery`, + // `source`, `dimension_subqueries` are reused unchanged from `self`. + let (keys_subquery, source, dimension_subqueries) = if pre_aggregation_override.is_some() { + ( + self.keys_subquery.clone(), + self.source.clone(), + self.dimension_subqueries.clone(), + ) + } else { + ( + keys_subquery.unwrap().clone().into_logical_node()?, + self.source.with_plan_node(source.unwrap().clone())?, + dimension_subqueries + .unwrap() + .iter() + .map(|itm| itm.clone().into_logical_node()) + .collect::, _>>()?, + ) + }; + let result = Self { schema: self.schema.clone(), - keys_subquery: keys_subquery.clone().into_logical_node()?, - source: self.source.with_plan_node(source.clone())?, - dimension_subqueries: dimension_subqueries - .iter() - .map(|itm| itm.clone().into_logical_node()) - .collect::, _>>()?, + keys_subquery, + source, + dimension_subqueries, pre_aggregation_override: match pre_aggregation_override { Some(node) => Some(node.clone().into_logical_node()?), None => None, @@ -72,6 +90,15 @@ pub struct AggregateMultipliedSubqueryInputPacker; impl AggregateMultipliedSubqueryInputPacker { pub fn pack(aggregate: &AggregateMultipliedSubquery) -> Vec { + // When a pre-aggregation matched this multiplied subquery, the override + // is the only branch ever rendered. Pack only it so that plan walkers + // (cube-name collection, future rewriters, visitors) don't follow the + // dead `keys_subquery` / `source` / `dimension_subqueries` branches — + // those still reference raw cube tables and would leak `cube.table` + // identifiers into engines that don't have them (e.g. CubeStore). + if let Some(override_query) = &aggregate.pre_aggregation_override { + return vec![override_query.as_plan_node()]; + } let mut result = vec![]; result.push(aggregate.keys_subquery.as_plan_node()); result.push(aggregate.source.as_plan_node()); @@ -81,17 +108,14 @@ impl AggregateMultipliedSubqueryInputPacker { .iter() .map(|itm| itm.as_plan_node()), ); - if let Some(override_query) = &aggregate.pre_aggregation_override { - result.push(override_query.as_plan_node()); - } result } } pub struct AggregateMultipliedSubqueryInputUnPacker<'a> { - keys_subquery: &'a PlanNode, - source: &'a PlanNode, - dimension_subqueries: &'a [PlanNode], + keys_subquery: Option<&'a PlanNode>, + source: Option<&'a PlanNode>, + dimension_subqueries: Option<&'a [PlanNode]>, pre_aggregation_override: Option<&'a PlanNode>, } @@ -102,31 +126,34 @@ impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> { ) -> Result { check_inputs_len(&inputs, Self::inputs_len(aggregate), aggregate.node_name())?; + if aggregate.pre_aggregation_override.is_some() { + return Ok(Self { + keys_subquery: None, + source: None, + dimension_subqueries: None, + pre_aggregation_override: Some(&inputs[0]), + }); + } + let keys_subquery = &inputs[0]; let source = &inputs[1]; let dim_end = 2 + aggregate.dimension_subqueries.len(); let dimension_subqueries = &inputs[2..dim_end]; - let pre_aggregation_override = if aggregate.pre_aggregation_override.is_some() { - Some(&inputs[dim_end]) - } else { - None - }; Ok(Self { - keys_subquery, - source, - dimension_subqueries, - pre_aggregation_override, + keys_subquery: Some(keys_subquery), + source: Some(source), + dimension_subqueries: Some(dimension_subqueries), + pre_aggregation_override: None, }) } fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize { - 2 + aggregate.dimension_subqueries.len() - + if aggregate.pre_aggregation_override.is_some() { - 1 - } else { - 0 - } + if aggregate.pre_aggregation_override.is_some() { + 1 + } else { + 2 + aggregate.dimension_subqueries.len() + } } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 7d939319c307c..dcace5d035850 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -254,6 +254,15 @@ impl PreAggregationOptimizer { return Ok(None); } + // Multi-stage rewrite is only safe when the outer source is a + // FullKeyAggregate: it references its dependent CTEs by name, so once + // each multistage member has been rewritten the outer SELECT contains + // no raw cube references. A LogicalJoin source would carry raw + // `cube.table` identifiers into the rewritten plan; combined with + // `is_external` becoming true (multi-stage pre-agg usages collected), + // the resulting SQL would be rendered with CubeStore templates and + // routed to CubeStore, which has no such table — surfacing as + // `Table was not found`. let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = query.source() { let result = FullKeyAggregate::builder() .schema(full_key_aggregate.schema().clone()) @@ -262,7 +271,9 @@ impl PreAggregationOptimizer { .build(); Rc::new(result).into() } else { - query.source().clone() + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; + return Ok(None); }; // Reject mixed external/non-external pre-aggregation usages