Skip to content

Commit e4dcf39

Browse files
committed
refactor(physical-plan): default child_stats_requests to Skip
Make the default `child_stats_requests` skip every child, so a node that does not derive its statistics from children (for example one that only overrides the deprecated `partition_statistics`) triggers no child traversal. This avoids the eager child walk, and the child-stat errors it could surface, that `StatisticsContext::compute` previously performed even when `statistics_from_inputs` ignored the results. Nodes that read `input_stats` now declare the children they use via `child_stats_requests`.
1 parent af9e4b5 commit e4dcf39

17 files changed

Lines changed: 116 additions & 44 deletions

File tree

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ use datafusion::{
108108
},
109109
physical_expr::EquivalenceProperties,
110110
physical_plan::{
111-
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsArgs,
111+
ChildStats, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
112+
StatisticsArgs,
112113
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
113114
},
114115
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
@@ -722,6 +723,10 @@ impl ExecutionPlan for SampleExec {
722723
Some(self.metrics.clone_inner())
723724
}
724725

726+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
727+
vec![ChildStats::At(partition)]
728+
}
729+
725730
fn statistics_from_inputs(
726731
&self,
727732
input_stats: &[Arc<Statistics>],

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ use datafusion_physical_plan::projection::{
3939
use datafusion_physical_plan::sorts::sort::SortExec;
4040
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
4141
use datafusion_physical_plan::{
42-
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
43-
SendableRecordBatchStream, StatisticsArgs,
42+
ChildStats, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
43+
PlanProperties, SendableRecordBatchStream, StatisticsArgs,
4444
};
4545

4646
/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
@@ -242,6 +242,10 @@ impl ExecutionPlan for OutputRequirementExec {
242242
unreachable!();
243243
}
244244

245+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
246+
vec![ChildStats::At(partition)]
247+
}
248+
245249
fn statistics_from_inputs(
246250
&self,
247251
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::filter_pushdown::{
3333
FilterPushdownPropagation, PushedDownPredicate,
3434
};
3535
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
36-
use crate::statistics::StatisticsArgs;
36+
use crate::statistics::{ChildStats, StatisticsArgs};
3737
use crate::{
3838
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
3939
SendableRecordBatchStream, Statistics, check_if_same_properties,
@@ -1765,6 +1765,10 @@ impl ExecutionPlan for AggregateExec {
17651765
Some(self.metrics.clone_inner())
17661766
}
17671767

1768+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
1769+
vec![ChildStats::At(partition)]
1770+
}
1771+
17681772
fn statistics_from_inputs(
17691773
&self,
17701774
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/buffer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::filter_pushdown::{
2424
FilterPushdownPropagation,
2525
};
2626
use crate::projection::ProjectionExec;
27-
use crate::statistics::StatisticsArgs;
27+
use crate::statistics::{ChildStats, StatisticsArgs};
2828
use crate::stream::RecordBatchStreamAdapter;
2929
use crate::{
3030
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SortOrderPushdownResult,
@@ -238,6 +238,10 @@ impl ExecutionPlan for BufferExec {
238238
Some(self.metrics.clone_inner())
239239
}
240240

241+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
242+
vec![ChildStats::At(partition)]
243+
}
244+
241245
fn statistics_from_inputs(
242246
&self,
243247
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
2424
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2525
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
2626
use crate::projection::ProjectionExec;
27-
use crate::statistics::StatisticsArgs;
27+
use crate::statistics::{ChildStats, StatisticsArgs};
2828
use crate::stream::EmptyRecordBatchStream;
2929
use crate::{
3030
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
@@ -216,6 +216,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
216216
Some(self.metrics.clone_inner())
217217
}
218218

219+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
220+
vec![ChildStats::At(partition)]
221+
}
222+
219223
fn statistics_from_inputs(
220224
&self,
221225
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/coop.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ use crate::filter_pushdown::{
8484
FilterPushdownPropagation,
8585
};
8686
use crate::projection::ProjectionExec;
87-
use crate::statistics::StatisticsArgs;
87+
use crate::statistics::{ChildStats, StatisticsArgs};
8888
use crate::{
8989
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
9090
SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties,
@@ -299,6 +299,10 @@ impl ExecutionPlan for CooperativeExec {
299299
Ok(make_cooperative(child_stream))
300300
}
301301

302+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
303+
vec![ChildStats::At(partition)]
304+
}
305+
302306
fn statistics_from_inputs(
303307
&self,
304308
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -538,20 +538,23 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
538538
self.partition_statistics(args.partition())
539539
}
540540

541-
/// Returns the partition index to request from each child when computing
542-
/// statistics for this node at `partition`.
543-
///
544-
/// The returned `Vec` has one entry per child (in the same order as
545-
/// [`Self::children`]). [`ChildStats::At`] with `None` requests the child's
546-
/// overall (all-partitions) statistics, and [`ChildStats::Skip`] omits a child
547-
/// whose statistics this node does not need.
548-
///
549-
/// The default requests the same `partition` from every child.
550-
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
551-
self.children()
552-
.iter()
553-
.map(|_| ChildStats::At(partition))
554-
.collect()
541+
/// Returns, per child, which statistics the [`StatisticsContext`] should resolve
542+
/// before calling [`Self::statistics_from_inputs`].
543+
///
544+
/// One entry per child (same order as [`Self::children`]): [`ChildStats::At`]
545+
/// requests the child's statistics at a partition (`None` = overall);
546+
/// [`ChildStats::Skip`] omits a child whose statistics this node does not need
547+
/// (a `Statistics::new_unknown` placeholder fills its `input_stats` slot).
548+
///
549+
/// The default skips every child, so a node that derives nothing from its
550+
/// children (for example one that only overrides the deprecated
551+
/// [`Self::partition_statistics`]) triggers no child traversal. A node that reads
552+
/// `input_stats` in [`Self::statistics_from_inputs`] must override this to declare
553+
/// the children it uses.
554+
///
555+
/// [`StatisticsContext`]: crate::statistics::StatisticsContext
556+
fn child_stats_requests(&self, _partition: Option<usize>) -> Vec<ChildStats> {
557+
self.children().iter().map(|_| ChildStats::Skip).collect()
555558
}
556559

557560
/// Returns `true` if a limit can be safely pushed down through this

datafusion/physical-plan/src/filter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::projection::{
4242
EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
4343
try_embed_projection, update_expr,
4444
};
45-
use crate::statistics::{StatisticsArgs, StatisticsContext};
45+
use crate::statistics::{ChildStats, StatisticsArgs, StatisticsContext};
4646
use crate::stream::EmptyRecordBatchStream;
4747
use crate::{
4848
DisplayFormatType, ExecutionPlan,
@@ -592,6 +592,10 @@ impl ExecutionPlan for FilterExec {
592592
Some(self.metrics.clone_inner())
593593
}
594594

595+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
596+
vec![ChildStats::At(partition)]
597+
}
598+
595599
/// The output statistics of a filtering operation can be estimated if the
596600
/// predicate's selectivity value can be determined for the incoming data.
597601
fn statistics_from_inputs(

datafusion/physical-plan/src/joins/sort_merge_join/exec.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::projection::{
3838
physical_to_column_exprs, update_join_on,
3939
};
4040
use crate::spill::spill_manager::SpillManager;
41-
use crate::statistics::StatisticsArgs;
41+
use crate::statistics::{ChildStats, StatisticsArgs};
4242
use crate::{
4343
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
4444
PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties,
@@ -564,6 +564,10 @@ impl ExecutionPlan for SortMergeJoinExec {
564564
Some(self.metrics.clone_inner())
565565
}
566566

567+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
568+
vec![ChildStats::At(partition), ChildStats::At(partition)]
569+
}
570+
567571
fn statistics_from_inputs(
568572
&self,
569573
input_stats: &[Arc<Statistics>],

datafusion/physical-plan/src/limit.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727
SendableRecordBatchStream, Statistics,
2828
};
2929
use crate::execution_plan::{Boundedness, CardinalityEffect};
30-
use crate::statistics::StatisticsArgs;
30+
use crate::statistics::{ChildStats, StatisticsArgs};
3131
use crate::{
3232
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
3333
check_if_same_properties,
@@ -220,6 +220,10 @@ impl ExecutionPlan for GlobalLimitExec {
220220
Some(self.metrics.clone_inner())
221221
}
222222

223+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
224+
vec![ChildStats::At(partition)]
225+
}
226+
223227
fn statistics_from_inputs(
224228
&self,
225229
input_stats: &[Arc<Statistics>],
@@ -387,6 +391,10 @@ impl ExecutionPlan for LocalLimitExec {
387391
Some(self.metrics.clone_inner())
388392
}
389393

394+
fn child_stats_requests(&self, partition: Option<usize>) -> Vec<ChildStats> {
395+
vec![ChildStats::At(partition)]
396+
}
397+
390398
fn statistics_from_inputs(
391399
&self,
392400
input_stats: &[Arc<Statistics>],

0 commit comments

Comments
 (0)