@@ -32,14 +32,18 @@ use std::sync::Arc;
3232
3333use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
3434use criterion:: { BenchmarkId , Criterion , criterion_group, criterion_main} ;
35+ use datafusion_common:: ScalarValue ;
3536use datafusion_common:: tree_node:: TreeNodeRecursion ;
3637use datafusion_common:: { Result , Statistics } ;
3738use datafusion_execution:: TaskContext ;
3839use datafusion_physical_expr:: EquivalenceProperties ;
40+ use datafusion_physical_expr:: PhysicalExpr ;
41+ use datafusion_physical_expr:: expressions:: Literal ;
3942use datafusion_physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
4043use datafusion_physical_plan:: execution_plan:: {
4144 Boundedness , EmissionType , ExecutionPlan , PlanProperties ,
4245} ;
46+ use datafusion_physical_plan:: filter:: FilterExec ;
4347use datafusion_physical_plan:: joins:: CrossJoinExec ;
4448use datafusion_physical_plan:: statistics_context:: { StatisticsArgs , compute_statistics} ;
4549use datafusion_physical_plan:: {
@@ -102,9 +106,7 @@ impl ExecutionPlan for BenchLeaf {
102106
103107 fn apply_expressions (
104108 & self ,
105- _f : & mut dyn FnMut (
106- & dyn datafusion_physical_expr:: PhysicalExpr ,
107- ) -> Result < TreeNodeRecursion > ,
109+ _f : & mut dyn FnMut ( & dyn PhysicalExpr ) -> Result < TreeNodeRecursion > ,
108110 ) -> Result < TreeNodeRecursion > {
109111 Ok ( TreeNodeRecursion :: Continue )
110112 }
@@ -145,6 +147,23 @@ fn build_cross_join_tree(depth: usize, next_col: &mut usize) -> Arc<dyn Executio
145147 Arc :: new ( CrossJoinExec :: new ( left, right) )
146148}
147149
150+ /// Build: Filter^depth -> BenchLeaf (always-true predicate).
151+ /// Demonstrates the cost of the double-walk for partition-preserving chains:
152+ /// the framework first walks the entire tree computing None stats, then each
153+ /// filter requests per-partition stats on demand via compute_child_statistics.
154+ fn build_filter_chain ( depth : usize ) -> Arc < dyn ExecutionPlan > {
155+ let mut plan: Arc < dyn ExecutionPlan > = Arc :: new ( BenchLeaf :: new ( "a" ) ) ;
156+ let predicate: Arc < dyn PhysicalExpr > =
157+ Arc :: new ( Literal :: new ( ScalarValue :: Boolean ( Some ( true ) ) ) ) ;
158+ for _ in 0 ..depth {
159+ plan = Arc :: new (
160+ FilterExec :: try_new ( Arc :: clone ( & predicate) , plan)
161+ . expect ( "FilterExec::try_new failed" ) ,
162+ ) ;
163+ }
164+ plan
165+ }
166+
148167/// Recursive walk without a shared cross-node cache, simulating pre-cache behavior.
149168/// Each operator's internal `compute_child_statistics` call triggers a fresh
150169/// subtree walk, resulting in O(n^2) total node visits for a chain of depth n.
@@ -211,6 +230,40 @@ fn bench_compute_statistics(c: &mut Criterion) {
211230 ) ;
212231 }
213232 group. finish ( ) ;
233+
234+ // --- Filter chain (partition-preserving linear plan) ---
235+ // When called with Some(0), the framework first walks the entire tree
236+ // computing None stats, then each filter requests Some(0) on demand.
237+ // Both walks are cached, so the total cost is ~2n vs n node visits for None.
238+ let mut group = c. benchmark_group ( "compute_statistics_filter_chain" ) ;
239+ for depth in [ 10 , 20 , 50 ] {
240+ let plan = build_filter_chain ( depth) ;
241+ group. bench_with_input (
242+ BenchmarkId :: new ( "cached_partition" , depth) ,
243+ & plan,
244+ |b, plan| {
245+ b. iter ( || compute_statistics ( plan. as_ref ( ) , Some ( 0 ) ) . unwrap ( ) ) ;
246+ } ,
247+ ) ;
248+ group. bench_with_input (
249+ BenchmarkId :: new ( "cached_overall" , depth) ,
250+ & plan,
251+ |b, plan| {
252+ b. iter ( || compute_statistics ( plan. as_ref ( ) , None ) . unwrap ( ) ) ;
253+ } ,
254+ ) ;
255+ group. bench_with_input (
256+ BenchmarkId :: new ( "no_shared_cache" , depth) ,
257+ & plan,
258+ |b, plan| {
259+ b. iter ( || {
260+ compute_statistics_without_shared_cache ( plan. as_ref ( ) , Some ( 0 ) )
261+ . unwrap ( )
262+ } ) ;
263+ } ,
264+ ) ;
265+ }
266+ group. finish ( ) ;
214267}
215268
216269criterion_group ! ( benches, bench_compute_statistics) ;
0 commit comments