Skip to content

Commit c253bfb

Browse files
authored
feat: Add pluggable StatisticsRegistry for operator-level statistics propagation (#21483)
## Which issue does this PR close? - Part of #21443 (Pluggable operator-level statistics propagation) - Part of #8227 (statistics improvements epic) ## Rationale for this change DataFusion's built-in statistics propagation has no extension point: downstream projects cannot inject external catalog stats, override built-in estimation, or plug in custom strategies without forking. This PR introduces `StatisticsRegistry`, a pluggable chain-of-responsibility for operator-level statistics following the same pattern as `RelationPlanner` for SQL parsing and `ExpressionAnalyzer` (#21120) for expression-level stats. See #21443 for full motivation and design context. ## What changes are included in this PR? 1. Framework (`operator_statistics/mod.rs`): `StatisticsProvider` trait, `StatisticsRegistry` (chain-of-responsibility), `ExtendedStatistics` (Statistics + type-erased extension map), `DefaultStatisticsProvider`. `PhysicalOptimizerContext` trait with `optimize_with_context` dispatch. `SessionState` integration. 2. Built-in providers for Filter, Projection, Passthrough (sort/repartition/etc), Aggregate, Join (hash/sort-merge/nested-loop/cross), Limit, and Union. NDV utilities: `num_distinct_vals`, `ndv_after_selectivity`. 3. `ClosureStatisticsProvider`: closure-based provider for test injection and cardinality feedback. 4. JoinSelection integration: `use_statistics_registry` config flag (default false), registry-aware `optimize_with_context`, SLT test demonstrating plan difference on skewed data. ## Are these changes tested? - 39 unit tests covering all providers, NDV utilities, chain priority, and edge cases (Inexact precision, Absent propagation, Partial aggregate delegation, GROUPING SETS delegation, join-type bounds, multi-key NDV, exact Cartesian product, CrossJoin, GlobalLimit skip+fetch) - 1 SLT test (`statistics_registry.slt`): three-table join on skewed data (8:1:1 customer_id distribution) where the built-in NDV formula estimates 33 rows (wrong; actual=66) and the registry conservatively estimates 100, producing the correct build-side swap ## Are there any user-facing changes? New public API (purely additive, non-breaking): - `StatisticsProvider` trait and `StatisticsRegistry` in `datafusion-physical-plan` - `ExtendedStatistics`, `StatisticsResult` types; built-in provider structs; `num_distinct_vals`, `ndv_after_selectivity` utilities - `PhysicalOptimizerContext` trait and `ConfigOnlyContext` in `datafusion-physical-optimizer` - `SessionState::statistics_registry()`, `SessionStateBuilder::with_statistics_registry()` - Config: `datafusion.optimizer.use_statistics_registry` (default false) Default behavior is unchanged. The registry is only consulted when the flag is explicitly enabled. Known limitations: - Column-level stats (NDV, min/max) at Join/Aggregate/Union/Limit boundaries are not improved: these operators call `partition_statistics(None)` internally, re-fetching raw child stats and discarding registry enrichment. 4 TODO comments mark the affected call sites; #20184 would close this gap. - No `ExpressionAnalyzer` integration yet (#21122). --- Disclaimer: I used AI to assist in the code generation, I have manually reviewed the output and it matches my intention and understanding.
1 parent d68373e commit c253bfb

File tree

12 files changed

+2680
-44
lines changed

12 files changed

+2680
-44
lines changed

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,12 @@ config_namespace! {
12441244
/// query is used.
12451245
pub join_reordering: bool, default = true
12461246

1247+
/// When set to true, the physical plan optimizer uses the pluggable
1248+
/// `StatisticsRegistry` for statistics propagation across operators.
1249+
/// This enables more accurate cardinality estimates compared to each
1250+
/// operator's built-in `partition_statistics`.
1251+
pub use_statistics_registry: bool, default = false
1252+
12471253
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
12481254
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
12491255
pub prefer_hash_join: bool, default = true

datafusion/core/src/execution/session_state.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@ use datafusion_optimizer::{
6868
};
6969
use datafusion_physical_expr::create_physical_expr;
7070
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
71+
use datafusion_physical_optimizer::PhysicalOptimizerContext;
7172
use datafusion_physical_optimizer::PhysicalOptimizerRule;
7273
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
7374
use datafusion_physical_plan::ExecutionPlan;
75+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
7476
use datafusion_session::Session;
7577
#[cfg(feature = "sql")]
7678
use datafusion_sql::{
@@ -191,11 +193,27 @@ pub struct SessionState {
191193
/// thus, changing dialect o PostgreSql is required
192194
function_factory: Option<Arc<dyn FunctionFactory>>,
193195
cache_factory: Option<Arc<dyn CacheFactory>>,
196+
/// Optional statistics registry for pluggable statistics providers.
197+
///
198+
/// When set, physical optimizer rules can use this registry to obtain
199+
/// enhanced statistics (e.g., NDV overrides, histograms) beyond what
200+
/// is available from `ExecutionPlan::partition_statistics()`.
201+
statistics_registry: Option<StatisticsRegistry>,
194202
/// Cache logical plans of prepared statements for later execution.
195203
/// Key is the prepared statement name.
196204
prepared_plans: HashMap<String, Arc<PreparedPlan>>,
197205
}
198206

207+
impl PhysicalOptimizerContext for SessionState {
208+
fn config_options(&self) -> &ConfigOptions {
209+
self.config_options()
210+
}
211+
212+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
213+
self.statistics_registry.as_ref()
214+
}
215+
}
216+
199217
impl Debug for SessionState {
200218
/// Prefer having short fields at the top and long vector fields near the end
201219
/// Group fields by
@@ -817,6 +835,14 @@ impl SessionState {
817835
self.config.options()
818836
}
819837

838+
/// Returns the statistics registry if one is configured.
839+
///
840+
/// The registry provides pluggable statistics providers for enhanced
841+
/// cardinality estimation (e.g., NDV overrides, histograms).
842+
pub fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
843+
self.statistics_registry.as_ref()
844+
}
845+
820846
/// Mark the start of the execution
821847
pub fn mark_start_execution(&mut self) {
822848
let config = Arc::clone(self.config.options());
@@ -1006,6 +1032,7 @@ pub struct SessionStateBuilder {
10061032
runtime_env: Option<Arc<RuntimeEnv>>,
10071033
function_factory: Option<Arc<dyn FunctionFactory>>,
10081034
cache_factory: Option<Arc<dyn CacheFactory>>,
1035+
statistics_registry: Option<StatisticsRegistry>,
10091036
// fields to support convenience functions
10101037
analyzer_rules: Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>>,
10111038
optimizer_rules: Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>>,
@@ -1047,6 +1074,7 @@ impl SessionStateBuilder {
10471074
runtime_env: None,
10481075
function_factory: None,
10491076
cache_factory: None,
1077+
statistics_registry: None,
10501078
// fields to support convenience functions
10511079
analyzer_rules: None,
10521080
optimizer_rules: None,
@@ -1103,6 +1131,7 @@ impl SessionStateBuilder {
11031131
runtime_env: Some(existing.runtime_env),
11041132
function_factory: existing.function_factory,
11051133
cache_factory: existing.cache_factory,
1134+
statistics_registry: existing.statistics_registry,
11061135
// fields to support convenience functions
11071136
analyzer_rules: None,
11081137
optimizer_rules: None,
@@ -1424,6 +1453,16 @@ impl SessionStateBuilder {
14241453
self
14251454
}
14261455

1456+
/// Set a [`StatisticsRegistry`] for pluggable statistics providers.
1457+
///
1458+
/// The registry allows physical optimizer rules to access enhanced statistics
1459+
/// (e.g., NDV overrides, histograms) beyond what is available from
1460+
/// `ExecutionPlan::partition_statistics()`.
1461+
pub fn with_statistics_registry(mut self, registry: StatisticsRegistry) -> Self {
1462+
self.statistics_registry = Some(registry);
1463+
self
1464+
}
1465+
14271466
/// Register an `ObjectStore` to the [`RuntimeEnv`]. See [`RuntimeEnv::register_object_store`]
14281467
/// for more details.
14291468
///
@@ -1491,6 +1530,7 @@ impl SessionStateBuilder {
14911530
runtime_env,
14921531
function_factory,
14931532
cache_factory,
1533+
statistics_registry,
14941534
analyzer_rules,
14951535
optimizer_rules,
14961536
physical_optimizer_rules,
@@ -1531,6 +1571,7 @@ impl SessionStateBuilder {
15311571
runtime_env,
15321572
function_factory,
15331573
cache_factory,
1574+
statistics_registry,
15341575
prepared_plans: HashMap::new(),
15351576
};
15361577

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2773,7 +2773,7 @@ impl DefaultPhysicalPlanner {
27732773
for optimizer in optimizers {
27742774
let before_schema = new_plan.schema();
27752775
new_plan = optimizer
2776-
.optimize(new_plan, session_state.config_options())
2776+
.optimize_with_context(new_plan, session_state)
27772777
.map_err(|e| {
27782778
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
27792779
})?;

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 68 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
//! `PartitionMode` and the build side using the available statistics for hash joins.
2525
2626
use crate::PhysicalOptimizerRule;
27+
use crate::optimizer::{ConfigOnlyContext, PhysicalOptimizerContext};
28+
use datafusion_common::Statistics;
2729
use datafusion_common::config::ConfigOptions;
2830
use datafusion_common::error::Result;
2931
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -37,6 +39,7 @@ use datafusion_physical_plan::joins::{
3739
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3840
StreamJoinPartitionMode, SymmetricHashJoinExec,
3941
};
42+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
4043
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
4144
use std::sync::Arc;
4245

@@ -53,36 +56,49 @@ impl JoinSelection {
5356
}
5457
}
5558

59+
/// Get statistics for a plan node, using the registry if available.
60+
fn get_stats(
61+
plan: &dyn ExecutionPlan,
62+
registry: Option<&StatisticsRegistry>,
63+
) -> Result<Arc<Statistics>> {
64+
if let Some(reg) = registry {
65+
reg.compute(plan)
66+
.map(|s| Arc::<Statistics>::clone(s.base_arc()))
67+
} else {
68+
plan.partition_statistics(None)
69+
}
70+
}
71+
5672
// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
5773
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
5874
/// Checks whether join inputs should be swapped using available statistics.
5975
///
6076
/// It follows these steps:
61-
/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
77+
/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
78+
/// (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
79+
/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
6280
/// the left (build) side.
63-
/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
64-
/// 3. Do not reorder the join if neither statistic is available, or if
81+
/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
82+
/// 4. Do not reorder the join if neither statistic is available, or if
6583
/// `datafusion.optimizer.join_reordering` is disabled.
6684
///
67-
///
6885
/// Used configurations inside arg `config`
6986
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
7087
pub(crate) fn should_swap_join_order(
7188
left: &dyn ExecutionPlan,
7289
right: &dyn ExecutionPlan,
7390
config: &ConfigOptions,
91+
registry: Option<&StatisticsRegistry>,
7492
) -> Result<bool> {
7593
if !config.optimizer.join_reordering {
7694
return Ok(false);
7795
}
7896

79-
// Get the left and right table's total bytes
80-
// If both the left and right tables contain total_byte_size statistics,
81-
// use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
82-
let left_stats = left.partition_statistics(None)?;
83-
let right_stats = right.partition_statistics(None)?;
84-
// First compare `total_byte_size` of left and right side,
85-
// if information in this field is insufficient fallback to the `num_rows`
97+
let left_stats = get_stats(left, registry)?;
98+
let right_stats = get_stats(right, registry)?;
99+
100+
// First compare total_byte_size, then fall back to num_rows if byte
101+
// sizes are unavailable.
86102
match (
87103
left_stats.total_byte_size.get_value(),
88104
right_stats.total_byte_size.get_value(),
@@ -102,8 +118,9 @@ fn supports_collect_by_thresholds(
102118
plan: &dyn ExecutionPlan,
103119
threshold_byte_size: usize,
104120
threshold_num_rows: usize,
121+
registry: Option<&StatisticsRegistry>,
105122
) -> bool {
106-
let Ok(stats) = plan.partition_statistics(None) else {
123+
let Ok(stats) = get_stats(plan, registry) else {
107124
return false;
108125
};
109126

@@ -126,31 +143,36 @@ impl PhysicalOptimizerRule for JoinSelection {
126143
plan: Arc<dyn ExecutionPlan>,
127144
config: &ConfigOptions,
128145
) -> Result<Arc<dyn ExecutionPlan>> {
129-
// First, we make pipeline-fixing modifications to joins so as to accommodate
130-
// unbounded inputs. Each pipeline-fixing subrule, which is a function
131-
// of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
132-
// argument storing state variables that indicate the unboundedness status
133-
// of the current [`ExecutionPlan`] as we traverse the plan tree.
146+
self.optimize_with_context(plan, &ConfigOnlyContext::new(config))
147+
}
148+
149+
fn optimize_with_context(
150+
&self,
151+
plan: Arc<dyn ExecutionPlan>,
152+
context: &dyn PhysicalOptimizerContext,
153+
) -> Result<Arc<dyn ExecutionPlan>> {
154+
let config = context.config_options();
155+
let mut default_registry = None;
156+
let registry: Option<&StatisticsRegistry> =
157+
if config.optimizer.use_statistics_registry {
158+
Some(context.statistics_registry().unwrap_or_else(|| {
159+
default_registry
160+
.insert(StatisticsRegistry::default_with_builtin_providers())
161+
}))
162+
} else {
163+
None
164+
};
134165
let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
135166
Box::new(hash_join_convert_symmetric_subrule),
136167
Box::new(hash_join_swap_subrule),
137168
];
138169
let new_plan = plan
139170
.transform_up(|p| apply_subrules(p, &subrules, config))
140171
.data()?;
141-
// Next, we apply another subrule that tries to optimize joins using any
142-
// statistics their inputs might have.
143-
// - For a hash join with partition mode [`PartitionMode::Auto`], we will
144-
// make a cost-based decision to select which `PartitionMode` mode
145-
// (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
146-
// is not available, we will fall back to [`PartitionMode::Partitioned`].
147-
// - We optimize/swap join sides so that the left (build) side of the join
148-
// is the small side. If the statistics information is not available, we
149-
// do not modify join sides.
150-
// - We will also swap left and right sides for cross joins so that the left
151-
// side is the small side.
152172
new_plan
153-
.transform_up(|plan| statistical_join_selection_subrule(plan, config))
173+
.transform_up(|plan| {
174+
statistical_join_selection_subrule(plan, config, registry)
175+
})
154176
.data()
155177
}
156178

@@ -178,6 +200,7 @@ pub(crate) fn try_collect_left(
178200
hash_join: &HashJoinExec,
179201
ignore_threshold: bool,
180202
config: &ConfigOptions,
203+
registry: Option<&StatisticsRegistry>,
181204
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
182205
let left = hash_join.left();
183206
let right = hash_join.right();
@@ -188,20 +211,22 @@ pub(crate) fn try_collect_left(
188211
&**left,
189212
optimizer_config.hash_join_single_partition_threshold,
190213
optimizer_config.hash_join_single_partition_threshold_rows,
214+
registry,
191215
);
192216
let right_can_collect = ignore_threshold
193217
|| supports_collect_by_thresholds(
194218
&**right,
195219
optimizer_config.hash_join_single_partition_threshold,
196220
optimizer_config.hash_join_single_partition_threshold_rows,
221+
registry,
197222
);
198223

199224
match (left_can_collect, right_can_collect) {
200225
(true, true) => {
201226
// Don't swap null-aware anti joins as they have specific side requirements
202227
if hash_join.join_type().supports_swap()
203228
&& !hash_join.null_aware
204-
&& should_swap_join_order(&**left, &**right, config)?
229+
&& should_swap_join_order(&**left, &**right, config, registry)?
205230
{
206231
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
207232
} else {
@@ -245,13 +270,14 @@ pub(crate) fn try_collect_left(
245270
pub(crate) fn partitioned_hash_join(
246271
hash_join: &HashJoinExec,
247272
config: &ConfigOptions,
273+
registry: Option<&StatisticsRegistry>,
248274
) -> Result<Arc<dyn ExecutionPlan>> {
249275
let left = hash_join.left();
250276
let right = hash_join.right();
251277
// Don't swap null-aware anti joins as they have specific side requirements
252278
if hash_join.join_type().supports_swap()
253279
&& !hash_join.null_aware
254-
&& should_swap_join_order(&**left, &**right, config)?
280+
&& should_swap_join_order(&**left, &**right, config, registry)?
255281
{
256282
hash_join.swap_inputs(PartitionMode::Partitioned)
257283
} else {
@@ -285,26 +311,28 @@ pub(crate) fn partitioned_hash_join(
285311
fn statistical_join_selection_subrule(
286312
plan: Arc<dyn ExecutionPlan>,
287313
config: &ConfigOptions,
314+
registry: Option<&StatisticsRegistry>,
288315
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
289316
let transformed = if let Some(hash_join) = plan.downcast_ref::<HashJoinExec>() {
290317
match hash_join.partition_mode() {
291-
PartitionMode::Auto => try_collect_left(hash_join, false, config)?
318+
PartitionMode::Auto => try_collect_left(hash_join, false, config, registry)?
292319
.map_or_else(
293-
|| partitioned_hash_join(hash_join, config).map(Some),
320+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
294321
|v| Ok(Some(v)),
295322
)?,
296-
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
297-
.map_or_else(
298-
|| partitioned_hash_join(hash_join, config).map(Some),
323+
PartitionMode::CollectLeft => {
324+
try_collect_left(hash_join, true, config, registry)?.map_or_else(
325+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
299326
|v| Ok(Some(v)),
300-
)?,
327+
)?
328+
}
301329
PartitionMode::Partitioned => {
302330
let left = hash_join.left();
303331
let right = hash_join.right();
304332
// Don't swap null-aware anti joins as they have specific side requirements
305333
if hash_join.join_type().supports_swap()
306334
&& !hash_join.null_aware
307-
&& should_swap_join_order(&**left, &**right, config)?
335+
&& should_swap_join_order(&**left, &**right, config, registry)?
308336
{
309337
hash_join
310338
.swap_inputs(PartitionMode::Partitioned)
@@ -317,7 +345,7 @@ fn statistical_join_selection_subrule(
317345
} else if let Some(cross_join) = plan.downcast_ref::<CrossJoinExec>() {
318346
let left = cross_join.left();
319347
let right = cross_join.right();
320-
if should_swap_join_order(&**left, &**right, config)? {
348+
if should_swap_join_order(&**left, &**right, config, registry)? {
321349
cross_join.swap_inputs().map(Some)?
322350
} else {
323351
None
@@ -326,7 +354,7 @@ fn statistical_join_selection_subrule(
326354
let left = nl_join.left();
327355
let right = nl_join.right();
328356
if nl_join.join_type().supports_swap()
329-
&& should_swap_join_order(&**left, &**right, config)?
357+
&& should_swap_join_order(&**left, &**right, config, registry)?
330358
{
331359
nl_join.swap_inputs().map(Some)?
332360
} else {

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ pub mod topk_repartition;
4747
pub mod update_aggr_exprs;
4848
pub mod utils;
4949

50-
pub use optimizer::PhysicalOptimizerRule;
50+
pub use optimizer::{ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule};

0 commit comments

Comments
 (0)