Skip to content

Commit 84d861a

Browse files
committed
Add operator_statistics framework for pluggable statistics propagation
Introduces a chain-of-responsibility architecture for statistics computation on physical plan nodes: - StatisticsProvider trait: chain element computing stats for operators - StatisticsRegistry: chains providers, with fast-path for empty config - ExtendedStatistics: Statistics with type-erased extension map - DefaultStatisticsProvider: delegates to each operator's partition_statistics() - PhysicalOptimizerContext trait with optimize_with_context() dispatch - ConfigOnlyContext for backward-compatible rule invocation - SessionState integration with statistics_registry field and builder
1 parent 0b8c4c5 commit 84d861a

6 files changed

Lines changed: 777 additions & 3 deletions

File tree

datafusion/core/src/execution/session_state.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@ use datafusion_optimizer::{
6868
};
6969
use datafusion_physical_expr::create_physical_expr;
7070
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
71+
use datafusion_physical_optimizer::PhysicalOptimizerContext;
7172
use datafusion_physical_optimizer::PhysicalOptimizerRule;
7273
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
7374
use datafusion_physical_plan::ExecutionPlan;
75+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
7476
use datafusion_session::Session;
7577
#[cfg(feature = "sql")]
7678
use datafusion_sql::{
@@ -191,11 +193,27 @@ pub struct SessionState {
191193
/// thus, changing dialect o PostgreSql is required
192194
function_factory: Option<Arc<dyn FunctionFactory>>,
193195
cache_factory: Option<Arc<dyn CacheFactory>>,
196+
/// Optional statistics registry for pluggable statistics providers.
197+
///
198+
/// When set, physical optimizer rules can use this registry to obtain
199+
/// enhanced statistics (e.g., NDV overrides, histograms) beyond what
200+
/// is available from `ExecutionPlan::partition_statistics()`.
201+
statistics_registry: Option<StatisticsRegistry>,
194202
/// Cache logical plans of prepared statements for later execution.
195203
/// Key is the prepared statement name.
196204
prepared_plans: HashMap<String, Arc<PreparedPlan>>,
197205
}
198206

207+
impl PhysicalOptimizerContext for SessionState {
208+
fn config_options(&self) -> &ConfigOptions {
209+
self.config_options()
210+
}
211+
212+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
213+
self.statistics_registry.as_ref()
214+
}
215+
}
216+
199217
impl Debug for SessionState {
200218
/// Prefer having short fields at the top and long vector fields near the end
201219
/// Group fields by
@@ -817,6 +835,14 @@ impl SessionState {
817835
self.config.options()
818836
}
819837

838+
/// Returns the statistics registry if one is configured.
839+
///
840+
/// The registry provides pluggable statistics providers for enhanced
841+
/// cardinality estimation (e.g., NDV overrides, histograms).
842+
pub fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
843+
self.statistics_registry.as_ref()
844+
}
845+
820846
/// Mark the start of the execution
821847
pub fn mark_start_execution(&mut self) {
822848
let config = Arc::clone(self.config.options());
@@ -1006,6 +1032,7 @@ pub struct SessionStateBuilder {
10061032
runtime_env: Option<Arc<RuntimeEnv>>,
10071033
function_factory: Option<Arc<dyn FunctionFactory>>,
10081034
cache_factory: Option<Arc<dyn CacheFactory>>,
1035+
statistics_registry: Option<StatisticsRegistry>,
10091036
// fields to support convenience functions
10101037
analyzer_rules: Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>>,
10111038
optimizer_rules: Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>>,
@@ -1047,6 +1074,7 @@ impl SessionStateBuilder {
10471074
runtime_env: None,
10481075
function_factory: None,
10491076
cache_factory: None,
1077+
statistics_registry: None,
10501078
// fields to support convenience functions
10511079
analyzer_rules: None,
10521080
optimizer_rules: None,
@@ -1103,6 +1131,7 @@ impl SessionStateBuilder {
11031131
runtime_env: Some(existing.runtime_env),
11041132
function_factory: existing.function_factory,
11051133
cache_factory: existing.cache_factory,
1134+
statistics_registry: existing.statistics_registry,
11061135
// fields to support convenience functions
11071136
analyzer_rules: None,
11081137
optimizer_rules: None,
@@ -1424,6 +1453,16 @@ impl SessionStateBuilder {
14241453
self
14251454
}
14261455

1456+
/// Set a [`StatisticsRegistry`] for pluggable statistics providers.
1457+
///
1458+
/// The registry allows physical optimizer rules to access enhanced statistics
1459+
/// (e.g., NDV overrides, histograms) beyond what is available from
1460+
/// `ExecutionPlan::partition_statistics()`.
1461+
pub fn with_statistics_registry(mut self, registry: StatisticsRegistry) -> Self {
1462+
self.statistics_registry = Some(registry);
1463+
self
1464+
}
1465+
14271466
/// Register an `ObjectStore` to the [`RuntimeEnv`]. See [`RuntimeEnv::register_object_store`]
14281467
/// for more details.
14291468
///
@@ -1491,6 +1530,7 @@ impl SessionStateBuilder {
14911530
runtime_env,
14921531
function_factory,
14931532
cache_factory,
1533+
statistics_registry,
14941534
analyzer_rules,
14951535
optimizer_rules,
14961536
physical_optimizer_rules,
@@ -1531,6 +1571,7 @@ impl SessionStateBuilder {
15311571
runtime_env,
15321572
function_factory,
15331573
cache_factory,
1574+
statistics_registry,
15341575
prepared_plans: HashMap::new(),
15351576
};
15361577

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2773,7 +2773,7 @@ impl DefaultPhysicalPlanner {
27732773
for optimizer in optimizers {
27742774
let before_schema = new_plan.schema();
27752775
new_plan = optimizer
2776-
.optimize(new_plan, session_state.config_options())
2776+
.optimize_with_context(new_plan, session_state)
27772777
.map_err(|e| {
27782778
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
27792779
})?;

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ pub mod topk_repartition;
4747
pub mod update_aggr_exprs;
4848
pub mod utils;
4949

50-
pub use optimizer::PhysicalOptimizerRule;
50+
pub use optimizer::{ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule};

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,48 @@ use crate::pushdown_sort::PushdownSort;
4242
use datafusion_common::Result;
4343
use datafusion_common::config::ConfigOptions;
4444
use datafusion_physical_plan::ExecutionPlan;
45+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
46+
47+
/// Context available to physical optimizer rules.
48+
///
49+
/// This trait provides access to configuration options and optional statistics
50+
/// registry for enhanced statistics lookup. It allows optimizer rules to access
51+
/// extended context without changing the core [`PhysicalOptimizerRule::optimize`]
52+
/// signature.
53+
pub trait PhysicalOptimizerContext: Send + Sync {
54+
/// Returns the configuration options.
55+
fn config_options(&self) -> &ConfigOptions;
56+
57+
/// Returns the statistics registry for enhanced statistics lookup.
58+
///
59+
/// Returns `None` if no registry is configured, in which case rules
60+
/// should fall back to using `ExecutionPlan::partition_statistics()`.
61+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
62+
None
63+
}
64+
}
65+
66+
/// Simple context wrapping [`ConfigOptions`] for backward compatibility.
67+
///
68+
/// This struct provides a minimal implementation of [`PhysicalOptimizerContext`]
69+
/// that only supplies configuration options. Used when no statistics registry
70+
/// is available or needed.
71+
pub struct ConfigOnlyContext<'a> {
72+
config: &'a ConfigOptions,
73+
}
74+
75+
impl<'a> ConfigOnlyContext<'a> {
76+
/// Create a new context wrapping the given config options.
77+
pub fn new(config: &'a ConfigOptions) -> Self {
78+
Self { config }
79+
}
80+
}
81+
82+
impl PhysicalOptimizerContext for ConfigOnlyContext<'_> {
83+
fn config_options(&self) -> &ConfigOptions {
84+
self.config
85+
}
86+
}
4587

4688
/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
4789
/// computes the same results, but in a potentially more efficient way.
@@ -51,13 +93,29 @@ use datafusion_physical_plan::ExecutionPlan;
5193
///
5294
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
5395
pub trait PhysicalOptimizerRule: Debug + std::any::Any {
54-
/// Rewrite `plan` to an optimized form
96+
/// Rewrite `plan` to an optimized form.
97+
///
98+
/// This is the primary optimization method. For rules that need access to
99+
/// the statistics registry, override [`optimize_with_context`](Self::optimize_with_context) instead.
55100
fn optimize(
56101
&self,
57102
plan: Arc<dyn ExecutionPlan>,
58103
config: &ConfigOptions,
59104
) -> Result<Arc<dyn ExecutionPlan>>;
60105

106+
/// Rewrite `plan` with access to extended context (statistics registry, etc.).
107+
///
108+
/// Override this method if you need access to the statistics registry for
109+
/// enhanced statistics lookup. The default implementation simply calls
110+
/// [`optimize`](Self::optimize) with the config options from the context.
111+
fn optimize_with_context(
112+
&self,
113+
plan: Arc<dyn ExecutionPlan>,
114+
context: &dyn PhysicalOptimizerContext,
115+
) -> Result<Arc<dyn ExecutionPlan>> {
116+
self.optimize(plan, context.config_options())
117+
}
118+
61119
/// A human readable name for this optimizer rule
62120
fn name(&self) -> &str;
63121

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub mod repartition;
8787
pub mod sort_pushdown;
8888
pub mod sorts;
8989
pub mod spill;
90+
pub mod operator_statistics;
9091
pub mod stream;
9192
pub mod streaming;
9293
pub mod tree_node;

0 commit comments

Comments
 (0)