Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn scan_with_stats(table_index: usize, column: Symbol, null_count: u64) -> SExpr
statistics: Statistics {
precise_cardinality: None,
column_stats: HashMap::from([(column, column_stat(null_count))]),
cluster_key_stats: Default::default(),
},
})),
)
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static COST_FACTOR_COMPUTE_PER_ROW: u64 = 1;
static COST_FACTOR_HASH_TABLE_PER_ROW: u64 = 10;
static COST_FACTOR_AGGREGATE_PER_ROW: u64 = 5;
static COST_FACTOR_NETWORK_PER_ROW: u64 = 50;
static COST_FACTOR_CLUSTER_KEY: u64 = 0;

// Settings for readability and writability of tags.
// we will not be able to safely get its value when set to only write.
Expand Down Expand Up @@ -1251,6 +1252,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("cost_factor_cluster_key", DefaultSettingValue {
value: UserSettingValue::UInt64(COST_FACTOR_CLUSTER_KEY),
desc: "Cost factor percentage for clustered keys in join ordering. Set to 0 to use the original join-order cost formula.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=100)),
}),
// This setting has been deprecated, retained to prevent set errors.
("enable_geo_create_table", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,10 @@ impl Settings {
self.try_get_u64("cost_factor_network_per_row")
}

pub fn get_cost_factor_cluster_key(&self) -> Result<u64> {
self.try_get_u64("cost_factor_cluster_key")
}

pub fn get_idle_transaction_timeout_secs(&self) -> Result<u64> {
self.try_get_u64("idle_transaction_timeout_secs")
}
Expand Down
55 changes: 53 additions & 2 deletions src/query/sql/src/planner/expression/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_ast::ast::Expr as AExpr;
Expand All @@ -25,6 +26,7 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::ColumnIndex;
use databend_common_expression::Constant;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Expr;
Expand Down Expand Up @@ -454,9 +456,39 @@ pub fn analyze_cluster_keys(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<(String, Vec<Expr<Symbol>>)> {
) -> Result<(String, Vec<Expr<FieldIndex>>)> {
analyze_cluster_keys_impl(
ctx,
table_meta,
sql,
|_, column| Ok(column.as_field_index()),
)
}

pub fn analyze_cluster_key_order(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
column_id_to_symbol: &HashMap<ColumnId, Symbol>,
) -> Result<Vec<Expr<Symbol>>> {
let (_, exprs) = analyze_cluster_keys_impl(ctx, table_meta, sql, |column_id, _| {
column_id_to_symbol
.get(&column_id)
.copied()
.ok_or_else(|| ErrorCode::Internal("Cluster key column should exist in table metadata"))
})?;
Ok(exprs)
}

fn analyze_cluster_keys_impl<Index: ColumnIndex + Copy>(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
mut project_column: impl FnMut(ColumnId, Symbol) -> Result<Index>,
) -> Result<(String, Vec<Expr<Index>>)> {
let ast_exprs = parse_cluster_key_exprs(sql)?;
let (mut bind_context, metadata) = bind_table(table_meta)?;
let metadata_ref = metadata.clone();
let name_resolution_ctx = NameResolutionContext::try_from(ctx.get_settings().as_ref())?;
let mut type_checker = TypeChecker::try_create(
&mut bind_context,
Expand All @@ -478,13 +510,30 @@ pub fn analyze_cluster_keys(
let mut cluster_keys = Vec::with_capacity(exprs.len());
for ast in ast_exprs {
let (scalar, _) = *type_checker.resolve(&ast)?;
if scalar.used_columns().len() != 1 || !scalar.evaluable() {
let used_columns = scalar.used_columns();
if used_columns.len() != 1 || !scalar.evaluable() {
return Err(ErrorCode::InvalidClusterKeys(format!(
"Cluster by expression `{:#}` is invalid",
ast
)));
}

let column = *used_columns
.iter()
.next()
.expect("cluster key should use one column");
let column_id = {
let metadata = metadata_ref.read();
let ColumnEntry::BaseTableColumn(BaseTableColumn { column_id, .. }) =
metadata.column(column)
else {
return Err(ErrorCode::InvalidClusterKeys(format!(
"Cluster by expression `{:#}` is invalid",
ast
)));
};
*column_id
};
let expr = scalar.as_symbol_expr()?;
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
return Err(ErrorCode::InvalidClusterKeys(format!(
Expand All @@ -501,6 +550,8 @@ pub fn analyze_cluster_keys(
)));
}

let target_column = project_column(column_id, column)?;
let expr = expr.project_column_ref(|_| Ok(target_column))?;
exprs.push(expr);

let mut cluster_by = ast.clone();
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use expr::VisitAction;
pub use group::Group;
pub use group::GroupState;
pub use memo::Memo;
pub use property::ClusterKeyStatistics;
pub use property::Distribution;
pub use property::DistributionEnforcer;
pub use property::Enforcer;
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/ir/property/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use builder::RelExpr;
pub use enforcer::DistributionEnforcer;
pub use enforcer::Enforcer;
pub use enforcer::PropertyEnforcer;
pub use property::ClusterKeyStatistics;
pub use property::Distribution;
pub use property::PhysicalProperty;
pub use property::RelationalProperty;
Expand Down
59 changes: 59 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/property/property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_exception::Result;
use databend_common_expression::Expr;

use crate::ColumnSet;
use crate::IndexType;
use crate::Symbol;
use crate::optimizer::ir::ColumnStatSet;
use crate::plans::ScalarExpr;
use crate::plans::ScalarItem;
Expand All @@ -38,13 +44,66 @@ impl Display for RequiredProperty {
}
}

#[derive(Default, Clone, Debug)]
pub struct ClusterKeyStatistics {
/// Table index -> cluster-key expressions in that table's cluster-key order.
pub keys: BTreeMap<IndexType, Vec<Expr<Symbol>>>,
/// Expressions filtered by equality with constants.
pub filter_keys: Vec<Expr<Symbol>>,
}

impl ClusterKeyStatistics {
pub fn collect_filter_keys<'a>(
predicates: impl IntoIterator<Item = &'a ScalarExpr>,
) -> Result<Vec<Expr<Symbol>>> {
let mut filter_keys = Vec::new();
for predicate in predicates {
Self::collect_equality_filter_key(predicate, &mut filter_keys)?;
}
Ok(filter_keys)
}

fn collect_equality_filter_key(
predicate: &ScalarExpr,
filter_keys: &mut Vec<Expr<Symbol>>,
) -> Result<()> {
let mut stack = vec![predicate];
while let Some(predicate) = stack.pop() {
let ScalarExpr::FunctionCall(function) = predicate else {
continue;
};

match function.func_name.as_str() {
"and" | "and_filters" => {
stack.extend(function.arguments.iter().rev());
}
"eq" if function.arguments.len() == 2 => {
let left = &function.arguments[0];
let right = &function.arguments[1];
match (
left.used_columns().is_empty(),
right.used_columns().is_empty(),
) {
(true, false) => filter_keys.push(right.as_symbol_expr()?),
(false, true) => filter_keys.push(left.as_symbol_expr()?),
_ => {}
}
}
_ => {}
}
}
Ok(())
}
}

#[derive(Default, Clone, Debug)]
pub struct Statistics {
// We can get the precise row count of a table in databend,
// which information is useful to optimize some queries like `COUNT(*)`.
pub precise_cardinality: Option<u64>,
/// Statistics of columns, column index -> column stat
pub column_stats: ColumnStatSet,
pub cluster_key_stats: ClusterKeyStatistics,
}

#[derive(Default, Clone, Debug)]
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/stats/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ mod tests {
})),
}),
]),
cluster_key_stats: Default::default(),
};

JoinKeyStatUpdate::finish_join_histograms(&mut statistics, Symbol::new(0), true)?;
Expand Down Expand Up @@ -758,6 +759,7 @@ mod tests {
})),
}),
]),
cluster_key_stats: Default::default(),
};
let mut statistics = original_statistics.clone();
let join_stat = statistics.column_stats.get_mut(&Symbol::new(0)).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ pub use optimizer::optimize;
pub use optimizer::optimize_query;
pub use optimizer_api::Optimizer;
pub use optimizer_context::OptimizerContext;
pub use statistics::CollectStatisticsOptimizer;
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
// Apply statistics aggregation to gather and propagate statistics
.add(RuleStatsAggregateOptimizer::new(opt_ctx.clone()))
// Collect statistics for SExpr nodes to support cost estimation
.add(CollectStatisticsOptimizer::new(opt_ctx.clone()))
.add(CollectStatisticsOptimizer::new(opt_ctx.clone())?)
// Normalize aggregate, it should be executed before RuleSplitAggregate.
.add(RuleNormalizeAggregateOptimizer::new())
// Pull up and infer filter.
Expand Down Expand Up @@ -369,7 +369,7 @@ async fn get_optimized_memo(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
.add(SubqueryDecorrelatorOptimizer::new(opt_ctx.clone(), None))
.add(RuleStatsAggregateOptimizer::new(opt_ctx.clone()))
// Collect statistics for each leaf node in SExpr.
.add(CollectStatisticsOptimizer::new(opt_ctx.clone()))
.add(CollectStatisticsOptimizer::new(opt_ctx.clone())?)
// Pull up and infer filter.
.add(PullUpFilterOptimizer::new(opt_ctx.clone()))
// Run default rewrite rules
Expand Down
Loading
Loading