Skip to content

Commit bc32cf2

Browse files
committed
Replace partition_statistics_with_context with statistics_with_args
Introduce the StatisticsArgs struct, which combines partition, child_stats, and cache. Add the child_stats_for helper to eliminate per-operator boilerplate. Remove StatisticsContext (flattened into StatisticsArgs).
1 parent 44f1022 commit bc32cf2

41 files changed

Lines changed: 454 additions & 789 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ use datafusion::{
108108
},
109109
physical_expr::EquivalenceProperties,
110110
physical_plan::{
111-
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsContext,
111+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsArgs,
112112
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
113113
},
114114
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
@@ -722,12 +722,8 @@ impl ExecutionPlan for SampleExec {
722722
Some(self.metrics.clone_inner())
723723
}
724724

725-
fn partition_statistics_with_context(
726-
&self,
727-
_partition: Option<usize>,
728-
ctx: &StatisticsContext,
729-
) -> Result<Arc<Statistics>> {
730-
let mut stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
725+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
726+
let mut stats = Arc::unwrap_or_clone(Arc::clone(&args.child_stats()[0]));
731727
let ratio = self.upper_bound - self.lower_bound;
732728

733729
// Scale statistics by sampling ratio (inexact due to randomness)

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use datafusion_common::stats::Precision;
4141
use datafusion_common::tree_node::TreeNodeRecursion;
4242
use datafusion_physical_expr::EquivalenceProperties;
4343
use datafusion_physical_plan::PlanProperties;
44-
use datafusion_physical_plan::StatisticsContext;
44+
use datafusion_physical_plan::StatisticsArgs;
4545
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4646
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
4747

@@ -180,12 +180,8 @@ impl ExecutionPlan for CustomExecutionPlan {
180180
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
181181
}
182182

183-
fn partition_statistics_with_context(
184-
&self,
185-
partition: Option<usize>,
186-
_ctx: &StatisticsContext,
187-
) -> Result<Arc<Statistics>> {
188-
if partition.is_some() {
183+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
184+
if args.partition().is_some() {
189185
return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
190186
}
191187
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_catalog::Session;
3636
use datafusion_common::tree_node::TreeNodeRecursion;
3737
use datafusion_common::{project_schema, stats::Precision};
3838
use datafusion_physical_expr::EquivalenceProperties;
39-
use datafusion_physical_plan::StatisticsContext;
39+
use datafusion_physical_plan::StatisticsArgs;
4040
use datafusion_physical_plan::compute_statistics;
4141
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4242

@@ -176,12 +176,8 @@ impl ExecutionPlan for StatisticsValidation {
176176
unimplemented!("This plan only serves for testing statistics")
177177
}
178178

179-
fn partition_statistics_with_context(
180-
&self,
181-
partition: Option<usize>,
182-
_ctx: &StatisticsContext,
183-
) -> Result<Arc<Statistics>> {
184-
if partition.is_some() {
179+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
180+
if args.partition().is_some() {
185181
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
186182
} else {
187183
Ok(Arc::new(self.stats.clone()))

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use datafusion_physical_plan::joins::utils::JoinFilter;
4545
use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, PartitionMode};
4646
use datafusion_physical_plan::projection::ProjectionExec;
4747
use datafusion_physical_plan::{
48-
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsContext,
48+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsArgs,
4949
compute_statistics,
5050
execution_plan::{Boundedness, EmissionType},
5151
};
@@ -1140,12 +1140,8 @@ impl ExecutionPlan for StatisticsExec {
11401140
unimplemented!("This plan only serves for testing statistics")
11411141
}
11421142

1143-
fn partition_statistics_with_context(
1144-
&self,
1145-
partition: Option<usize>,
1146-
_ctx: &StatisticsContext,
1147-
) -> Result<Arc<Statistics>> {
1148-
Ok(Arc::new(if partition.is_some() {
1143+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
1144+
Ok(Arc::new(if args.partition().is_some() {
11491145
Statistics::new_unknown(&self.schema)
11501146
} else {
11511147
self.stats.clone()

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use datafusion_physical_plan::union::UnionExec;
7070
use datafusion_physical_plan::windows::{BoundedWindowAggExec, create_window_expr};
7171
use datafusion_physical_plan::{
7272
DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, Partitioning,
73-
PlanProperties, SortOrderPushdownResult, StatisticsContext, displayable,
73+
PlanProperties, SortOrderPushdownResult, StatisticsArgs, displayable,
7474
};
7575

7676
/// Create a non sorted parquet exec
@@ -983,11 +983,7 @@ impl ExecutionPlan for TestScan {
983983
internal_err!("TestScan is for testing optimizer only, not for execution")
984984
}
985985

986-
fn partition_statistics_with_context(
987-
&self,
988-
_partition: Option<usize>,
989-
_ctx: &StatisticsContext,
990-
) -> Result<Arc<Statistics>> {
986+
fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result<Arc<Statistics>> {
991987
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
992988
}
993989

datafusion/datasource/src/source.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
4646
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
4747
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4848
use datafusion_physical_plan::SortOrderPushdownResult;
49-
use datafusion_physical_plan::StatisticsContext;
49+
use datafusion_physical_plan::StatisticsArgs;
5050
use datafusion_physical_plan::filter_pushdown::{
5151
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
5252
};
@@ -452,12 +452,8 @@ impl ExecutionPlan for DataSourceExec {
452452
Some(metrics)
453453
}
454454

455-
fn partition_statistics_with_context(
456-
&self,
457-
partition: Option<usize>,
458-
_ctx: &StatisticsContext,
459-
) -> Result<Arc<Statistics>> {
460-
self.data_source.partition_statistics(partition)
455+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
456+
self.data_source.partition_statistics(args.partition())
461457
}
462458

463459
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ use datafusion_common::stats::Precision;
7171
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
7272
use datafusion_common::utils::combine_limit;
7373
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
74+
use datafusion_physical_plan::compute_statistics;
7475
use datafusion_physical_plan::empty::EmptyExec;
7576
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
7677
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -351,7 +352,7 @@ fn limit_eliminable_exact_num_rows(
351352
}
352353

353354
if matches!(
354-
current.partition_statistics(None)?.num_rows,
355+
compute_statistics(current.as_ref(), None)?.num_rows,
355356
Precision::Exact(0)
356357
) {
357358
return Ok(Some(0));

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ use datafusion_common::{Result, Statistics};
3434
use datafusion_execution::TaskContext;
3535
use datafusion_physical_expr::Distribution;
3636
use datafusion_physical_expr_common::sort_expr::OrderingRequirements;
37-
use datafusion_physical_plan::StatisticsContext;
3837
use datafusion_physical_plan::execution_plan::Boundedness;
3938
use datafusion_physical_plan::projection::{
4039
ProjectionExec, make_with_child, update_expr, update_ordering_requirement,
4140
};
4241
use datafusion_physical_plan::sorts::sort::SortExec;
4342
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
43+
use datafusion_physical_plan::statistics_context::StatisticsArgs;
4444
use datafusion_physical_plan::{
4545
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
4646
SendableRecordBatchStream,
@@ -243,15 +243,8 @@ impl ExecutionPlan for OutputRequirementExec {
243243
unreachable!();
244244
}
245245

246-
fn partition_statistics_with_context(
247-
&self,
248-
partition: Option<usize>,
249-
ctx: &StatisticsContext,
250-
) -> Result<Arc<Statistics>> {
251-
match partition {
252-
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition),
253-
None => Ok(Arc::clone(&ctx.child_stats()[0])),
254-
}
246+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
247+
args.child_stats_for(0, self.input.as_ref())
255248
}
256249

257250
fn try_swapping_with_projection(

datafusion/physical-plan/benches/compute_statistics.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ use datafusion_physical_plan::execution_plan::{
4141
Boundedness, EmissionType, ExecutionPlan, PlanProperties,
4242
};
4343
use datafusion_physical_plan::joins::CrossJoinExec;
44-
use datafusion_physical_plan::statistics_context::{
45-
StatisticsContext, compute_statistics,
46-
};
44+
use datafusion_physical_plan::statistics_context::{StatisticsArgs, compute_statistics};
4745
use datafusion_physical_plan::{
4846
DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream,
4947
};
@@ -119,11 +117,7 @@ impl ExecutionPlan for BenchLeaf {
119117
unimplemented!()
120118
}
121119

122-
fn partition_statistics_with_context(
123-
&self,
124-
_partition: Option<usize>,
125-
_ctx: &StatisticsContext,
126-
) -> Result<Arc<Statistics>> {
120+
fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result<Arc<Statistics>> {
127121
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
128122
}
129123
}
@@ -166,8 +160,8 @@ fn compute_statistics_without_shared_cache(
166160
.iter()
167161
.map(|child| compute_statistics_without_shared_cache(child.as_ref(), partition))
168162
.collect::<Result<Vec<_>>>()?;
169-
let ctx = StatisticsContext::new(child_stats);
170-
plan.partition_statistics_with_context(partition, &ctx)
163+
let args = StatisticsArgs::new(partition, child_stats);
164+
plan.statistics_with_args(&args)
171165
}
172166

173167
fn bench_compute_statistics(c: &mut Criterion) {

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::filter_pushdown::{
3131
FilterPushdownPropagation, PushedDownPredicate,
3232
};
3333
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
34-
use crate::statistics_context::StatisticsContext;
34+
use crate::statistics_context::StatisticsArgs;
3535
use crate::{
3636
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
3737
SendableRecordBatchStream, Statistics, check_if_same_properties,
@@ -1644,15 +1644,8 @@ impl ExecutionPlan for AggregateExec {
16441644
Some(self.metrics.clone_inner())
16451645
}
16461646

1647-
fn partition_statistics_with_context(
1648-
&self,
1649-
partition: Option<usize>,
1650-
ctx: &StatisticsContext,
1651-
) -> Result<Arc<Statistics>> {
1652-
let child_statistics = match partition {
1653-
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition)?,
1654-
None => Arc::clone(&ctx.child_stats()[0]),
1655-
};
1647+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
1648+
let child_statistics = args.child_stats_for(0, self.input.as_ref())?;
16561649
Ok(Arc::new(self.statistics_inner(&child_statistics)?))
16571650
}
16581651

@@ -2801,12 +2794,8 @@ mod tests {
28012794
Ok(Box::pin(stream))
28022795
}
28032796

2804-
fn partition_statistics_with_context(
2805-
&self,
2806-
partition: Option<usize>,
2807-
_ctx: &StatisticsContext,
2808-
) -> Result<Arc<Statistics>> {
2809-
if partition.is_some() {
2797+
fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
2798+
if args.partition().is_some() {
28102799
return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref())));
28112800
}
28122801
let (_, batches) = some_data();

0 commit comments

Comments
 (0)