Skip to content

Commit ab23d15

Browse files
committed
Add operator_statistics framework for pluggable statistics propagation
Introduces a chain-of-responsibility architecture for statistics computation on physical plan nodes: - StatisticsProvider trait: chain element computing stats for operators - StatisticsRegistry: chains providers, with fast-path for empty config - ExtendedStatistics: Statistics with type-erased extension map - DefaultStatisticsProvider: delegates to each operator's partition_statistics() - PhysicalOptimizerContext trait with optimize_with_context() dispatch - ConfigOnlyContext for backward-compatible rule invocation - SessionState integration with statistics_registry field and builder
1 parent 40ae37e commit ab23d15

6 files changed

Lines changed: 857 additions & 3 deletions

File tree

datafusion/core/src/execution/session_state.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ use datafusion_physical_expr::create_physical_expr;
7070
use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry;
7171
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
7272
use datafusion_physical_optimizer::PhysicalOptimizerRule;
73+
use datafusion_physical_optimizer::PhysicalOptimizerContext;
7374
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
7475
use datafusion_physical_plan::ExecutionPlan;
76+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
7577
use datafusion_session::Session;
7678
#[cfg(feature = "sql")]
7779
use datafusion_sql::{
@@ -194,11 +196,27 @@ pub struct SessionState {
194196
/// thus, changing dialect o PostgreSql is required
195197
function_factory: Option<Arc<dyn FunctionFactory>>,
196198
cache_factory: Option<Arc<dyn CacheFactory>>,
199+
/// Optional statistics registry for pluggable statistics providers.
200+
///
201+
/// When set, physical optimizer rules can use this registry to obtain
202+
/// enhanced statistics (e.g., NDV overrides, histograms) beyond what
203+
/// is available from `ExecutionPlan::partition_statistics()`.
204+
statistics_registry: Option<StatisticsRegistry>,
197205
/// Cache logical plans of prepared statements for later execution.
198206
/// Key is the prepared statement name.
199207
prepared_plans: HashMap<String, Arc<PreparedPlan>>,
200208
}
201209

210+
impl PhysicalOptimizerContext for SessionState {
211+
fn config_options(&self) -> &ConfigOptions {
212+
self.config_options()
213+
}
214+
215+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
216+
self.statistics_registry.as_ref()
217+
}
218+
}
219+
202220
impl Debug for SessionState {
203221
/// Prefer having short fields at the top and long vector fields near the end
204222
/// Group fields by
@@ -824,6 +842,14 @@ impl SessionState {
824842
self.config.options()
825843
}
826844

845+
/// Returns the statistics registry if one is configured.
846+
///
847+
/// The registry provides pluggable statistics providers for enhanced
848+
/// cardinality estimation (e.g., NDV overrides, histograms).
849+
pub fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
850+
self.statistics_registry.as_ref()
851+
}
852+
827853
/// Mark the start of the execution
828854
pub fn mark_start_execution(&mut self) {
829855
let config = Arc::clone(self.config.options());
@@ -1019,6 +1045,7 @@ pub struct SessionStateBuilder {
10191045
runtime_env: Option<Arc<RuntimeEnv>>,
10201046
function_factory: Option<Arc<dyn FunctionFactory>>,
10211047
cache_factory: Option<Arc<dyn CacheFactory>>,
1048+
statistics_registry: Option<StatisticsRegistry>,
10221049
// fields to support convenience functions
10231050
analyzer_rules: Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>>,
10241051
optimizer_rules: Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>>,
@@ -1061,6 +1088,7 @@ impl SessionStateBuilder {
10611088
runtime_env: None,
10621089
function_factory: None,
10631090
cache_factory: None,
1091+
statistics_registry: None,
10641092
// fields to support convenience functions
10651093
analyzer_rules: None,
10661094
optimizer_rules: None,
@@ -1118,6 +1146,7 @@ impl SessionStateBuilder {
11181146
runtime_env: Some(existing.runtime_env),
11191147
function_factory: existing.function_factory,
11201148
cache_factory: existing.cache_factory,
1149+
statistics_registry: existing.statistics_registry,
11211150
// fields to support convenience functions
11221151
analyzer_rules: None,
11231152
optimizer_rules: None,
@@ -1448,6 +1477,16 @@ impl SessionStateBuilder {
14481477
self
14491478
}
14501479

1480+
/// Set a [`StatisticsRegistry`] for pluggable statistics providers.
1481+
///
1482+
/// The registry allows physical optimizer rules to access enhanced statistics
1483+
/// (e.g., NDV overrides, histograms) beyond what is available from
1484+
/// `ExecutionPlan::partition_statistics()`.
1485+
pub fn with_statistics_registry(mut self, registry: StatisticsRegistry) -> Self {
1486+
self.statistics_registry = Some(registry);
1487+
self
1488+
}
1489+
14511490
/// Register an `ObjectStore` to the [`RuntimeEnv`]. See [`RuntimeEnv::register_object_store`]
14521491
/// for more details.
14531492
///
@@ -1516,6 +1555,7 @@ impl SessionStateBuilder {
15161555
runtime_env,
15171556
function_factory,
15181557
cache_factory,
1558+
statistics_registry,
15191559
analyzer_rules,
15201560
optimizer_rules,
15211561
physical_optimizer_rules,
@@ -1558,6 +1598,7 @@ impl SessionStateBuilder {
15581598
runtime_env,
15591599
function_factory,
15601600
cache_factory,
1601+
statistics_registry,
15611602
prepared_plans: HashMap::new(),
15621603
};
15631604

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2776,7 +2776,7 @@ impl DefaultPhysicalPlanner {
27762776
for optimizer in optimizers {
27772777
let before_schema = new_plan.schema();
27782778
new_plan = optimizer
2779-
.optimize(new_plan, session_state.config_options())
2779+
.optimize_with_context(new_plan, session_state)
27802780
.map_err(|e| {
27812781
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
27822782
})?;

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ pub mod topk_repartition;
4747
pub mod update_aggr_exprs;
4848
pub mod utils;
4949

50-
pub use optimizer::PhysicalOptimizerRule;
50+
pub use optimizer::{ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule};

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,48 @@ use crate::pushdown_sort::PushdownSort;
4242
use datafusion_common::Result;
4343
use datafusion_common::config::ConfigOptions;
4444
use datafusion_physical_plan::ExecutionPlan;
45+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
46+
47+
/// Context available to physical optimizer rules.
48+
///
49+
/// This trait provides access to configuration options and optional statistics
50+
/// registry for enhanced statistics lookup. It allows optimizer rules to access
51+
/// extended context without changing the core [`PhysicalOptimizerRule::optimize`]
52+
/// signature.
53+
pub trait PhysicalOptimizerContext: Send + Sync {
54+
/// Returns the configuration options.
55+
fn config_options(&self) -> &ConfigOptions;
56+
57+
/// Returns the statistics registry for enhanced statistics lookup.
58+
///
59+
/// Returns `None` if no registry is configured, in which case rules
60+
/// should fall back to using `ExecutionPlan::partition_statistics()`.
61+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
62+
None
63+
}
64+
}
65+
66+
/// Simple context wrapping [`ConfigOptions`] for backward compatibility.
67+
///
68+
/// This struct provides a minimal implementation of [`PhysicalOptimizerContext`]
69+
/// that only supplies configuration options. Used when no statistics registry
70+
/// is available or needed.
71+
pub struct ConfigOnlyContext<'a> {
72+
config: &'a ConfigOptions,
73+
}
74+
75+
impl<'a> ConfigOnlyContext<'a> {
76+
/// Create a new context wrapping the given config options.
77+
pub fn new(config: &'a ConfigOptions) -> Self {
78+
Self { config }
79+
}
80+
}
81+
82+
impl PhysicalOptimizerContext for ConfigOnlyContext<'_> {
83+
fn config_options(&self) -> &ConfigOptions {
84+
self.config
85+
}
86+
}
4587

4688
/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
4789
/// computes the same results, but in a potentially more efficient way.
@@ -51,13 +93,29 @@ use datafusion_physical_plan::ExecutionPlan;
5193
///
5294
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
5395
pub trait PhysicalOptimizerRule: Debug + std::any::Any {
54-
/// Rewrite `plan` to an optimized form
96+
/// Rewrite `plan` to an optimized form.
97+
///
98+
/// This is the primary optimization method. For rules that need access to
99+
/// the statistics registry, override [`optimize_with_context`](Self::optimize_with_context) instead.
55100
fn optimize(
56101
&self,
57102
plan: Arc<dyn ExecutionPlan>,
58103
config: &ConfigOptions,
59104
) -> Result<Arc<dyn ExecutionPlan>>;
60105

106+
/// Rewrite `plan` with access to extended context (statistics registry, etc.).
107+
///
108+
/// Override this method if you need access to the statistics registry for
109+
/// enhanced statistics lookup. The default implementation simply calls
110+
/// [`optimize`](Self::optimize) with the config options from the context.
111+
fn optimize_with_context(
112+
&self,
113+
plan: Arc<dyn ExecutionPlan>,
114+
context: &dyn PhysicalOptimizerContext,
115+
) -> Result<Arc<dyn ExecutionPlan>> {
116+
self.optimize(plan, context.config_options())
117+
}
118+
61119
/// A human readable name for this optimizer rule
62120
fn name(&self) -> &str;
63121

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub mod repartition;
8787
pub mod sort_pushdown;
8888
pub mod sorts;
8989
pub mod spill;
90+
pub mod operator_statistics;
9091
pub mod stream;
9192
pub mod streaming;
9293
pub mod tree_node;

0 commit comments

Comments
 (0)