diff --git a/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/filter_nulls_test.rs b/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/filter_nulls_test.rs index bed0e3ebb6b65..ef887cf09ce70 100644 --- a/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/filter_nulls_test.rs +++ b/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/filter_nulls_test.rs @@ -114,6 +114,7 @@ fn scan_with_stats(table_index: usize, column: Symbol, null_count: u64) -> SExpr statistics: Statistics { precise_cardinality: None, column_stats: HashMap::from([(column, column_stat(null_count))]), + cluster_key_stats: Default::default(), }, })), ) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index c061126e318cd..6a779251d891e 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -36,6 +36,7 @@ static COST_FACTOR_COMPUTE_PER_ROW: u64 = 1; static COST_FACTOR_HASH_TABLE_PER_ROW: u64 = 10; static COST_FACTOR_AGGREGATE_PER_ROW: u64 = 5; static COST_FACTOR_NETWORK_PER_ROW: u64 = 50; +static COST_FACTOR_CLUSTER_KEY: u64 = 0; // Settings for readability and writability of tags. // we will not be able to safely get its value when set to only write. @@ -1251,6 +1252,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("cost_factor_cluster_key", DefaultSettingValue { + value: UserSettingValue::UInt64(COST_FACTOR_CLUSTER_KEY), + desc: "Cost factor percentage for clustered keys in join ordering. Set to 0 to use the original join-order cost formula.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=100)), + }), // This setting has been deprecated, retained to prevent set errors. ("enable_geo_create_table", DefaultSettingValue { value: UserSettingValue::UInt64(1), diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 46ade6fc0743c..c80f053137c47 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -943,6 +943,10 @@ impl Settings { self.try_get_u64("cost_factor_network_per_row") } + pub fn get_cost_factor_cluster_key(&self) -> Result { + self.try_get_u64("cost_factor_cluster_key") + } + pub fn get_idle_transaction_timeout_secs(&self) -> Result { self.try_get_u64("idle_transaction_timeout_secs") } diff --git a/src/query/sql/src/planner/expression/expression_parser.rs b/src/query/sql/src/planner/expression/expression_parser.rs index 3180bfa70fe47..14de9e013e8de 100644 --- a/src/query/sql/src/planner/expression/expression_parser.rs +++ b/src/query/sql/src/planner/expression/expression_parser.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use databend_common_ast::ast::Expr as AExpr; @@ -25,6 +26,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::ColumnId; +use databend_common_expression::ColumnIndex; use databend_common_expression::Constant; use databend_common_expression::DataSchemaRef; use databend_common_expression::Expr; @@ -454,9 +456,39 @@ pub fn analyze_cluster_keys( ctx: Arc, table_meta: Arc, sql: &str, -) -> Result<(String, Vec>)> { +) -> Result<(String, Vec>)> { + analyze_cluster_keys_impl( + ctx, + table_meta, + sql, + |_, column| Ok(column.as_field_index()), + ) +} + +pub fn analyze_cluster_key_order( + ctx: Arc, + table_meta: Arc, + sql: &str, + column_id_to_symbol: &HashMap, +) -> Result>> { + let (_, exprs) = analyze_cluster_keys_impl(ctx, table_meta, sql, |column_id, _| { + column_id_to_symbol + .get(&column_id) + .copied() + .ok_or_else(|| ErrorCode::Internal("Cluster key column should exist in table metadata")) + })?; + Ok(exprs) +} + +fn analyze_cluster_keys_impl( + ctx: Arc, + table_meta: Arc, + sql: &str, + mut project_column: impl FnMut(ColumnId, Symbol) -> Result, +) -> Result<(String, Vec>)> { let ast_exprs = parse_cluster_key_exprs(sql)?; let (mut bind_context, metadata) = bind_table(table_meta)?; + let metadata_ref = metadata.clone(); let name_resolution_ctx = NameResolutionContext::try_from(ctx.get_settings().as_ref())?; let mut type_checker = TypeChecker::try_create( &mut bind_context, @@ -478,13 +510,30 @@ pub fn analyze_cluster_keys( let mut cluster_keys = Vec::with_capacity(exprs.len()); for ast in ast_exprs { let (scalar, _) = *type_checker.resolve(&ast)?; - if scalar.used_columns().len() != 1 || !scalar.evaluable() { + let used_columns = scalar.used_columns(); + if used_columns.len() != 1 || !scalar.evaluable() { return Err(ErrorCode::InvalidClusterKeys(format!( "Cluster by expression `{:#}` is invalid", ast ))); } + let column = *used_columns + .iter() + .next() + .expect("cluster key should use one column"); + let column_id = { + let metadata = metadata_ref.read(); + let ColumnEntry::BaseTableColumn(BaseTableColumn { column_id, .. }) = + metadata.column(column) + else { + return Err(ErrorCode::InvalidClusterKeys(format!( + "Cluster by expression `{:#}` is invalid", + ast + ))); + }; + *column_id + }; let expr = scalar.as_symbol_expr()?; if !expr.is_deterministic(&BUILTIN_FUNCTIONS) { return Err(ErrorCode::InvalidClusterKeys(format!( @@ -501,6 +550,8 @@ pub fn analyze_cluster_keys( ))); } + let target_column = project_column(column_id, column)?; + let expr = expr.project_column_ref(|_| Ok(target_column))?; exprs.push(expr); let mut cluster_by = ast.clone(); diff --git a/src/query/sql/src/planner/optimizer/ir/mod.rs b/src/query/sql/src/planner/optimizer/ir/mod.rs index e43155b313043..db3cbcc3bcc93 100644 --- a/src/query/sql/src/planner/optimizer/ir/mod.rs +++ b/src/query/sql/src/planner/optimizer/ir/mod.rs @@ -32,6 +32,7 @@ pub use expr::VisitAction; pub use group::Group; pub use group::GroupState; pub use memo::Memo; +pub use property::ClusterKeyStatistics; pub use property::Distribution; pub use property::DistributionEnforcer; pub use property::Enforcer; diff --git a/src/query/sql/src/planner/optimizer/ir/property/mod.rs b/src/query/sql/src/planner/optimizer/ir/property/mod.rs index e8c0eba46944f..070343ef731a4 100644 --- a/src/query/sql/src/planner/optimizer/ir/property/mod.rs +++ b/src/query/sql/src/planner/optimizer/ir/property/mod.rs @@ -21,6 +21,7 @@ pub use builder::RelExpr; pub use enforcer::DistributionEnforcer; pub use enforcer::Enforcer; pub use enforcer::PropertyEnforcer; +pub use property::ClusterKeyStatistics; pub use property::Distribution; pub use property::PhysicalProperty; pub use property::RelationalProperty; diff --git a/src/query/sql/src/planner/optimizer/ir/property/property.rs b/src/query/sql/src/planner/optimizer/ir/property/property.rs index dec3c3866a402..0fa034eb5e1cf 100644 --- a/src/query/sql/src/planner/optimizer/ir/property/property.rs +++ b/src/query/sql/src/planner/optimizer/ir/property/property.rs @@ -12,10 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::fmt::Display; use std::fmt::Formatter; +use databend_common_exception::Result; +use databend_common_expression::Expr; + use crate::ColumnSet; +use crate::IndexType; +use crate::Symbol; use crate::optimizer::ir::ColumnStatSet; use crate::plans::ScalarExpr; use crate::plans::ScalarItem; @@ -38,6 +44,58 @@ impl Display for RequiredProperty { } } +#[derive(Default, Clone, Debug)] +pub struct ClusterKeyStatistics { + /// Table index -> cluster-key expressions in that table's cluster-key order. + pub keys: BTreeMap>>, + /// Expressions filtered by equality with constants. + pub filter_keys: Vec>, +} + +impl ClusterKeyStatistics { + pub fn collect_filter_keys<'a>( + predicates: impl IntoIterator, + ) -> Result>> { + let mut filter_keys = Vec::new(); + for predicate in predicates { + Self::collect_equality_filter_key(predicate, &mut filter_keys)?; + } + Ok(filter_keys) + } + + fn collect_equality_filter_key( + predicate: &ScalarExpr, + filter_keys: &mut Vec>, + ) -> Result<()> { + let mut stack = vec![predicate]; + while let Some(predicate) = stack.pop() { + let ScalarExpr::FunctionCall(function) = predicate else { + continue; + }; + + match function.func_name.as_str() { + "and" | "and_filters" => { + stack.extend(function.arguments.iter().rev()); + } + "eq" if function.arguments.len() == 2 => { + let left = &function.arguments[0]; + let right = &function.arguments[1]; + match ( + left.used_columns().is_empty(), + right.used_columns().is_empty(), + ) { + (true, false) => filter_keys.push(right.as_symbol_expr()?), + (false, true) => filter_keys.push(left.as_symbol_expr()?), + _ => {} + } + } + _ => {} + } + } + Ok(()) + } +} + #[derive(Default, Clone, Debug)] pub struct Statistics { // We can get the precise row count of a table in databend, @@ -45,6 +103,7 @@ pub struct Statistics { pub precise_cardinality: Option, /// Statistics of columns, column index -> column stat pub column_stats: ColumnStatSet, + pub cluster_key_stats: ClusterKeyStatistics, } #[derive(Default, Clone, Debug)] diff --git a/src/query/sql/src/planner/optimizer/ir/stats/join.rs b/src/query/sql/src/planner/optimizer/ir/stats/join.rs index 6f4e76e10f5bd..c734140ae8a60 100644 --- a/src/query/sql/src/planner/optimizer/ir/stats/join.rs +++ b/src/query/sql/src/planner/optimizer/ir/stats/join.rs @@ -714,6 +714,7 @@ mod tests { })), }), ]), + cluster_key_stats: Default::default(), }; JoinKeyStatUpdate::finish_join_histograms(&mut statistics, Symbol::new(0), true)?; @@ -758,6 +759,7 @@ mod tests { })), }), ]), + cluster_key_stats: Default::default(), }; let mut statistics = original_statistics.clone(); let join_stat = statistics.column_stats.get_mut(&Symbol::new(0)).unwrap(); diff --git a/src/query/sql/src/planner/optimizer/mod.rs b/src/query/sql/src/planner/optimizer/mod.rs index ace3f3e86a213..eb68bd2c2a822 100644 --- a/src/query/sql/src/planner/optimizer/mod.rs +++ b/src/query/sql/src/planner/optimizer/mod.rs @@ -28,3 +28,4 @@ pub use optimizer::optimize; pub use optimizer::optimize_query; pub use optimizer_api::Optimizer; pub use optimizer_context::OptimizerContext; +pub use statistics::CollectStatisticsOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 4354444d1185f..9d8c4d8a5c54d 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -252,7 +252,7 @@ pub async fn optimize_query(opt_ctx: Arc, s_expr: SExpr) -> Re // Apply statistics aggregation to gather and propagate statistics .add(RuleStatsAggregateOptimizer::new(opt_ctx.clone())) // Collect statistics for SExpr nodes to support cost estimation - .add(CollectStatisticsOptimizer::new(opt_ctx.clone())) + .add(CollectStatisticsOptimizer::new(opt_ctx.clone())?) // Normalize aggregate, it should be executed before RuleSplitAggregate. .add(RuleNormalizeAggregateOptimizer::new()) // Pull up and infer filter. @@ -369,7 +369,7 @@ async fn get_optimized_memo(opt_ctx: Arc, s_expr: SExpr) -> Re .add(SubqueryDecorrelatorOptimizer::new(opt_ctx.clone(), None)) .add(RuleStatsAggregateOptimizer::new(opt_ctx.clone())) // Collect statistics for each leaf node in SExpr. - .add(CollectStatisticsOptimizer::new(opt_ctx.clone())) + .add(CollectStatisticsOptimizer::new(opt_ctx.clone())?) // Pull up and infer filter. .add(PullUpFilterOptimizer::new(opt_ctx.clone())) // Run default rewrite rules diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs index 8e1abb1ec4ae3..8e6d74620b11c 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; @@ -20,12 +21,15 @@ use databend_common_base::runtime::spawn; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::Expr; use crate::IndexType; use crate::MetadataRef; use crate::ScalarExpr; +use crate::Symbol; use crate::optimizer::Optimizer; use crate::optimizer::OptimizerContext; +use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::SExpr; use crate::optimizer::optimizers::hyper_dp::JoinNode; use crate::optimizer::optimizers::hyper_dp::JoinRelation; @@ -57,6 +61,7 @@ pub struct DPhpyOptimizer { filters: HashSet, // The number of times emit_csg_cmp is called emit_count: usize, + cluster_key_cost: ClusterKeyCostModel, } impl DPhpyOptimizer { @@ -70,6 +75,7 @@ impl DPhpyOptimizer { relation_set_tree: Default::default(), filters: HashSet::new(), emit_count: 0, + cluster_key_cost: ClusterKeyCostModel::default(), } } @@ -475,6 +481,12 @@ impl DPhpyOptimizer { self.opt_ctx.set_flag("dphyp_optimized", true); return Ok(s_expr.as_ref().clone()); } + self.cluster_key_cost = ClusterKeyCostModel::new( + self.opt_ctx + .get_table_ctx() + .get_settings() + .get_cost_factor_cluster_key()?, + ); // Second, use `join_conditions` to create edges in `query_graph` if !self.build_query_graph(&join_conditions)? { @@ -805,22 +817,25 @@ impl DPhpyOptimizer { left: &[IndexType], right: &[IndexType], join_conditions: Vec<(ScalarExpr, ScalarExpr)>, - left_cardinality: f64, - right_cardinality: f64, - left_join: JoinNode, - right_join: JoinNode, + probe_cardinality: f64, + build_cardinality: f64, + probe_join: JoinNode, + build_join: JoinNode, ) -> Result { - let parent_set = union(left, right); - if !join_conditions.is_empty() { + let probe_factor = if self.cluster_key_cost.enabled() { + self.cluster_key_cost.probe_join_factor( + &probe_join, + &self.join_relations, + &join_conditions, + )? + } else { + 1.0 + }; let mut join_node = JoinNode { join_type: JoinType::Inner, - leaves: Arc::new(parent_set.clone()), - children: if left_cardinality < right_cardinality { - Arc::new(vec![right_join, left_join]) - } else { - Arc::new(vec![left_join, right_join]) - }, + leaves: Arc::new(union(left, right)), + children: Arc::new(vec![probe_join, build_join]), cost: 0.0, join_conditions: Arc::new(join_conditions), cardinality: None, @@ -828,7 +843,7 @@ impl DPhpyOptimizer { }; // Calculate cost for inner join - let cost = join_node.cardinality(&self.join_relations).await? + let cost = join_node.cardinality(&self.join_relations).await? * probe_factor + join_node.children[0].cost + join_node.children[1].cost; @@ -839,13 +854,9 @@ impl DPhpyOptimizer { // Create cross join let join_node = JoinNode { join_type: JoinType::Cross, - leaves: Arc::new(parent_set.clone()), - children: if left_cardinality < right_cardinality { - Arc::new(vec![right_join, left_join]) - } else { - Arc::new(vec![left_join, right_join]) - }, - cost: left_cardinality * right_cardinality, + leaves: Arc::new(union(left, right)), + children: Arc::new(vec![probe_join, build_join]), + cost: probe_cardinality * build_cardinality, join_conditions: Arc::new(vec![]), cardinality: None, s_expr: None, @@ -872,12 +883,32 @@ impl DPhpyOptimizer { let left_cardinality = left_join.cardinality(&self.join_relations).await?; let right_cardinality = right_join.cardinality(&self.join_relations).await?; - // Swap join conditions if left cardinality is smaller - if left_cardinality < right_cardinality { - for join_condition in join_conditions.iter_mut() { - std::mem::swap(&mut join_condition.0, &mut join_condition.1); - } - } + let (probe_cardinality, build_cardinality, probe_join, build_join) = + if self.cluster_key_cost.enabled() { + let left_filter_factor = self + .cluster_key_cost + .filter_factor(&left_join, &self.join_relations)?; + let right_filter_factor = self + .cluster_key_cost + .filter_factor(&right_join, &self.join_relations)?; + + // Use the cheaper side as build input after cluster-key filter discount. + if left_cardinality * left_filter_factor < right_cardinality * right_filter_factor { + for join_condition in join_conditions.iter_mut() { + std::mem::swap(&mut join_condition.0, &mut join_condition.1); + } + (right_cardinality, left_cardinality, right_join, left_join) + } else { + (left_cardinality, right_cardinality, left_join, right_join) + } + } else if left_cardinality < right_cardinality { + for join_condition in join_conditions.iter_mut() { + std::mem::swap(&mut join_condition.0, &mut join_condition.1); + } + (right_cardinality, left_cardinality, right_join, left_join) + } else { + (left_cardinality, right_cardinality, left_join, right_join) + }; // Create join node let join_node = self @@ -885,10 +916,10 @@ impl DPhpyOptimizer { left, right, join_conditions, - left_cardinality, - right_cardinality, - left_join, - right_join, + probe_cardinality, + build_cardinality, + probe_join, + build_join, ) .await?; @@ -1142,6 +1173,120 @@ impl DPhpyOptimizer { } } +#[derive(Clone, Default)] +struct ClusterKeyCostModel { + enabled: bool, + factor: f64, +} + +impl ClusterKeyCostModel { + fn new(factor_percent: u64) -> Self { + Self { + enabled: factor_percent > 0, + factor: factor_percent as f64 / 100.0, + } + } + + fn enabled(&self) -> bool { + self.enabled + } + + fn probe_join_factor( + &self, + probe_node: &JoinNode, + join_relations: &[JoinRelation], + join_conditions: &[(ScalarExpr, ScalarExpr)], + ) -> Result { + let probe_join_keys = { + let mut probe_join_keys = Vec::with_capacity(join_conditions.len()); + for (probe_key, _) in join_conditions { + probe_join_keys.push(probe_key.as_symbol_expr()?); + } + probe_join_keys + }; + if probe_join_keys.is_empty() { + return Ok(1.0); + } + + let s_expr = probe_node.s_expr(join_relations); + let stat_info = RelExpr::with_s_expr(&s_expr).derive_cardinality()?; + Ok(self + .best_cluster_key_candidate( + &stat_info.statistics.cluster_key_stats.keys, + &probe_join_keys, + ) + .unwrap_or(1.0)) + } + + fn filter_factor(&self, node: &JoinNode, join_relations: &[JoinRelation]) -> Result { + let s_expr = node.s_expr(join_relations); + let stat_info = RelExpr::with_s_expr(&s_expr).derive_cardinality()?; + let filter_keys = &stat_info.statistics.cluster_key_stats.filter_keys; + if filter_keys.is_empty() { + return Ok(1.0); + } + + Ok(self + .best_cluster_key_candidate(&stat_info.statistics.cluster_key_stats.keys, filter_keys) + .unwrap_or(1.0)) + } + + fn best_cluster_key_candidate( + &self, + cluster_keys: &BTreeMap>>, + candidate_keys: &[Expr], + ) -> Option { + cluster_keys + .iter() + .filter_map(|(_, cluster_key)| { + let factor = self.cluster_key_prefix_cost_factor(cluster_key, candidate_keys); + (factor < 1.0).then_some(factor) + }) + .min_by(|left, right| left.total_cmp(right)) + } + + fn cluster_key_prefix_cost_factor( + &self, + cluster_key: &[Expr], + candidate_keys: &[Expr], + ) -> f64 { + let mut matched_join_keys = vec![false; candidate_keys.len()]; + let mut factor = 1.0; + + for cluster_key_expr in cluster_key { + let mut matched = false; + for (idx, join_key_expr) in candidate_keys.iter().enumerate() { + if matched_join_keys[idx] { + continue; + } + if Self::cluster_key_matches(cluster_key_expr, join_key_expr) { + factor *= self.factor; + matched_join_keys[idx] = true; + matched = true; + break; + } + } + if !matched { + return factor; + } + } + + factor + } + + fn cluster_key_matches(cluster_key_expr: &Expr, join_key_expr: &Expr) -> bool { + match cluster_key_expr { + Expr::ColumnRef(cluster_key) + if let Expr::ColumnRef(join_key) = join_key_expr + && cluster_key.id == join_key.id => + { + true + } + _ => cluster_key_expr == join_key_expr, + } + } +} + #[async_trait::async_trait] impl Optimizer for DPhpyOptimizer { fn name(&self) -> String { diff --git a/src/query/sql/src/planner/optimizer/optimizers/sync_materialized_cte_ref.rs b/src/query/sql/src/planner/optimizer/optimizers/sync_materialized_cte_ref.rs index 9241475c12390..3dc1a4806ad52 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/sync_materialized_cte_ref.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/sync_materialized_cte_ref.rs @@ -88,6 +88,7 @@ impl SyncMaterializedCTERefOptimizer { statistics: Statistics { precise_cardinality: producer_stat_info.statistics.precise_cardinality, column_stats, + cluster_key_stats: Default::default(), }, }) } diff --git a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs index 67c2b211a544b..6dca2ef91d255 100644 --- a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs +++ b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs @@ -26,6 +26,7 @@ use crate::BaseTableColumn; use crate::ColumnEntry; use crate::MetadataRef; use crate::ScalarExpr; +use crate::analyze_cluster_key_order; use crate::optimizer::Optimizer; use crate::optimizer::OptimizerContext; use crate::optimizer::ir::SExpr; @@ -39,14 +40,20 @@ use crate::plans::Statistics; pub struct CollectStatisticsOptimizer { table_ctx: Arc, metadata: MetadataRef, + collect_cluster_keys: bool, } impl CollectStatisticsOptimizer { - pub fn new(opt_ctx: Arc) -> Self { - CollectStatisticsOptimizer { + pub fn new(opt_ctx: Arc) -> Result { + Ok(CollectStatisticsOptimizer { + collect_cluster_keys: opt_ctx + .get_table_ctx() + .get_settings() + .get_cost_factor_cluster_key()? + > 0, table_ctx: opt_ctx.get_table_ctx(), metadata: opt_ctx.get_metadata(), - } + }) } pub async fn optimize_async(&mut self, s_expr: &SExpr) -> Result { @@ -73,6 +80,7 @@ impl CollectStatisticsOptimizer { let mut column_stats = HashMap::new(); let mut histograms = HashMap::new(); + let mut column_id_to_symbol = HashMap::new(); for column in columns.iter() { if let ColumnEntry::BaseTableColumn(BaseTableColumn { column_index, @@ -81,6 +89,7 @@ impl CollectStatisticsOptimizer { .. }) = column { + column_id_to_symbol.insert(*column_id, *column_index); if virtual_expr.is_none() { let col_stat = column_statistics_provider .column_statistics(*column_id as ColumnId); @@ -91,12 +100,32 @@ impl CollectStatisticsOptimizer { } } } + let cluster_key_order = if self.collect_cluster_keys + && let Some((_, cluster_key)) = table.cluster_key_meta() + { + analyze_cluster_key_order( + self.table_ctx.clone(), + table.clone(), + &cluster_key, + &column_id_to_symbol, + )? + } else { + Default::default() + }; + let cluster_keys = if cluster_key_order.is_empty() { + Default::default() + } else { + [(scan.table_index, cluster_key_order)] + .into_iter() + .collect() + }; let mut scan = scan.clone(); scan.statistics = Arc::new(Statistics { table_stats, column_stats, histograms, + cluster_keys, }); let mut s_expr = s_expr.replace_plan(Arc::new(RelOperator::Scan(scan.clone()))); if let Some(sample) = &scan.sample { diff --git a/src/query/sql/src/planner/plans/aggregate.rs b/src/query/sql/src/planner/plans/aggregate.rs index bcb6a27e23fcb..ec65b5c12448b 100644 --- a/src/query/sql/src/planner/plans/aggregate.rs +++ b/src/query/sql/src/planner/plans/aggregate.rs @@ -156,6 +156,7 @@ impl Aggregate { statistics: Statistics { precise_cardinality: Some(1), column_stats: column_stats.clone(), + cluster_key_stats: Default::default(), }, })); } @@ -173,6 +174,7 @@ impl Aggregate { statistics: Statistics { precise_cardinality: None, column_stats: column_stats.clone(), + cluster_key_stats: Default::default(), }, })); } @@ -227,6 +229,7 @@ impl Aggregate { statistics: Statistics { precise_cardinality: None, column_stats, + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/src/planner/plans/constant_table_scan.rs b/src/query/sql/src/planner/plans/constant_table_scan.rs index f45c52c32ff29..a6b513ac0576c 100644 --- a/src/query/sql/src/planner/plans/constant_table_scan.rs +++ b/src/query/sql/src/planner/plans/constant_table_scan.rs @@ -235,6 +235,7 @@ impl Operator for ConstantTableScan { statistics: Statistics { precise_cardinality: Some(self.num_rows as u64), column_stats, + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/src/planner/plans/dummy_table_scan.rs b/src/query/sql/src/planner/plans/dummy_table_scan.rs index ca6516b9282c2..07e1154cd9a62 100644 --- a/src/query/sql/src/planner/plans/dummy_table_scan.rs +++ b/src/query/sql/src/planner/plans/dummy_table_scan.rs @@ -143,6 +143,7 @@ impl Operator for DummyTableScan { statistics: Statistics { precise_cardinality: Some(1), column_stats: Default::default(), + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/src/planner/plans/expression_scan.rs b/src/query/sql/src/planner/plans/expression_scan.rs index 12e3642e42690..e633fea60ddb6 100644 --- a/src/query/sql/src/planner/plans/expression_scan.rs +++ b/src/query/sql/src/planner/plans/expression_scan.rs @@ -96,6 +96,7 @@ impl Operator for ExpressionScan { statistics: Statistics { precise_cardinality: None, column_stats: Default::default(), + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/src/planner/plans/filter.rs b/src/query/sql/src/planner/plans/filter.rs index 48b5669b4663c..979fa7abfb810 100644 --- a/src/query/sql/src/planner/plans/filter.rs +++ b/src/query/sql/src/planner/plans/filter.rs @@ -19,6 +19,7 @@ use databend_common_exception::Result; use databend_common_expression::stat_distribution::StatCardinality; use crate::ColumnSet; +use crate::optimizer::ir::ClusterKeyStatistics; use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::RelationalProperty; use crate::optimizer::ir::SelectivityEstimator; @@ -102,11 +103,16 @@ impl Operator for Filter { } else { sb.into_column_stats() }; + let mut cluster_key_stats = stat_info.statistics.cluster_key_stats.clone(); + cluster_key_stats + .filter_keys + .extend(ClusterKeyStatistics::collect_filter_keys(&self.predicates)?); Ok(Arc::new(StatInfo { cardinality, statistics: Statistics { precise_cardinality: None, column_stats, + cluster_key_stats, }, })) } diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index 1a6ef37dca183..c08c882a5974c 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -510,6 +510,10 @@ impl Join { let inner_join_cardinality = join_estimation.join_card(); let cardinality = self.join_cardinality(left_cardinality, right_cardinality, inner_join_cardinality); + // Hash join output follows the probe side. Build-side clustering is not + // preserved by hash table lookups, even though build-side columns remain + // available in the joined rows. + let cluster_key_stats = left_statistics.cluster_key_stats.clone(); if let Some(columns) = join_estimation.updated_columns() { match self.join_type { JoinType::LeftSemi => { @@ -572,6 +576,7 @@ impl Join { statistics: Statistics { precise_cardinality: None, column_stats, + cluster_key_stats, }, })) } @@ -1008,6 +1013,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut right_statistics = Statistics { precise_cardinality: None, @@ -1018,6 +1024,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut estimator = JoinStatsEstimator::new(4.0, 3.0, true); let condition = JoinEquiCondition::new( @@ -1056,6 +1063,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut right_statistics = Statistics { precise_cardinality: None, @@ -1066,6 +1074,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut estimator = JoinStatsEstimator::new(3.0, 2.3333333333333335, true); let condition = JoinEquiCondition::new( @@ -1104,6 +1113,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut right_statistics = Statistics { precise_cardinality: None, @@ -1114,6 +1124,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut estimator = JoinStatsEstimator::new(3.0, 3.0, true); let condition = JoinEquiCondition::new( @@ -1152,6 +1163,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut right_statistics = Statistics { precise_cardinality: None, @@ -1162,6 +1174,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut estimator = JoinStatsEstimator::new(4.0, 3.0, true); let condition = JoinEquiCondition::new( @@ -1201,6 +1214,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }, }); let right_stat_info = Arc::new(StatInfo { @@ -1214,6 +1228,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }, }); let join = Join { @@ -1247,6 +1262,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut right_statistics = Statistics { precise_cardinality: None, @@ -1257,6 +1273,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let mut estimator = JoinStatsEstimator::new(4.0, 3.0, true); let condition = JoinEquiCondition::new( @@ -1298,6 +1315,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let right_statistics = Statistics { precise_cardinality: None, @@ -1308,6 +1326,7 @@ mod tests { null_count: StatCount::exact(1), histogram: None, })]), + cluster_key_stats: Default::default(), }; let estimator = JoinStatsEstimator::new(4.0, 3.0, true); let condition = JoinEquiCondition::new( diff --git a/src/query/sql/src/planner/plans/limit.rs b/src/query/sql/src/planner/plans/limit.rs index 92a00b1d34738..07a67da5c81c4 100644 --- a/src/query/sql/src/planner/plans/limit.rs +++ b/src/query/sql/src/planner/plans/limit.rs @@ -53,6 +53,7 @@ impl Limit { statistics: Statistics { precise_cardinality, column_stats: Default::default(), + cluster_key_stats: stat_info.statistics.cluster_key_stats.clone(), }, })) } diff --git a/src/query/sql/src/planner/plans/mutation_source.rs b/src/query/sql/src/planner/plans/mutation_source.rs index d75c0e5c85edf..5b1806b2e3b62 100644 --- a/src/query/sql/src/planner/plans/mutation_source.rs +++ b/src/query/sql/src/planner/plans/mutation_source.rs @@ -105,6 +105,7 @@ impl Operator for MutationSource { statistics: OpStatistics { precise_cardinality: None, column_stats: Default::default(), + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index cc5e27d3336b0..b700490393e17 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ b/src/query/sql/src/planner/plans/scan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; @@ -23,6 +24,7 @@ use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::Expr; use databend_common_expression::TableSchemaRef; use databend_common_expression::stat_distribution::NdvEstimate; use databend_common_expression::stat_distribution::StatCardinality; @@ -35,6 +37,7 @@ use super::ScalarItem; use crate::ColumnSet; use crate::IndexType; use crate::Symbol; +use crate::optimizer::ir::ClusterKeyStatistics; use crate::optimizer::ir::ColumnStat; use crate::optimizer::ir::ColumnStatSet; use crate::optimizer::ir::Distribution; @@ -91,6 +94,8 @@ pub struct Statistics { // statistics will be ignored in comparison and hashing pub column_stats: HashMap>, pub histograms: HashMap>, + // table index -> cluster-key expressions in that table's cluster-key order + pub cluster_keys: BTreeMap>>, } #[derive(Clone, Debug, Default)] @@ -138,6 +143,15 @@ impl Scan { .filter(|(col, _)| columns.contains(*col)) .map(|(col, hist)| (*col, hist.clone())) .collect(); + let cluster_keys = self + .statistics + .cluster_keys + .iter() + .filter_map(|(table_index, cluster_key_order)| { + Self::cluster_key_order_refs_columns(cluster_key_order, &columns) + .then_some((*table_index, cluster_key_order.clone())) + }) + .collect(); Scan { table_index: self.table_index, @@ -150,6 +164,7 @@ impl Scan { table_stats: self.statistics.table_stats, column_stats, histograms, + cluster_keys, }), prewhere, agg_index: self.agg_index.clone(), @@ -233,6 +248,15 @@ impl Scan { .iter() .any(|secure_predicate| !prewhere.predicates.contains(secure_predicate)) } + + fn cluster_key_order_refs_columns( + cluster_key_order: &[Expr], + columns: &ColumnSet, + ) -> bool { + cluster_key_order + .iter() + .any(|expr| expr.column_refs().keys().any(|col| columns.contains(col))) + } } fn derive_scan_ndv(ndv: Option, null_count: u64, num_rows: Option) -> NdvEstimate { @@ -379,6 +403,15 @@ impl Operator for Scan { }); } } + let cluster_keys = self + .statistics + .cluster_keys + .iter() + .filter_map(|(table_index, cluster_key_order)| { + Self::cluster_key_order_refs_columns(cluster_key_order, &used_columns) + .then_some((*table_index, cluster_key_order.clone())) + }) + .collect(); let precise_cardinality = self .statistics @@ -407,6 +440,16 @@ impl Operator for Scan { } else { None }; + let cluster_key_stats = ClusterKeyStatistics { + keys: cluster_keys, + filter_keys: ClusterKeyStatistics::collect_filter_keys( + self.push_down_predicates.iter().flatten().chain( + self.prewhere + .iter() + .flat_map(|prewhere| prewhere.predicates.iter()), + ), + )?, + }; // SECURITY: When row access policy is active, apply selectivity from // secure predicates first (for reasonable cardinality estimation and @@ -427,6 +470,7 @@ impl Operator for Scan { statistics: OpStatistics { precise_cardinality: None, column_stats: Default::default(), + cluster_key_stats, }, })); } @@ -436,6 +480,7 @@ impl Operator for Scan { statistics: OpStatistics { precise_cardinality, column_stats, + cluster_key_stats, }, })) } diff --git a/src/query/sql/src/planner/plans/union_all.rs b/src/query/sql/src/planner/plans/union_all.rs index 880b2ef1762a0..98e06b2ef005c 100644 --- a/src/query/sql/src/planner/plans/union_all.rs +++ b/src/query/sql/src/planner/plans/union_all.rs @@ -80,6 +80,7 @@ impl UnionAll { statistics: Statistics { precise_cardinality, column_stats: Default::default(), + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/test-support/src/optimizer/mod.rs b/src/query/sql/test-support/src/optimizer/mod.rs index 17c3b59b74b0a..1d628888f23d2 100644 --- a/src/query/sql/test-support/src/optimizer/mod.rs +++ b/src/query/sql/test-support/src/optimizer/mod.rs @@ -481,6 +481,7 @@ impl SExprVisitor for StatsApplier<'_> { table_stats: Some(table_stats), column_stats, histograms, + cluster_keys: Default::default(), }); return Ok(VisitAction::Replace( diff --git a/src/query/sql/tests/it/framework/lite_context.rs b/src/query/sql/tests/it/framework/lite_context.rs index 53e5100e3a6f1..b566832b170e5 100644 --- a/src/query/sql/tests/it/framework/lite_context.rs +++ b/src/query/sql/tests/it/framework/lite_context.rs @@ -749,6 +749,7 @@ impl LiteTableContext { table_stats: Option, column_stats: ColumnStatsMap, histograms: HistogramStatsMap, + cluster_key: Option, options: BTreeMap, ) -> Result> { let schema = Arc::new(TableSchema::new(fields)); @@ -779,6 +780,7 @@ impl LiteTableContext { name: table_name.to_string(), meta: TableMeta { schema, + cluster_key_v2: cluster_key.map(|key| (0, key)), options, ..Default::default() }, @@ -942,6 +944,7 @@ impl LiteTableContext { table_stats, column_stats, histograms, + None, options, )?; self.default_catalog.insert_table(database, table); @@ -1088,16 +1091,27 @@ impl LiteTableContext { } } }; - - self.register_table_with_stats( + let cluster_key = stmt.cluster_by.as_ref().map(|cluster_by| { + let cluster_exprs = cluster_by + .cluster_exprs + .iter() + .map(|expr| format!("{expr:#}")) + .collect::>(); + format!("({})", cluster_exprs.join(", ")) + }); + + let table = self.build_fake_table( &database, &table_name, fields, table_stats, column_stats, histograms, + cluster_key, stmt.table_options, - ) + )?; + self.default_catalog.insert_table(&database, table); + Ok(()) } _ => unsupported("lite sql harness table registration from non-DDL SQL"), } diff --git a/src/query/sql/tests/it/optimizer/cluster_key_join_order.rs b/src/query/sql/tests/it/optimizer/cluster_key_join_order.rs new file mode 100644 index 0000000000000..7c390be59a542 --- /dev/null +++ b/src/query/sql/tests/it/optimizer/cluster_key_join_order.rs @@ -0,0 +1,459 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::io::Write; + +use databend_common_catalog::BasicColumnStatistics; +use databend_common_catalog::TableStatistics; +use databend_common_catalog::table_context::TableContextSettings; +use databend_common_exception::Result; +use databend_common_sql::optimizer::CollectStatisticsOptimizer; +use databend_common_sql::optimizer::OptimizerContext; +use databend_common_sql::optimizer::optimizers::CascadesOptimizer; +use databend_common_sql::optimizer::optimizers::DPhpyOptimizer; +use databend_common_sql::optimizer::optimizers::operator::PullUpFilterOptimizer; +use databend_common_sql::optimizer::optimizers::operator::RuleStatsAggregateOptimizer; +use databend_common_sql::optimizer::optimizers::operator::SubqueryDecorrelatorOptimizer; +use databend_common_sql::optimizer::optimizers::recursive::RecursiveRuleOptimizer; +use databend_common_sql::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES; +use databend_common_sql::optimizer::optimizers::rule::RuleID; +use databend_common_sql::optimizer::pipeline::OptimizerPipeline; +use databend_common_sql::plans::Plan; +use databend_common_statistics::Datum; + +use crate::framework::LiteTableContext; +use crate::framework::golden::open_golden_file; +use crate::framework::golden::write_case_title; + +struct JoinMemoCase<'a> { + name: &'a str, + description: &'a str, + tables: Vec>, + settings: &'a [(&'a str, &'a str)], + sql: &'a str, +} + +struct JoinMemoTable<'a> { + create_sql: &'a str, + column_statistics: fn(u64) -> HashMap, +} + +const CLUSTER_KEY_DISCOUNT: &[(&str, &str)] = &[("cost_factor_cluster_key", "85")]; + +fn key_tables_with_a_clustered_k1_k2() -> Vec> { + vec![ + JoinMemoTable { + create_sql: "CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2)", + column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT)", + column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT)", + column_statistics, + }, + ] +} + +fn string_filter_tables() -> Vec> { + vec![ + JoinMemoTable { + create_sql: "CREATE TABLE a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a)", + column_statistics: string_column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE b(join_key BIGINT, column_a STRING, column_b STRING)", + column_statistics: string_column_statistics, + }, + ] +} + +fn composite_string_tables() -> Vec> { + vec![ + JoinMemoTable { + create_sql: "CREATE TABLE table_a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a, column_b)", + column_statistics: string_column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE table_b(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_b, column_a)", + column_statistics: string_column_statistics, + }, + ] +} + +fn table_statistics(rows: u64) -> TableStatistics { + TableStatistics { + num_rows: Some(rows), + data_size: Some(rows.saturating_mul(24)), + data_size_compressed: None, + index_size: None, + bloom_index_size: None, + ngram_index_size: None, + inverted_index_size: None, + vector_index_size: None, + virtual_column_size: None, + number_of_blocks: Some(1), + number_of_segments: Some(1), + } +} + +fn column_statistics(rows: u64) -> HashMap { + ["k1", "k2", "v"] + .into_iter() + .map(|column| { + (column.to_string(), BasicColumnStatistics { + min: Some(Datum::Int(0)), + max: Some(Datum::Int(rows as i64)), + ndv: Some(rows), + null_count: 0, + in_memory_size: rows.saturating_mul(8), + }) + }) + .collect() +} + +fn trace_column_statistics(rows: u64) -> HashMap { + let mut stats = column_statistics(rows); + stats.insert("start_day".to_string(), BasicColumnStatistics { + min: Some(Datum::UInt(20240101)), + max: Some(Datum::UInt(20241231)), + ndv: Some(365), + null_count: 0, + in_memory_size: rows.saturating_mul(4), + }); + stats.insert("trace_id".to_string(), BasicColumnStatistics { + min: Some(Datum::Bytes( + b"0000000000000000000000000000000000000000".to_vec(), + )), + max: Some(Datum::Bytes( + b"ffffffffffffffffffffffffffffffffffffffff".to_vec(), + )), + ndv: Some(rows), + null_count: 0, + in_memory_size: rows.saturating_mul(40), + }); + stats +} + +fn string_column_statistics(rows: u64) -> HashMap { + let mut stats = HashMap::new(); + stats.insert("join_key".to_string(), BasicColumnStatistics { + min: Some(Datum::Int(0)), + max: Some(Datum::Int(rows as i64)), + ndv: Some(rows), + null_count: 0, + in_memory_size: rows.saturating_mul(8), + }); + for column in ["column_a", "column_b"] { + stats.insert(column.to_string(), BasicColumnStatistics { + min: Some(Datum::Bytes(b"aaa".to_vec())), + max: Some(Datum::Bytes(b"zzz".to_vec())), + ndv: Some(100), + null_count: 0, + in_memory_size: rows.saturating_mul(16), + }); + } + stats +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_cluster_key_order_join_memo_golden() -> Result<()> { + let mut file = open_golden_file("optimizer", "cluster_key_join_order.txt")?; + + for case in [ + JoinMemoCase { + name: "k1_k2_prefix", + description: "Full memo output when the clustered probe can first match a.k1.", + tables: key_tables_with_a_clustered_k1_k2(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM a + JOIN b ON a.k1 = b.k1 + JOIN c ON a.k2 = c.k2 + ", + }, + JoinMemoCase { + name: "k2_k1_prefix", + description: "Full memo output when the clustered probe can first match a.k2.", + tables: vec![ + JoinMemoTable { + create_sql: + "CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k2, k1)", + column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT)", + column_statistics, + }, + JoinMemoTable { + create_sql: "CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT)", + column_statistics, + }, + ], + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM a + JOIN b ON a.k1 = b.k1 + JOIN c ON a.k2 = c.k2 + ", + }, + JoinMemoCase { + name: "filter_preserves_cluster_keys", + description: "Cluster keys still affect join order after a filter on the clustered table.", + tables: key_tables_with_a_clustered_k1_k2(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM (SELECT * FROM a WHERE v >= 0) a + JOIN b ON a.k1 = b.k1 + JOIN c ON a.k2 = c.k2 + ", + }, + JoinMemoCase { + name: "limit_and_join_preserve_cluster_keys", + description: "Cluster keys still affect join order after a limit subquery and a partial join.", + tables: key_tables_with_a_clustered_k1_k2(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM (SELECT * FROM a LIMIT 1000) a + JOIN b ON a.k1 = b.k1 + JOIN c ON a.k2 = c.k2 + ", + }, + JoinMemoCase { + name: "build_side_cluster_keys_do_not_propagate", + description: "Cluster keys from a build-side clustered table do not affect later join costs.", + tables: key_tables_with_a_clustered_k1_k2(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM b + JOIN (SELECT * FROM a LIMIT 100) a ON b.k1 = a.k1 + JOIN (SELECT * FROM c LIMIT 10) c ON a.k2 = c.k2 + ", + }, + JoinMemoCase { + name: "clustered_filter_side_becomes_build", + description: "A filter on the clustered column makes that side a better build input.", + tables: string_filter_tables(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM a + JOIN b ON a.join_key = b.join_key + WHERE a.column_a = 'xxx' + AND b.column_b = 'xxx' + ", + }, + JoinMemoCase { + name: "self_join_filter_on_clustered_alias_becomes_build", + description: "A self-join filter only matches the alias whose filter references the clustered column.", + tables: vec![JoinMemoTable { + create_sql: + "CREATE TABLE t(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a)", + column_statistics: string_column_statistics, + }], + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM t AS l + JOIN t AS r ON l.join_key = r.join_key + WHERE l.column_a = 'xxx' + AND r.column_b = 'xxx' + ", + }, + JoinMemoCase { + name: "clustered_filter_discount_disabled", + description: "The default cluster-key cost factor keeps the original join-order cost formula.", + tables: string_filter_tables(), + settings: &[], + sql: " + SELECT * + FROM a + JOIN b ON a.join_key = b.join_key + WHERE a.column_a = 'xxx' + AND b.column_b = 'xxx' + ", + }, + JoinMemoCase { + name: "composite_cluster_key_filter_on_second_column", + description: "A filter on column_b favors the table whose cluster key starts with column_b.", + tables: composite_string_tables(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM table_a AS a + JOIN table_b AS b + ON a.column_a = b.column_a + AND a.column_b = b.column_b + WHERE b.column_b = 'xxx' + ", + }, + JoinMemoCase { + name: "composite_cluster_key_filter_on_first_column", + description: "A filter on column_a favors the table whose cluster key starts with column_a.", + tables: composite_string_tables(), + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM table_a AS a + JOIN table_b AS b + ON a.column_a = b.column_a + AND a.column_b = b.column_b + WHERE a.column_a = 'xxx' + ", + }, + JoinMemoCase { + name: "composite_cluster_key_later_column_is_not_prefix", + description: "A later composite cluster-key column is not treated as a leading prefix.", + tables: vec![ + JoinMemoTable { + create_sql: + "CREATE TABLE table_a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a, column_b)", + column_statistics: string_column_statistics, + }, + JoinMemoTable { + create_sql: + "CREATE TABLE table_b(join_key BIGINT, column_a STRING, column_b STRING)", + column_statistics: string_column_statistics, + }, + ], + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT a.column_b, b.column_b + FROM table_a AS a + JOIN table_b AS b + ON a.column_b = b.column_b + WHERE a.column_b = 'xxx' + ", + }, + JoinMemoCase { + name: "linear_expression_cluster_key", + description: "A LINEAR cluster key with to_yyyymmdd and substring expressions affects join costs.", + tables: vec![ + JoinMemoTable { + create_sql: "CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING) CLUSTER BY linear ( + to_yyyymmdd(start_time), + SUBSTRING(trace_id FROM 1 FOR 40) + )", + column_statistics: trace_column_statistics, + }, + JoinMemoTable { + create_sql: + "CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING)", + column_statistics: trace_column_statistics, + }, + JoinMemoTable { + create_sql: + "CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING)", + column_statistics: trace_column_statistics, + }, + ], + settings: CLUSTER_KEY_DISCOUNT, + sql: " + SELECT * + FROM a + JOIN b + ON to_yyyymmdd(a.start_time) = b.start_day + AND SUBSTRING(a.trace_id FROM 1 FOR 40) = b.trace_id + JOIN c ON a.k2 = c.k2 + ", + }, + ] { + write_cluster_key_join_order_memo(&mut file, case).await?; + } + + Ok(()) +} + +async fn write_cluster_key_join_order_memo( + file: &mut impl Write, + case: JoinMemoCase<'_>, +) -> Result<()> { + let ctx = LiteTableContext::create().await?; + ctx.configure_for_optimizer_case(true)?; + ctx.set_cluster_node_num(1); + + write_case_title(file, case.name, case.description)?; + for (name, value) in case.settings { + ctx.get_settings() + .set_setting((*name).to_string(), (*value).to_string())?; + writeln!(file, "setting: {name}={value}")?; + } + for table in &case.tables { + writeln!(file, "setup: {}", table.create_sql)?; + ctx.register_table_sql_with_stats( + table.create_sql, + Some(table_statistics(1000)), + (table.column_statistics)(1000), + HashMap::new(), + ) + .await?; + } + + let sql = case + .sql + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .collect::>() + .join("\n"); + writeln!(file, "sql: {sql}")?; + writeln!(file, "memo:")?; + writeln!(file, "{}", explain_memo(&ctx, &sql).await?)?; + writeln!(file)?; + Ok(()) +} + +async fn explain_memo(ctx: &std::sync::Arc, sql: &str) -> Result { + let Plan::Query { + s_expr, metadata, .. + } = ctx.bind_sql(sql).await? + else { + unreachable!("SELECT should bind to a query plan"); + }; + + let settings = ctx.get_settings(); + let opt_ctx = OptimizerContext::new(ctx.clone(), metadata) + .with_settings(&settings)? + .set_enable_distributed_optimization(true) + .clone(); + opt_ctx.set_flag("explain_memo", true); + + let mut pipeline = OptimizerPipeline::new(opt_ctx.clone(), *s_expr) + .await? + .add(SubqueryDecorrelatorOptimizer::new(opt_ctx.clone(), None)) + .add(RuleStatsAggregateOptimizer::new(opt_ctx.clone())) + .add(CollectStatisticsOptimizer::new(opt_ctx.clone())?) + .add(PullUpFilterOptimizer::new(opt_ctx.clone())) + .add(RecursiveRuleOptimizer::new( + opt_ctx.clone(), + &DEFAULT_REWRITE_RULES, + )) + .add(RecursiveRuleOptimizer::new(opt_ctx.clone(), &[ + RuleID::SplitAggregate, + ])) + .add(DPhpyOptimizer::new(opt_ctx.clone())) + .add(CascadesOptimizer::new(opt_ctx.clone())?); + + let _s_expr = pipeline.execute().await?; + + pipeline.memo().display() +} diff --git a/src/query/sql/tests/it/optimizer/cluster_key_join_order.txt b/src/query/sql/tests/it/optimizer/cluster_key_join_order.txt new file mode 100644 index 0000000000000..154c90194027c --- /dev/null +++ b/src/query/sql/tests/it/optimizer/cluster_key_join_order.txt @@ -0,0 +1,465 @@ +=== k1_k2_prefix === +description: Full memo output when the clustered probe can first match a.k1. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT) +sql: SELECT * +FROM a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2 +memo: +Memo +├── root group: #5 +├── estimated memory: 3.75 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 13000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#1, #2] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 25000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #3] +└── Group #5 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 26000.000, children: [{ dist: Any }] + └── #0 EvalScalar [#4] + + +=== k2_k1_prefix === +description: Full memo output when the clustered probe can first match a.k2. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k2, k1) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT) +sql: SELECT * +FROM a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2 +memo: +Memo +├── root group: #5 +├── estimated memory: 3.75 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 13000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#1, #2] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 25000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #3] +└── Group #5 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 26000.000, children: [{ dist: Any }] + └── #0 EvalScalar [#4] + + +=== filter_preserves_cluster_keys === +description: Cluster keys still affect join order after a filter on the clustered table. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT) +sql: SELECT * +FROM (SELECT * FROM a WHERE v >= 0) a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2 +memo: +Memo +├── root group: #6 +├── estimated memory: 4.38 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }] +│ └── #0 EvalScalar [#1] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 14000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#2, #3] +├── Group #5 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 26000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #4] +└── Group #6 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 27000.000, children: [{ dist: Any }] + └── #0 EvalScalar [#5] + + +=== limit_and_join_preserve_cluster_keys === +description: Cluster keys still affect join order after a limit subquery and a partial join. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT) +sql: SELECT * +FROM (SELECT * FROM a LIMIT 1000) a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2 +memo: +Memo +├── root group: #7 +├── estimated memory: 5.00 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }] +│ └── #0 Limit [#0] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 3000.000, children: [{ dist: Any }] +│ └── #0 EvalScalar [#1] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 15000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#2, #3] +├── Group #5 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #6 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1026000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#4, #5] +└── Group #7 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 1001026000.000, children: [{ dist: Any }] + └── #0 EvalScalar [#6] + + +=== build_side_cluster_keys_do_not_propagate === +description: Cluster keys from a build-side clustered table do not affect later join costs. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT) CLUSTER BY (k1, k2) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT) +sql: SELECT * +FROM b +JOIN (SELECT * FROM a LIMIT 100) a ON b.k1 = a.k1 +JOIN (SELECT * FROM c LIMIT 10) c ON a.k2 = c.k2 +memo: +Memo +├── root group: #9 +├── estimated memory: 6.25 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }] +│ └── #0 Limit [#1] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2100.000, children: [{ dist: Any }] +│ └── #0 EvalScalar [#2] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #5 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2000.000, children: [{ dist: Any }] +│ └── #0 Limit [#4] +├── Group #6 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 2010.000, children: [{ dist: Any }] +│ └── #0 EvalScalar [#5] +├── Group #7 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 4310.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#3, #6] +├── Group #8 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 16310.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #7] +└── Group #9 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 1016310.000, children: [{ dist: Any }] + └── #0 EvalScalar [#8] + + +=== clustered_filter_side_becomes_build === +description: A filter on the clustered column makes that side a better build input. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a) +setup: CREATE TABLE b(join_key BIGINT, column_a STRING, column_b STRING) +sql: SELECT * +FROM a +JOIN b ON a.join_key = b.join_key +WHERE a.column_a = 'xxx' +AND b.column_b = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 131.000, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== self_join_filter_on_clustered_alias_becomes_build === +description: A self-join filter only matches the alias whose filter references the clustered column. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE t(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a) +sql: SELECT * +FROM t AS l +JOIN t AS r ON l.join_key = r.join_key +WHERE l.column_a = 'xxx' +AND r.column_b = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 131.000, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== clustered_filter_discount_disabled === +description: The default cluster-key cost factor keeps the original join-order cost formula. +setup: CREATE TABLE a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a) +setup: CREATE TABLE b(join_key BIGINT, column_a STRING, column_b STRING) +sql: SELECT * +FROM a +JOIN b ON a.join_key = b.join_key +WHERE a.column_a = 'xxx' +AND b.column_b = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 131.000, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== composite_cluster_key_filter_on_second_column === +description: A filter on column_b favors the table whose cluster key starts with column_b. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE table_a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a, column_b) +setup: CREATE TABLE table_b(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_b, column_a) +sql: SELECT * +FROM table_a AS a +JOIN table_b AS b +ON a.column_a = b.column_a +AND a.column_b = b.column_b +WHERE b.column_b = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 140.458, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== composite_cluster_key_filter_on_first_column === +description: A filter on column_a favors the table whose cluster key starts with column_a. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE table_a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a, column_b) +setup: CREATE TABLE table_b(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_b, column_a) +sql: SELECT * +FROM table_a AS a +JOIN table_b AS b +ON a.column_a = b.column_a +AND a.column_b = b.column_b +WHERE a.column_a = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 140.458, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== composite_cluster_key_later_column_is_not_prefix === +description: A later composite cluster-key column is not treated as a leading prefix. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE table_a(join_key BIGINT, column_a STRING, column_b STRING) CLUSTER BY (column_a, column_b) +setup: CREATE TABLE table_b(join_key BIGINT, column_a STRING, column_b STRING) +sql: SELECT a.column_b, b.column_b +FROM table_a AS a +JOIN table_b AS b +ON a.column_b = b.column_b +WHERE a.column_b = 'xxx' +memo: +Memo +├── root group: #3 +├── estimated memory: 2.50 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 10.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 130.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #1] +└── Group #3 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 230.000, children: [{ dist: Any }] + └── #0 EvalScalar [#2] + + +=== linear_expression_cluster_key === +description: A LINEAR cluster key with to_yyyymmdd and substring expressions affects join costs. +setting: cost_factor_cluster_key=85 +setup: CREATE TABLE a(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING) CLUSTER BY linear ( + to_yyyymmdd(start_time), + SUBSTRING(trace_id FROM 1 FOR 40) + ) +setup: CREATE TABLE b(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING) +setup: CREATE TABLE c(k1 BIGINT, k2 BIGINT, v BIGINT, start_time TIMESTAMP, start_day UInt32, trace_id STRING) +sql: SELECT * +FROM a +JOIN b +ON to_yyyymmdd(a.start_time) = b.start_day +AND SUBSTRING(a.trace_id FROM 1 FOR 40) = b.trace_id +JOIN c ON a.k2 = c.k2 +memo: +Memo +├── root group: #5 +├── estimated memory: 3.75 KiB +├── Group #0 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #1 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #2 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 1000.000, children: [] +│ └── #0 Scan [] +├── Group #3 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 13000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#1, #2] +├── Group #4 +│ ├── Best properties +│ │ └── { dist: Any }: expr: #0, cost: 25000.000, children: [{ dist: Any }, { dist: Any }] +│ └── #0 Join [#0, #3] +└── Group #5 + ├── Best properties + │ └── { dist: Any }: expr: #0, cost: 1025000.000, children: [{ dist: Any }] + └── #0 EvalScalar [#4] + + diff --git a/src/query/sql/tests/it/optimizer/join_cardinality.rs b/src/query/sql/tests/it/optimizer/join_cardinality.rs index 1917b1ed78c41..4c7ddf25e5880 100644 --- a/src/query/sql/tests/it/optimizer/join_cardinality.rs +++ b/src/query/sql/tests/it/optimizer/join_cardinality.rs @@ -282,6 +282,7 @@ fn direct_stat_info(column: usize, stats: TableStats) -> Result> { null_count: StatCount::exact(0), histogram: Some(histogram_from_json(stats.histogram_json)?), })]), + cluster_key_stats: Default::default(), }, })) } diff --git a/src/query/sql/tests/it/optimizer/mod.rs b/src/query/sql/tests/it/optimizer/mod.rs index 6303dc449a663..b0260b3c36f97 100644 --- a/src/query/sql/tests/it/optimizer/mod.rs +++ b/src/query/sql/tests/it/optimizer/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod cluster_key_join_order; mod decorrelate_correlated_aliases; mod eager_aggregation; mod join_cardinality; diff --git a/src/query/sql/tests/it/semantic/binder_materialized_cte_virtual_column.txt b/src/query/sql/tests/it/semantic/binder_materialized_cte_virtual_column.txt index 9ff81635776ff..8884d3e890fa7 100644 --- a/src/query/sql/tests/it/semantic/binder_materialized_cte_virtual_column.txt +++ b/src/query/sql/tests/it/semantic/binder_materialized_cte_virtual_column.txt @@ -24,10 +24,10 @@ Sequence(Sequence) ├── logical_recursive_cte_id: None ├── EvalScalar │ ├── scalars: [default.t.v['message']['attribute']['user_id'] (#7) AS (#7)] - │ └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [5, 6, 7], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: Some(115..126), column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']", column_name_lower: None, index: 5, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 5 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']['attribute']['account_id']", column_name_lower: None, index: 6, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 6 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']['attribute']['user_id']", column_name_lower: None, index: 7, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 7 }] }), children: [SExpr { plan: Scan(Scan { table_index: 1, columns: {4, 5, 6, 7}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 1, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {5: 1, 6: 2, 7: 3}, stat_info: None }) + │ └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [5, 6, 7], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: Some(115..126), column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']", column_name_lower: None, index: 5, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 5 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']['attribute']['account_id']", column_name_lower: None, index: 6, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 6 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(1), column_name: "v['message']['attribute']['user_id']", column_name_lower: None, index: 7, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 7 }] }), children: [SExpr { plan: Scan(Scan { table_index: 1, columns: {4, 5, 6, 7}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 1, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {}, cluster_keys: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {5: 1, 6: 2, 7: 3}, stat_info: None }) └── EvalScalar ├── scalars: [default.t.v['message']['attribute']['account_id'] (#10) AS (#10)] - └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [9, 10, 11], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: Some(115..126), column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']", column_name_lower: None, index: 9, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 9 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']['attribute']['account_id']", column_name_lower: None, index: 10, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 10 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']['attribute']['user_id']", column_name_lower: None, index: 11, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 11 }] }), children: [SExpr { plan: Scan(Scan { table_index: 2, columns: {8, 9, 10, 11}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 2, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {9: 1, 10: 2, 11: 3}, stat_info: None }) + └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [9, 10, 11], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: Some(115..126), column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']", column_name_lower: None, index: 9, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 9 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']['attribute']['account_id']", column_name_lower: None, index: 10, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 10 }, ScalarItem { scalar: BoundColumnRef(BoundColumnRef { span: None, column: ColumnBinding { database_name: Some("default"), table_name: Some("t"), column_position: None, table_index: Some(2), column_name: "v['message']['attribute']['user_id']", column_name_lower: None, index: 11, data_type: Nullable(Variant), visibility: InVisible, virtual_expr: None, is_srf: false } }), index: 11 }] }), children: [SExpr { plan: Scan(Scan { table_index: 2, columns: {8, 9, 10, 11}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 2, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {}, cluster_keys: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {9: 1, 10: 2, 11: 3}, stat_info: None }) === materialized_cte_virtual_column_rewrites_chained_ctes === @@ -107,10 +107,10 @@ Sequence(Sequence) ├── logical_recursive_cte_id: None ├── EvalScalar │ ├── scalars: [__databend_virtual_column__0 (#7) AS (#7)] - │ └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [5, 6, 7], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\"}") })] }), index: 5 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"account_id\"}") })] }), index: 6 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"user_id\"}") })] }), index: 7 }] }), children: [SExpr { plan: Scan(Scan { table_index: 1, columns: {4}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 1, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {5: 1, 6: 2, 7: 3}, stat_info: None }) + │ └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [5, 6, 7], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\"}") })] }), index: 5 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"account_id\"}") })] }), index: 6 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(1), column_name: "v", column_name_lower: None, index: 4, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"user_id\"}") })] }), index: 7 }] }), children: [SExpr { plan: Scan(Scan { table_index: 1, columns: {4}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 1, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {}, cluster_keys: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {5: 1, 6: 2, 7: 3}, stat_info: None }) └── EvalScalar ├── scalars: [__databend_virtual_column__1 (#10) AS (#10)] - └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [9, 10, 11], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\"}") })] }), index: 9 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"account_id\"}") })] }), index: 10 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"user_id\"}") })] }), index: 11 }] }), children: [SExpr { plan: Scan(Scan { table_index: 2, columns: {8}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 2, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {9: 1, 10: 2, 11: 3}, stat_info: None }) + └── MaterializedCTERef(MaterializedCTERef { cte_name: "__materialized_cte_0_logs", output_columns: [9, 10, 11], def: SExpr { plan: EvalScalar(EvalScalar { items: [ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\"}") })] }), index: 9 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"account_id\"}") })] }), index: 10 }, ScalarItem { scalar: FunctionCall(FunctionCall { span: None, func_name: "get_by_keypath", params: [], arguments: [BoundColumnRef(BoundColumnRef { span: Some(113..114), column: ColumnBinding { database_name: Some("default"), table_name: Some("t_no_vc"), column_position: Some(1), table_index: Some(2), column_name: "v", column_name_lower: None, index: 8, data_type: Nullable(Variant), visibility: Visible, virtual_expr: None, is_srf: false } }), ConstantExpr(ConstantExpr { span: None, value: String("{\"message\",\"attribute\",\"user_id\"}") })] }), index: 11 }] }), children: [SExpr { plan: Scan(Scan { table_index: 2, columns: {8}, push_down_predicates: None, secure_predicates: None, limit: None, order_by: None, prewhere: None, agg_index: None, change_type: None, update_stream_columns: false, inverted_index: None, vector_index: None, is_lazy_table: false, sample: None, scan_id: 2, statistics: Statistics { table_stats: None, column_stats: {}, histograms: {}, cluster_keys: {} } }), children: [], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }], original_group: None, rel_prop: OnceLock(), stat_info: OnceLock(), applied_rules: AppliedRules { rules: RuleSet { rules: RoaringBitmap<[]> } } }, column_mapping: {9: 1, 10: 2, 11: 3}, stat_info: None }) === materialized_cte_unqualified_column_preserves_ambiguity === diff --git a/src/query/storages/fuse/src/table_functions/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information.rs index 469bec1b7cd7c..ac5f7178ff0d5 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information.rs @@ -182,7 +182,7 @@ impl<'a> ClusteringInformationImpl<'a> { analyze_cluster_keys(self.ctx.clone(), Arc::new(self.table.clone()), b)?; let exprs = exprs .into_iter() - .map(|expr| expr.project_column_ref(|index| Ok(index.as_usize()))) + .map(|expr| expr.project_column_ref(|index| Ok(*index))) .collect::>>()?; if a.is_some() && a.unwrap() == cluster_key { default_cluster_key_id = self.table.cluster_key_id(); diff --git a/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/cluster_key_cost.test b/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/cluster_key_cost.test new file mode 100644 index 0000000000000..f78c943efd2ea --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/cluster_key_cost.test @@ -0,0 +1,469 @@ +statement ok +drop database if exists join_reorder_cluster_key_cost; + +statement ok +create database join_reorder_cluster_key_cost; + +statement ok +use join_reorder_cluster_key_cost; + +# Isolate the cluster-key filter discount from TPCH data shape. +statement ok +create or replace table clustered_side( + join_key int, + filter_key int, + payload int +) cluster by(filter_key) as +select + (number % 100)::int, + (number % 10)::int, + number::int +from numbers(1000); + +statement ok +create or replace table plain_side( + join_key int, + filter_key int, + payload int +) as +select + (number % 100)::int, + (number % 10)::int, + number::int +from numbers(1000); + +statement ok +analyze table clustered_side; + +statement ok +analyze table plain_side; + +statement ok +set cost_factor_cluster_key = 0; + +query T +explain join select * +from clustered_side +join plain_side on clustered_side.join_key = plain_side.join_key +where clustered_side.filter_key = 1 + and plain_side.filter_key = 1; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.plain_side (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.clustered_side (#0) (read rows: 1000) + +statement ok +set cost_factor_cluster_key = 85; + +query T +explain join select * +from clustered_side +join plain_side on clustered_side.join_key = plain_side.join_key +where clustered_side.filter_key = 1 + and plain_side.filter_key = 1; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.clustered_side (#0) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.plain_side (#1) (read rows: 1000) + +statement ok +set cost_factor_cluster_key = 100; + +query T +explain join select * +from clustered_side +join plain_side on clustered_side.join_key = plain_side.join_key +where clustered_side.filter_key = 1 + and plain_side.filter_key = 1; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.plain_side (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.clustered_side (#0) (read rows: 1000) + +# Numeric three-table cases from cluster_key_join_order.rs. +statement ok +set cost_factor_cluster_key = 85; + +statement ok +create or replace table a(k1 BIGINT, k2 BIGINT, v BIGINT) cluster by(k1, k2) as +select number::BIGINT, number::BIGINT, number::BIGINT +from numbers(1000); + +statement ok +create or replace table b(k1 BIGINT, k2 BIGINT, v BIGINT) as +select number::BIGINT, number::BIGINT, number::BIGINT +from numbers(1000); + +statement ok +create or replace table c(k1 BIGINT, k2 BIGINT, v BIGINT) as +select number::BIGINT, number::BIGINT, number::BIGINT +from numbers(1000); + +statement ok +analyze table a; + +statement ok +analyze table b; + +statement ok +analyze table c; + +# k1_k2_prefix +query T +explain join +SELECT * +FROM a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) +└── Probe + └── HashJoin: INNER + ├── Build + │ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) + └── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +# filter_preserves_cluster_keys +query T +explain join +SELECT * +FROM (SELECT * FROM a WHERE v >= 0) a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) +└── Probe + └── HashJoin: INNER + ├── Build + │ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) + └── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +# limit_and_join_preserve_cluster_keys +query T +explain join +SELECT * +FROM (SELECT * FROM a LIMIT 1000) a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) +└── Probe + └── HashJoin: INNER + ├── Build + │ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) + └── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +# build_side_cluster_keys_do_not_propagate +query T +explain join +SELECT * +FROM b +JOIN (SELECT * FROM a LIMIT 100) a ON b.k1 = a.k1 +JOIN (SELECT * FROM c LIMIT 10) c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) +│ └── Probe +│ └── Scan: default.join_reorder_cluster_key_cost.a (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.b (#0) (read rows: 1000) + +statement ok +create or replace table a(k1 BIGINT, k2 BIGINT, v BIGINT) cluster by(k2, k1) as +select number::BIGINT, number::BIGINT, number::BIGINT +from numbers(1000); + +statement ok +analyze table a; + +# k2_k1_prefix +query T +explain join +SELECT * +FROM a +JOIN b ON a.k1 = b.k1 +JOIN c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) +└── Probe + └── HashJoin: INNER + ├── Build + │ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) + └── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +# String filter cases from cluster_key_join_order.rs. +statement ok +create or replace table a(join_key BIGINT, column_a STRING, column_b STRING) cluster by(column_a) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +create or replace table b(join_key BIGINT, column_a STRING, column_b STRING) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +analyze table a; + +statement ok +analyze table b; + +# clustered_filter_side_becomes_build +query T +explain join +SELECT * +FROM a +JOIN b ON a.join_key = b.join_key +WHERE a.column_a = 'xxx' + AND b.column_b = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) + +statement ok +create or replace table t(join_key BIGINT, column_a STRING, column_b STRING) cluster by(column_a) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +analyze table t; + +# self_join_filter_on_clustered_alias_becomes_build +query T +explain join +SELECT * +FROM t AS l +JOIN t AS r ON l.join_key = r.join_key +WHERE l.column_a = 'xxx' + AND r.column_b = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.t (#0) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.t (#1) (read rows: 1000) + +statement ok +unset cost_factor_cluster_key; + +# clustered_filter_discount_disabled +query T +explain join +SELECT * +FROM a +JOIN b ON a.join_key = b.join_key +WHERE a.column_a = 'xxx' + AND b.column_b = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +# Composite cluster-key cases from cluster_key_join_order.rs. +statement ok +set cost_factor_cluster_key = 85; + +statement ok +create or replace table table_a(join_key BIGINT, column_a STRING, column_b STRING) cluster by(column_a, column_b) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +create or replace table table_b(join_key BIGINT, column_a STRING, column_b STRING) cluster by(column_b, column_a) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +analyze table table_a; + +statement ok +analyze table table_b; + +# composite_cluster_key_filter_on_second_column +query T +explain join +SELECT * +FROM table_a AS a +JOIN table_b AS b + ON a.column_a = b.column_a + AND a.column_b = b.column_b +WHERE b.column_b = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.table_b (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.table_a (#0) (read rows: 1000) + +# composite_cluster_key_filter_on_first_column +query T +explain join +SELECT * +FROM table_a AS a +JOIN table_b AS b + ON a.column_a = b.column_a + AND a.column_b = b.column_b +WHERE a.column_a = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.table_a (#0) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.table_b (#1) (read rows: 1000) + +statement ok +create or replace table table_b(join_key BIGINT, column_a STRING, column_b STRING) as +select + number::BIGINT, + if(number % 10 = 0, 'xxx', 'aaa'), + if(number % 10 = 1, 'xxx', 'bbb') +from numbers(1000); + +statement ok +analyze table table_b; + +# composite_cluster_key_later_column_is_not_prefix +query T +explain join +SELECT a.column_b, b.column_b +FROM table_a AS a +JOIN table_b AS b + ON a.column_b = b.column_b +WHERE a.column_b = 'xxx'; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.table_b (#1) (read rows: 1000) +└── Probe + └── Scan: default.join_reorder_cluster_key_cost.table_a (#0) (read rows: 1000) + +# Linear expression cluster-key case from cluster_key_join_order.rs. +statement ok +create or replace table a( + k1 BIGINT, + k2 BIGINT, + v BIGINT, + start_time TIMESTAMP, + start_day UInt32, + trace_id STRING +) cluster by linear ( + to_yyyymmdd(start_time), + substring(trace_id from 1 for 40) +) as +select + number::BIGINT, + number::BIGINT, + number::BIGINT, + '2024-01-01 00:00:00'::TIMESTAMP, + 20240101::UInt32, + number::STRING +from numbers(1000); + +statement ok +create or replace table b( + k1 BIGINT, + k2 BIGINT, + v BIGINT, + start_time TIMESTAMP, + start_day UInt32, + trace_id STRING +) as +select + number::BIGINT, + number::BIGINT, + number::BIGINT, + '2024-01-01 00:00:00'::TIMESTAMP, + 20240101::UInt32, + number::STRING +from numbers(1000); + +statement ok +create or replace table c( + k1 BIGINT, + k2 BIGINT, + v BIGINT, + start_time TIMESTAMP, + start_day UInt32, + trace_id STRING +) as +select + number::BIGINT, + number::BIGINT, + number::BIGINT, + '2024-01-01 00:00:00'::TIMESTAMP, + 20240101::UInt32, + number::STRING +from numbers(1000); + +statement ok +analyze table a; + +statement ok +analyze table b; + +statement ok +analyze table c; + +# linear_expression_cluster_key +query T +explain join +SELECT * +FROM a +JOIN b + ON to_yyyymmdd(a.start_time) = b.start_day + AND substring(a.trace_id from 1 for 40) = b.trace_id +JOIN c ON a.k2 = c.k2; +---- +HashJoin: INNER +├── Build +│ └── Scan: default.join_reorder_cluster_key_cost.c (#2) (read rows: 1000) +└── Probe + └── HashJoin: INNER + ├── Build + │ └── Scan: default.join_reorder_cluster_key_cost.b (#1) (read rows: 1000) + └── Probe + └── Scan: default.join_reorder_cluster_key_cost.a (#0) (read rows: 1000) + +statement ok +unset cost_factor_cluster_key;