Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,40 @@ impl PrettyPrint for MultiStageSubqueryRef {
}
}

/// Dim-grid source for the JOIN-based assembly: a set of CTE refs that
/// supply the keys grid. Each ref's own logical schema declares the
/// output (leaf) grain; the FKA strategy derives JOIN-keys against
/// `multi_stage_subquery_refs` from the intersection of schemas.
///
/// Constructed through the `TypedBuilder`-derived builder; `refs` may be
/// omitted, in which case the builder falls back to the field's default.
#[derive(Clone, TypedBuilder)]
pub struct FullKeyAggregateKeysInput {
/// CTE refs that make up the keys side. May be empty (see `is_empty`).
#[builder(default)]
refs: Vec<Rc<MultiStageSubqueryRef>>,
}

impl FullKeyAggregateKeysInput {
/// Returns a borrow of the keys-side CTE refs.
pub fn refs(&self) -> &Vec<Rc<MultiStageSubqueryRef>> {
&self.refs
}

/// Returns `true` when no keys-side refs are present.
pub fn is_empty(&self) -> bool {
self.refs.is_empty()
}
}

impl PrettyPrint for FullKeyAggregateKeysInput {
    /// Prints the node header and, when at least one ref is present, a
    /// `refs:` section one level deeper with every ref printed one level
    /// below that.
    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
        result.println("FullKeyAggregateKeysInput:", state);
        let refs_level = state.new_level();

        // An empty keys input prints only its header.
        if self.refs.is_empty() {
            return;
        }

        result.println("refs:", &refs_level);
        let item_level = refs_level.new_level();
        for subquery_ref in &self.refs {
            subquery_ref.pretty_print(result, &item_level);
        }
    }
}

/// Top-level aggregating source that stitches together several
/// multi-stage / multi-fact CTEs into one keyed result. The
/// physical builder picks a join strategy from `multi_stage_subquery_refs`
Expand All @@ -49,6 +83,12 @@ pub struct FullKeyAggregate {
use_full_join_and_coalesce: bool,
#[builder(default)]
multi_stage_subquery_refs: Vec<Rc<MultiStageSubqueryRef>>,
/// Optional dim-grid input to LEFT JOIN measure-side refs against.
/// `None` when keys-side can be derived from the measure refs;
/// populated for the JOIN-based path (measure refs sit at partition
/// grain, dim grid lives at leaf grain).
#[builder(default)]
keys_input: Option<Rc<FullKeyAggregateKeysInput>>,
}

impl FullKeyAggregate {
Expand All @@ -67,6 +107,10 @@ impl FullKeyAggregate {
&self.multi_stage_subquery_refs
}

/// Returns a borrow of the optional explicit keys-side input, or `None`
/// when this node carries no `keys_input`.
pub fn keys_input(&self) -> Option<&Rc<FullKeyAggregateKeysInput>> {
self.keys_input.as_ref()
}

/// Returns `true` when there are no `multi_stage_subquery_refs`.
/// Only that list is checked; `keys_input` is not consulted.
pub fn is_empty(&self) -> bool {
self.multi_stage_subquery_refs.is_empty()
}
Expand All @@ -89,6 +133,7 @@ impl LogicalNode for FullKeyAggregate {
.schema(self.schema().clone())
.use_full_join_and_coalesce(self.use_full_join_and_coalesce())
.multi_stage_subquery_refs(self.multi_stage_subquery_refs().clone())
.keys_input(self.keys_input.clone())
.build(),
))
}
Expand Down Expand Up @@ -125,5 +170,9 @@ impl PrettyPrint for FullKeyAggregate {
subquery_ref.pretty_print(result, &details_state);
}
}
if let Some(keys_input) = self.keys_input() {
result.println("keys_input:", &state);
keys_input.pretty_print(result, &details_state);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::FullKeyAggregateStrategy;
use crate::logical_plan::{FullKeyAggregate, LogicalJoin};
use crate::logical_plan::{FullKeyAggregate, LogicalJoin, MultiStageSubqueryRef};
use crate::physical_plan::sql_nodes::SqlNodesFactory;
use crate::physical_plan::ReferencesBuilder;
use crate::physical_plan::{
Expand All @@ -8,9 +8,17 @@ use crate::physical_plan::{
};
use crate::physical_plan_builder::PhysicalPlanBuilder;
use crate::physical_plan_builder::PushDownBuilderContext;
use crate::planner::MemberSymbol;
use cubenativeutils::CubeError;
use std::rc::Rc;

/// Keys-based assembly: build a `UNION DISTINCT` of dim projections over the
/// keys-side refs, then LEFT JOIN each measure-side ref by the dims its
/// schema actually carries. Two modes:
/// - keys-side comes from an explicit `keys_input` (JOIN-model: measure
/// refs live at partition grain, keys refs at leaf grain);
/// - keys-side derived from the measure refs themselves (chosen when the
/// dialect lacks FULL JOIN and the logical plan carries no `keys_input`).
pub(super) struct KeysFullKeyAggregateStrategy<'a> {
/// Borrowed physical plan builder this strategy delegates to.
builder: &'a PhysicalPlanBuilder,
}
Expand All @@ -19,6 +27,16 @@ impl<'a> KeysFullKeyAggregateStrategy<'a> {
/// Creates a strategy that borrows `builder` and returns it wrapped in an `Rc`.
pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc<Self> {
Rc::new(Self { builder })
}

/// Reports whether `schema` lists the dimension `member`.
///
/// Each side is identified by the `full_name()` of its
/// `resolve_reference_chain()` result; the schema matches as soon as one
/// of its dimensions yields the same name as `member`.
fn dim_in_schema(
    schema: &crate::logical_plan::LogicalSchema,
    member: &Rc<MemberSymbol>,
) -> bool {
    let wanted = member.clone().resolve_reference_chain().full_name();
    schema
        .all_dimensions()
        .map(|candidate| candidate.clone().resolve_reference_chain().full_name())
        .any(|name| name == wanted)
}
}

impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> {
Expand All @@ -28,104 +46,146 @@ impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> {
context: &PushDownBuilderContext,
) -> Result<Rc<From>, CubeError> {
let query_tools = self.builder.query_tools();
let mut keys_queries = vec![];
let mut data_queries = vec![];
let mut keys_context = context.clone();
keys_context.dimensions_query = true;

// Decide the source for keys-side. Either an explicit set of refs
// from the logical plan (JOIN-model), or derive from measure refs
// themselves (no `keys_input` in the plan).
let (keys_refs, has_explicit_keys): (Vec<Rc<MultiStageSubqueryRef>>, bool) =
if let Some(keys_input) = full_key_aggregate.keys_input() {
(keys_input.refs().clone(), true)
} else {
(
full_key_aggregate.multi_stage_subquery_refs().clone(),
false,
)
};

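// Dimensions projected on the keys-side. With explicit, non-empty keys
// this is the leaf grain, read from the first keys ref's schema (the other
// keys refs are presumably expected to share it — TODO confirm); otherwise
// it's the FKA output grain (= query grain).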
let key_dims: Vec<Rc<MemberSymbol>> = if has_explicit_keys && !keys_refs.is_empty() {
keys_refs[0].schema().all_dimensions().cloned().collect()
} else {
full_key_aggregate
.schema()
.all_dimensions()
.cloned()
.collect()
};

// Build a DISTINCT projection of `key_dims` over each keys ref.
let mut keys_projections = vec![];
for keys_ref in keys_refs.iter() {
let ref_schema = context.get_multi_stage_schema(keys_ref.name())?;
let ref_source = SingleAliasedSource::new_from_table_reference(
keys_ref.name().clone(),
ref_schema.clone(),
None,
);
let mut select_builder = SelectBuilder::new(From::new(FromSource::Single(ref_source)));
for dim in key_dims.iter() {
let alias = ref_schema.resolve_member_alias(dim);
select_builder
.add_projection_member_reference(dim, QualifiedColumnName::new(None, alias));
}
select_builder.set_distinct();
keys_projections.push(Rc::new(
select_builder.build(query_tools.clone(), SqlNodesFactory::new()),
));
}

// Build data (measure) sub-selects for each measure ref.
let mut data_queries = vec![];
for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() {
let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?;
let multi_stage_source = SingleAliasedSource::new_from_table_reference(
multi_stage_ref.name().clone(),
multi_stage_schema.clone(),
None,
);
let mut keys_select_builder =
SelectBuilder::new(From::new(FromSource::Single(multi_stage_source.clone())));
for dim in full_key_aggregate.schema().all_dimensions() {
let alias = multi_stage_schema.resolve_member_alias(dim);
let reference = QualifiedColumnName::new(None, alias);
keys_select_builder.add_projection_member_reference(dim, reference);
}
let sql_context = SqlNodesFactory::new();
keys_select_builder.set_distinct();
let keys_select =
Rc::new(keys_select_builder.build(query_tools.clone(), sql_context.clone()));
keys_queries.push(keys_select);

let data_select_builder =
SelectBuilder::new(From::new(FromSource::Single(multi_stage_source)));
let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context));
data_queries.push(data_select);
let data_select = Rc::new(
SelectBuilder::new(From::new(FromSource::Single(multi_stage_source)))
.build(query_tools.clone(), SqlNodesFactory::new()),
);
data_queries.push((data_select, multi_stage_ref.schema().clone()));
}

if data_queries.is_empty() {
let empty_join = LogicalJoin::builder().build();
return self.builder.process_node(&empty_join, context);
}

if data_queries.len() == 1 {
let select = data_queries[0].clone();
let result = From::new_from_subselect(select, "fk_aggregate".to_string());
return Ok(result);
}

let keys_from = From::new_from_union(
Rc::new(Union::new_from_subselects(&keys_queries)),
"pk_aggregate_keys_source".to_string(),
);
let references_builder = ReferencesBuilder::new(keys_from.clone());
let mut keys_select_builder = SelectBuilder::new(keys_from);

for member in full_key_aggregate.schema().all_dimensions() {
let alias = references_builder.resolve_alias_for_member(&member, &None);
if alias.is_none() {
return Err(CubeError::internal(format!(
"Source for {} not found in full key aggregate subqueries",
member.full_name()
)));
}
let reference = QualifiedColumnName::new(None, alias.unwrap());
keys_select_builder.add_projection_member_reference(member, reference);
// Fast path: single measure ref and no explicit keys side — the ref
// already has full key coverage on its own, no need to UNION keys.
if data_queries.len() == 1 && !has_explicit_keys {
let (select, _) = data_queries.into_iter().next().unwrap();
return Ok(From::new_from_subselect(select, "fk_aggregate".to_string()));
}
keys_select_builder.set_distinct();

let sql_context = SqlNodesFactory::new();
let keys_select = Rc::new(keys_select_builder.build(query_tools.clone(), sql_context));

// Combine keys projections via UNION (DISTINCT inside each plus
// UNION DISTINCT) and resolve canonical aliases.
let keys_alias = "fk_aggregate_keys".to_string();
let keys_select = if keys_projections.len() == 1 {
keys_projections.into_iter().next().unwrap()
} else {
let keys_from = From::new_from_union(
Rc::new(Union::new_from_subselects(&keys_projections)),
"fk_aggregate_keys_source".to_string(),
);
let references_builder = ReferencesBuilder::new(keys_from.clone());
let mut outer = SelectBuilder::new(keys_from);
for dim in key_dims.iter() {
let alias = references_builder
.resolve_alias_for_member(dim, &None)
.ok_or_else(|| {
CubeError::internal(format!(
"Source for {} not found in full key aggregate subqueries",
dim.full_name()
))
})?;
outer.add_projection_member_reference(dim, QualifiedColumnName::new(None, alias));
}
outer.set_distinct();
Rc::new(outer.build(query_tools.clone(), SqlNodesFactory::new()))
};

let mut join_builder =
JoinBuilder::new_from_subselect(keys_select.clone(), keys_alias.clone());

for (i, query) in data_queries.into_iter().enumerate() {
let query_alias = format!("q_{}", i);
let conditions = full_key_aggregate
.schema()
.all_dimensions()
for (idx, (query, query_logical_schema)) in data_queries.into_iter().enumerate() {
let query_alias = format!("q_{}", idx);
// JOIN-keys are the dims present in both sides. With explicit
// keys-side this collapses to the measure ref's partition grain;
// without it the measure ref already has all key_dims.
let conditions = key_dims
.iter()
.filter(|d| Self::dim_in_schema(query_logical_schema.as_ref(), d))
.map(|dim| -> Result<_, CubeError> {
let alias_in_keys_query = keys_select.schema().resolve_member_alias(dim);
let keys_query_ref = Expr::Reference(QualifiedColumnName::new(
let alias_in_keys = keys_select.schema().resolve_member_alias(dim);
let keys_ref_expr = Expr::Reference(QualifiedColumnName::new(
Some(keys_alias.clone()),
alias_in_keys_query,
alias_in_keys,
));
let alias_in_data_query = query.schema().resolve_member_alias(dim);
let data_query_ref = Expr::Reference(QualifiedColumnName::new(
let alias_in_data = query.schema().resolve_member_alias(dim);
let data_ref_expr = Expr::Reference(QualifiedColumnName::new(
Some(query_alias.clone()),
alias_in_data_query,
alias_in_data,
));

Ok(vec![(keys_query_ref, data_query_ref)])
Ok(vec![(keys_ref_expr, data_ref_expr)])
})
.collect::<Result<Vec<_>, _>>()?;

// Use a null-safe dimension join only when keys are derived from the
// measure refs (FULL JOIN-equivalent shape). On the JOIN-model path
// (explicit keys) the flag is switched off: the match is expected to be
// one-to-one, so the null-safe comparison is not requested there.
join_builder.left_join_subselect(
query,
query_alias.clone(),
JoinCondition::new_dimension_join(conditions, true),
query_alias,
JoinCondition::new_dimension_join(conditions, !has_explicit_keys),
);
}

let result = join_builder.build();
Ok(From::new_from_join(result))
Ok(From::new_from_join(join_builder.build()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcesso
context: &PushDownBuilderContext,
) -> Result<Self::PhysycalNode, CubeError> {
let strategy: Rc<dyn FullKeyAggregateStrategy> =
if !full_key_aggregate.schema().has_dimensions() {
if full_key_aggregate.keys_input().is_some() {
// JOIN-model: keys side comes from the logical plan.
// `KeysFullKeyAggregateStrategy` handles both the explicit
// `keys_input` shape and the derived-from-measure-refs shape.
KeysFullKeyAggregateStrategy::new(self.builder)
} else if !full_key_aggregate.schema().has_dimensions() {
InnerJoinFullKeyAggregateStrategy::new(self.builder)
} else if self.builder.templates().supports_full_join() {
FullJoinFullKeyAggregateStrategy::new(self.builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ pub struct MultiStageInodeMember {
add_group_by: Vec<Rc<MemberSymbol>>,
group_by: Option<Vec<Rc<MemberSymbol>>>,
time_shift: Option<MeasureTimeShifts>,
/// Optimisation flag: this Aggregate inode is a safe candidate for
/// the `window`-based render — single measure dep, additive identity
/// rollup, no leaf-extending modifiers. When `true`, assembly skips
/// the JOIN-model and `member_query_planner` emits a window function.
/// Default `false`.
use_window_path: bool,
}

impl MultiStageInodeMember {
Expand All @@ -139,9 +145,19 @@ impl MultiStageInodeMember {
add_group_by,
group_by,
time_shift,
use_window_path: false,
}
}

/// Builder-style setter: consumes `self`, stores `value` in
/// `use_window_path`, and returns the updated member.
pub fn with_use_window_path(mut self, value: bool) -> Self {
self.use_window_path = value;
self
}

/// Returns the current value of the `use_window_path` flag.
pub fn use_window_path(&self) -> bool {
self.use_window_path
}

/// Returns a borrow of this member's inode type.
pub fn inode_type(&self) -> &MultiStageInodeMemberType {
&self.inode_type
}
Expand Down
Loading
Loading