Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,32 @@ impl LogicalNode for AggregateMultipliedSubquery {
pre_aggregation_override,
} = AggregateMultipliedSubqueryInputUnPacker::new(&self, &inputs)?;

// When pre_aggregation_override is set, the override is the only
// rendered branch and `inputs()` packs only it — `keys_subquery`,
// `source`, `dimension_subqueries` are reused unchanged from `self`.
let (keys_subquery, source, dimension_subqueries) = if pre_aggregation_override.is_some() {
(
self.keys_subquery.clone(),
self.source.clone(),
self.dimension_subqueries.clone(),
)
} else {
(
keys_subquery.unwrap().clone().into_logical_node()?,
self.source.with_plan_node(source.unwrap().clone())?,
dimension_subqueries
.unwrap()
.iter()
.map(|itm| itm.clone().into_logical_node())
.collect::<Result<Vec<_>, _>>()?,
)
};
Comment on lines +45 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observation (non-blocking): The .unwrap() calls on lines 53–54 are safe here because the else branch guarantees pre_aggregation_override is None, which means the unpacker always sets keys_subquery, source, and dimension_subqueries to Some. The logic is sound, but worth noting for future maintainers — the safety depends on the pack/unpack invariant staying in sync.


let result = Self {
schema: self.schema.clone(),
keys_subquery: keys_subquery.clone().into_logical_node()?,
source: self.source.with_plan_node(source.clone())?,
dimension_subqueries: dimension_subqueries
.iter()
.map(|itm| itm.clone().into_logical_node())
.collect::<Result<Vec<_>, _>>()?,
keys_subquery,
source,
dimension_subqueries,
pre_aggregation_override: match pre_aggregation_override {
Some(node) => Some(node.clone().into_logical_node()?),
None => None,
Expand All @@ -72,6 +90,15 @@ pub struct AggregateMultipliedSubqueryInputPacker;

impl AggregateMultipliedSubqueryInputPacker {
pub fn pack(aggregate: &AggregateMultipliedSubquery) -> Vec<PlanNode> {
// When a pre-aggregation matched this multiplied subquery, the override
// is the only branch ever rendered. Pack only it so that plan walkers
// (cube-name collection, future rewriters, visitors) don't follow the
// dead `keys_subquery` / `source` / `dimension_subqueries` branches —
// those still reference raw cube tables and would leak `cube.table`
// identifiers into engines that don't have them (e.g. CubeStore).
if let Some(override_query) = &aggregate.pre_aggregation_override {
return vec![override_query.as_plan_node()];
}
let mut result = vec![];
result.push(aggregate.keys_subquery.as_plan_node());
result.push(aggregate.source.as_plan_node());
Expand All @@ -81,17 +108,14 @@ impl AggregateMultipliedSubqueryInputPacker {
.iter()
.map(|itm| itm.as_plan_node()),
);
if let Some(override_query) = &aggregate.pre_aggregation_override {
result.push(override_query.as_plan_node());
}
result
}
}

pub struct AggregateMultipliedSubqueryInputUnPacker<'a> {
keys_subquery: &'a PlanNode,
source: &'a PlanNode,
dimension_subqueries: &'a [PlanNode],
keys_subquery: Option<&'a PlanNode>,
source: Option<&'a PlanNode>,
dimension_subqueries: Option<&'a [PlanNode]>,
pre_aggregation_override: Option<&'a PlanNode>,
}

Expand All @@ -102,31 +126,34 @@ impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> {
) -> Result<Self, CubeError> {
check_inputs_len(&inputs, Self::inputs_len(aggregate), aggregate.node_name())?;

if aggregate.pre_aggregation_override.is_some() {
return Ok(Self {
keys_subquery: None,
source: None,
dimension_subqueries: None,
pre_aggregation_override: Some(&inputs[0]),
});
}

let keys_subquery = &inputs[0];
let source = &inputs[1];
let dim_end = 2 + aggregate.dimension_subqueries.len();
let dimension_subqueries = &inputs[2..dim_end];
let pre_aggregation_override = if aggregate.pre_aggregation_override.is_some() {
Some(&inputs[dim_end])
} else {
None
};

Ok(Self {
keys_subquery,
source,
dimension_subqueries,
pre_aggregation_override,
keys_subquery: Some(keys_subquery),
source: Some(source),
dimension_subqueries: Some(dimension_subqueries),
pre_aggregation_override: None,
})
}

fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize {
2 + aggregate.dimension_subqueries.len()
+ if aggregate.pre_aggregation_override.is_some() {
1
} else {
0
}
if aggregate.pre_aggregation_override.is_some() {
1
} else {
2 + aggregate.dimension_subqueries.len()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ impl PreAggregationOptimizer {
return Ok(None);
}

// Multi-stage rewrite is only safe when the outer source is a
// FullKeyAggregate: it references its dependent CTEs by name, so once
// each multistage member has been rewritten the outer SELECT contains
// no raw cube references. A LogicalJoin source would carry raw
// `cube.table` identifiers into the rewritten plan; combined with
// `is_external` becoming true (multi-stage pre-agg usages collected),
// the resulting SQL would be rendered with CubeStore templates and
// routed to CubeStore, which has no such table — surfacing as
// `Table <cube.table> was not found`.
let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = query.source() {
let result = FullKeyAggregate::builder()
.schema(full_key_aggregate.schema().clone())
Expand All @@ -262,7 +271,9 @@ impl PreAggregationOptimizer {
.build();
Rc::new(result).into()
} else {
query.source().clone()
self.usages.truncate(saved_usages_len);
self.usage_counter = saved_counter;
return Ok(None);
};
Comment on lines 273 to 277
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix. The previous code fell through with query.source().clone(), silently allowing a LogicalJoin source to carry raw cube.table identifiers into the rewritten plan. The rollback + early return is the correct conservative choice — better to skip the rewrite than produce invalid SQL for CubeStore.


// Reject mixed external/non-external pre-aggregation usages
Expand Down
Loading