Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
use crate::logical_plan::{AggregateMultipliedSubquery, AggregateMultipliedSubquerySource};
use crate::physical_plan::ReferencesBuilder;
use crate::physical_plan::VisitorContext;
use crate::physical_plan::{
Expr, From, JoinBuilder, JoinCondition, MemberExpression, QualifiedColumnName, Select,
SelectBuilder,
Expand Down Expand Up @@ -59,6 +60,24 @@ impl<'a> LogicalNodeProcessor<'a, AggregateMultipliedSubquery>

match &aggregate_multiplied_subquery.source {
AggregateMultipliedSubquerySource::Cube(cube) => {
// Bind a dedicated VisitorContext to the join's right-hand side
// so that primary-key dimensions render against `pk_cube_alias`
// (the source cube join). Without it, the outer factory's
// render_references — populated later for the SELECT — map
// these dimensions to the inner `keys` subquery alias, and
// both sides of the ON clause collapse to `keys.<pk> = keys.<pk>`.
// Clone the parent factory rather than rebuilding from context so
// that any state already added above (currently none, but this
// makes the lineage explicit for future maintenance) is preserved.
let mut join_context_factory = context_factory.clone();
join_context_factory
.add_cube_name_reference(cube.cube().name().clone(), pk_cube_alias.clone());
let join_visitor_context = Rc::new(VisitorContext::new(
query_tools.clone(),
&join_context_factory,
None,
));

let conditions = primary_keys_dimensions
.iter()
.map(|dim| -> Result<_, CubeError> {
Expand All @@ -67,7 +86,10 @@ impl<'a> LogicalNodeProcessor<'a, AggregateMultipliedSubquery>
Some(keys_query_alias.clone()),
alias_in_keys_query,
));
let pk_cube_expr = Expr::Member(MemberExpression::new(dim.clone()));
let pk_cube_expr = Expr::new_member_with_context(
dim.clone(),
join_visitor_context.clone(),
);
Ok(vec![(keys_query_ref, pk_cube_expr)])
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ cubes:
sql: lifetime_value
filters:
- sql: "{regions.name} = 'East'"
- name: active_count
type: count
sql: "{id}"
filters:
- sql: "{name} LIKE 'A%'"

- name: orders
sql: "SELECT * FROM orders"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,71 @@ async fn test_multiplied_aggregate_hub_sum_measure() {
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_multiplied_aggregate_filtered_count_on_pk() {
let ctx = create_context();

// customers.active_count (type: count, sql: "{CUBE}.id", filters: [...])
// grouped by orders.status.
// customers→orders is one_to_many, so customers is multiplied → AggregateMultipliedSubquery
// The measure references only customers → source = Cube branch.
// Regression: the Cube branch used to render the right side of the FK-aggregate
// rejoin via Expr::Member, which the outer factory's render_references mapped
// back to the inner `keys` subquery alias — yielding a tautological
// `ON keys.<pk> = keys.<pk>` that cross-joined the source table and over-counted.
let query = indoc! {"
measures:
- customers.active_count
dimensions:
- orders.status
order:
- id: orders.status
"};

let sql = ctx.build_sql(query).unwrap();

// The buggy output contained `keys.customers__id = keys.customers__id`. Flatten
// the SQL so the assertion survives any future line-wrapping in the planner's
// formatter, then walk every ON predicate and assert no tautological equality
// survives. The snapshot below is the airtight guard; this is a targeted
// structural sanity check pinned to the bug shape.
let flat = sql.replace('\n', " ");
let flat_upper = flat.to_ascii_uppercase();
let mut cursor = 0;
while let Some(rel) = flat_upper[cursor..].find(" ON ") {
let on_end = cursor + rel + 4;
let after = &flat[on_end..];
let after_upper = &flat_upper[on_end..];
let bound = [
" GROUP BY ",
" ORDER BY ",
" LEFT JOIN ",
" INNER JOIN ",
" RIGHT JOIN ",
" UNION ",
]
.iter()
.filter_map(|kw| after_upper.find(kw))
.min()
.unwrap_or(after.len());
for chunk in after[..bound].split(" AND ") {
if let Some((lhs, rhs)) = chunk.split_once('=') {
let lhs = lhs.trim().trim_matches('(').trim();
let rhs = rhs.trim().trim_matches(')').trim();
assert!(
lhs != rhs,
"tautological join condition `{lhs} = {rhs}` in:\n{sql}"
);
}
}
cursor = on_end;
}

if let Some(result) = ctx.try_execute_pg(query, SEED).await {
insta::assert_snapshot!(result);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_multiplied_aggregate_with_measure_subquery() {
let ctx = create_context();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source: cubesqlplanner/src/tests/integration/multi_fact.rs
expression: result
---
orders__status | customers__active_count
---------------+------------------------
completed | 1
pending | 1
NULL | 0
Loading