1818use arrow:: array:: { ArrayRef , RecordBatch } ;
1919use arrow_schema:: DataType ;
2020use arrow_schema:: TimeUnit :: Nanosecond ;
21- use criterion:: {
22- BenchmarkGroup , BenchmarkId , Criterion , criterion_group, criterion_main,
23- measurement:: WallTime ,
24- } ;
21+ use criterion:: { BenchmarkId , Criterion , criterion_group, criterion_main} ;
2522use datafusion:: prelude:: { DataFrame , SessionContext } ;
2623use datafusion_catalog:: MemTable ;
27- use datafusion_common:: { Column , ScalarValue } ;
24+ use datafusion_common:: ScalarValue ;
2825use datafusion_expr:: Expr :: Literal ;
29- use datafusion_expr:: logical_plan:: LogicalPlan ;
30- use datafusion_expr:: utils:: split_conjunction_owned;
3126use datafusion_expr:: { cast, col, lit, not, try_cast, when} ;
3227use datafusion_functions:: expr_fn:: {
3328 btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
3429} ;
35- use std:: env;
3630use std:: fmt:: Write ;
3731use std:: hint:: black_box;
3832use std:: ops:: Rem ;
3933use std:: sync:: Arc ;
4034use tokio:: runtime:: Runtime ;
4135
42- const FULL_PREDICATE_SWEEP : [ usize ; 5 ] = [ 10 , 20 , 30 , 40 , 60 ] ;
43- const FULL_DEPTH_SWEEP : [ usize ; 3 ] = [ 1 , 2 , 3 ] ;
44- const DEFAULT_SWEEP_POINTS : [ ( usize , usize ) ; 3 ] = [ ( 10 , 1 ) , ( 30 , 2 ) , ( 60 , 3 ) ] ;
45-
4636// This benchmark suite is designed to test the performance of
4737// logical planning with a large plan containing unions, many columns
4838// with a variety of operations in it.
@@ -228,9 +218,7 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
228218fn build_case_heavy_left_join_df ( ctx : & SessionContext , rt : & Runtime ) -> DataFrame {
229219 register_string_table ( ctx, 100 , 1000 ) ;
230220 let query = build_case_heavy_left_join_query ( 30 , 1 ) ;
231- let df = rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } ) ;
232- assert_case_heavy_left_join_inference_candidates ( & df, 30 ) ;
233- df
221+ rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } )
234222}
235223
236224fn build_case_heavy_left_join_query ( predicate_count : usize , case_depth : usize ) -> String {
@@ -249,17 +237,12 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
249237 query. push_str ( " AND " ) ;
250238 }
251239
252- let left_payload_col = ( i % 19 ) + 1 ;
253- let right_payload_col = ( ( i + 7 ) % 19 ) + 1 ;
254- let mut expr = format ! (
255- "CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END"
256- ) ;
240+ let mut expr = format ! ( "length(l.c{})" , i % 20 ) ;
257241 for depth in 0 ..case_depth {
258- let left_col = ( ( i + depth + 3 ) % 19 ) + 1 ;
259- let right_col = ( ( i + depth + 11 ) % 19 ) + 1 ;
260- let join_key_ref = if ( i + depth) % 2 == 0 { "l.c0" } else { "r.c0" } ;
242+ let left_col = ( i + depth + 1 ) % 20 ;
243+ let right_col = ( i + depth + 2 ) % 20 ;
261244 expr = format ! (
262- "CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END"
245+ "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END"
263246 ) ;
264247 }
265248
@@ -269,6 +252,26 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
269252 query
270253}
271254
255+ fn build_case_heavy_left_join_df_with_push_down_filter (
256+ rt : & Runtime ,
257+ predicate_count : usize ,
258+ case_depth : usize ,
259+ push_down_filter_enabled : bool ,
260+ ) -> DataFrame {
261+ let ctx = SessionContext :: new ( ) ;
262+ register_string_table ( & ctx, 100 , 1000 ) ;
263+ if !push_down_filter_enabled {
264+ let removed = ctx. remove_optimizer_rule ( "push_down_filter" ) ;
265+ assert ! (
266+ removed,
267+ "push_down_filter rule should be present in the default optimizer"
268+ ) ;
269+ }
270+
271+ let query = build_case_heavy_left_join_query ( predicate_count, case_depth) ;
272+ rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } )
273+ }
274+
272275fn build_non_case_left_join_query (
273276 predicate_count : usize ,
274277 nesting_depth : usize ,
@@ -301,11 +304,10 @@ fn build_non_case_left_join_query(
301304 query
302305}
303306
304- fn build_left_join_df_with_push_down_filter (
307+ fn build_non_case_left_join_df_with_push_down_filter (
305308 rt : & Runtime ,
306- query_builder : impl Fn ( usize , usize ) -> String ,
307309 predicate_count : usize ,
308- depth : usize ,
310+ nesting_depth : usize ,
309311 push_down_filter_enabled : bool ,
310312) -> DataFrame {
311313 let ctx = SessionContext :: new ( ) ;
@@ -318,143 +320,10 @@ fn build_left_join_df_with_push_down_filter(
318320 ) ;
319321 }
320322
321- let query = query_builder ( predicate_count, depth ) ;
323+ let query = build_non_case_left_join_query ( predicate_count, nesting_depth ) ;
322324 rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } )
323325}
324326
325- fn build_case_heavy_left_join_df_with_push_down_filter (
326- rt : & Runtime ,
327- predicate_count : usize ,
328- case_depth : usize ,
329- push_down_filter_enabled : bool ,
330- ) -> DataFrame {
331- let df = build_left_join_df_with_push_down_filter (
332- rt,
333- build_case_heavy_left_join_query,
334- predicate_count,
335- case_depth,
336- push_down_filter_enabled,
337- ) ;
338- assert_case_heavy_left_join_inference_candidates ( & df, predicate_count) ;
339- df
340- }
341-
342- fn build_non_case_left_join_df_with_push_down_filter (
343- rt : & Runtime ,
344- predicate_count : usize ,
345- nesting_depth : usize ,
346- push_down_filter_enabled : bool ,
347- ) -> DataFrame {
348- build_left_join_df_with_push_down_filter (
349- rt,
350- build_non_case_left_join_query,
351- predicate_count,
352- nesting_depth,
353- push_down_filter_enabled,
354- )
355- }
356-
357- fn find_filter_predicates ( plan : & LogicalPlan ) -> Vec < datafusion_expr:: Expr > {
358- match plan {
359- LogicalPlan :: Filter ( filter) => split_conjunction_owned ( filter. predicate . clone ( ) ) ,
360- LogicalPlan :: Projection ( projection) => {
361- find_filter_predicates ( projection. input . as_ref ( ) )
362- }
363- other => {
364- panic ! ( "expected benchmark query plan to contain a Filter, found {other:?}" )
365- }
366- }
367- }
368-
369- fn assert_case_heavy_left_join_inference_candidates (
370- df : & DataFrame ,
371- expected_predicate_count : usize ,
372- ) {
373- let predicates = find_filter_predicates ( df. logical_plan ( ) ) ;
374- assert_eq ! ( predicates. len( ) , expected_predicate_count) ;
375-
376- let left_join_key = Column :: from_qualified_name ( "l.c0" ) ;
377- let right_join_key = Column :: from_qualified_name ( "r.c0" ) ;
378-
379- for predicate in predicates {
380- let column_refs = predicate. column_refs ( ) ;
381- assert ! (
382- column_refs. contains( &&left_join_key)
383- || column_refs. contains( &&right_join_key) ,
384- "benchmark predicate should reference a join key: {predicate}"
385- ) ;
386- assert ! (
387- column_refs
388- . iter( )
389- . any( |col| * * col != left_join_key && * * col != right_join_key) ,
390- "benchmark predicate should reference a non-join column: {predicate}"
391- ) ;
392- }
393- }
394-
395- fn include_full_push_down_filter_sweep ( ) -> bool {
396- env:: var ( "DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP" )
397- . map ( |value| value == "1" || value. eq_ignore_ascii_case ( "true" ) )
398- . unwrap_or ( false )
399- }
400-
401- fn push_down_filter_sweep_points ( ) -> Vec < ( usize , usize ) > {
402- if include_full_push_down_filter_sweep ( ) {
403- FULL_DEPTH_SWEEP
404- . into_iter ( )
405- . flat_map ( |depth| {
406- FULL_PREDICATE_SWEEP
407- . into_iter ( )
408- . map ( move |predicate_count| ( predicate_count, depth) )
409- } )
410- . collect ( )
411- } else {
412- DEFAULT_SWEEP_POINTS . to_vec ( )
413- }
414- }
415-
416- fn bench_push_down_filter_ab < BuildFn > (
417- group : & mut BenchmarkGroup < ' _ , WallTime > ,
418- rt : & Runtime ,
419- sweep_points : & [ ( usize , usize ) ] ,
420- build_df : BuildFn ,
421- ) where
422- BuildFn : Fn ( & Runtime , usize , usize , bool ) -> DataFrame ,
423- {
424- for & ( predicate_count, depth) in sweep_points {
425- let with_push_down_filter = build_df ( rt, predicate_count, depth, true ) ;
426- let without_push_down_filter = build_df ( rt, predicate_count, depth, false ) ;
427-
428- let input_label = format ! ( "predicates={predicate_count},nesting_depth={depth}" ) ;
429-
430- group. bench_with_input (
431- BenchmarkId :: new ( "with_push_down_filter" , & input_label) ,
432- & with_push_down_filter,
433- |b, df| {
434- b. iter ( || {
435- let df_clone = df. clone ( ) ;
436- black_box (
437- rt. block_on ( async { df_clone. into_optimized_plan ( ) . unwrap ( ) } ) ,
438- ) ;
439- } )
440- } ,
441- ) ;
442-
443- group. bench_with_input (
444- BenchmarkId :: new ( "without_push_down_filter" , & input_label) ,
445- & without_push_down_filter,
446- |b, df| {
447- b. iter ( || {
448- let df_clone = df. clone ( ) ;
449- black_box (
450- rt. block_on ( async { df_clone. into_optimized_plan ( ) . unwrap ( ) } ) ,
451- ) ;
452- } )
453- } ,
454- ) ;
455- }
456- }
457-
458327fn criterion_benchmark ( c : & mut Criterion ) {
459328 let baseline_ctx = SessionContext :: new ( ) ;
460329 let case_heavy_ctx = SessionContext :: new ( ) ;
@@ -480,40 +349,116 @@ fn criterion_benchmark(c: &mut Criterion) {
480349 } )
481350 } ) ;
482351
483- let sweep_points = push_down_filter_sweep_points ( ) ;
352+ let predicate_sweep = [ 10 , 20 , 30 , 40 , 60 ] ;
353+ let case_depth_sweep = [ 1 , 2 , 3 ] ;
484354
485355 let mut hotspot_group =
486356 c. benchmark_group ( "push_down_filter_hotspot_case_heavy_left_join_ab" ) ;
487- bench_push_down_filter_ab (
488- & mut hotspot_group,
489- & rt,
490- & sweep_points,
491- |rt, predicate_count, depth, enable| {
492- build_case_heavy_left_join_df_with_push_down_filter (
493- rt,
494- predicate_count,
495- depth,
496- enable,
497- )
498- } ,
499- ) ;
357+ for case_depth in case_depth_sweep {
358+ for predicate_count in predicate_sweep {
359+ let with_push_down_filter =
360+ build_case_heavy_left_join_df_with_push_down_filter (
361+ & rt,
362+ predicate_count,
363+ case_depth,
364+ true ,
365+ ) ;
366+ let without_push_down_filter =
367+ build_case_heavy_left_join_df_with_push_down_filter (
368+ & rt,
369+ predicate_count,
370+ case_depth,
371+ false ,
372+ ) ;
373+
374+ let input_label =
375+ format ! ( "predicates={predicate_count},case_depth={case_depth}" ) ;
376+ // A/B interpretation:
377+ // - with_push_down_filter: default optimizer path (rule enabled)
378+ // - without_push_down_filter: control path with the rule removed
379+ // Compare both IDs at the same sweep point to isolate rule impact.
380+ hotspot_group. bench_with_input (
381+ BenchmarkId :: new ( "with_push_down_filter" , & input_label) ,
382+ & with_push_down_filter,
383+ |b, df| {
384+ b. iter ( || {
385+ let df_clone = df. clone ( ) ;
386+ black_box (
387+ rt. block_on ( async {
388+ df_clone. into_optimized_plan ( ) . unwrap ( )
389+ } ) ,
390+ ) ;
391+ } )
392+ } ,
393+ ) ;
394+ hotspot_group. bench_with_input (
395+ BenchmarkId :: new ( "without_push_down_filter" , & input_label) ,
396+ & without_push_down_filter,
397+ |b, df| {
398+ b. iter ( || {
399+ let df_clone = df. clone ( ) ;
400+ black_box (
401+ rt. block_on ( async {
402+ df_clone. into_optimized_plan ( ) . unwrap ( )
403+ } ) ,
404+ ) ;
405+ } )
406+ } ,
407+ ) ;
408+ }
409+ }
500410 hotspot_group. finish ( ) ;
501411
502412 let mut control_group =
503413 c. benchmark_group ( "push_down_filter_control_non_case_left_join_ab" ) ;
504- bench_push_down_filter_ab (
505- & mut control_group,
506- & rt,
507- & sweep_points,
508- |rt, predicate_count, depth, enable| {
509- build_non_case_left_join_df_with_push_down_filter (
510- rt,
414+ for nesting_depth in case_depth_sweep {
415+ for predicate_count in predicate_sweep {
416+ let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter (
417+ & rt,
511418 predicate_count,
512- depth,
513- enable,
514- )
515- } ,
516- ) ;
419+ nesting_depth,
420+ true ,
421+ ) ;
422+ let without_push_down_filter =
423+ build_non_case_left_join_df_with_push_down_filter (
424+ & rt,
425+ predicate_count,
426+ nesting_depth,
427+ false ,
428+ ) ;
429+
430+ let input_label =
431+ format ! ( "predicates={predicate_count},nesting_depth={nesting_depth}" ) ;
432+ control_group. bench_with_input (
433+ BenchmarkId :: new ( "with_push_down_filter" , & input_label) ,
434+ & with_push_down_filter,
435+ |b, df| {
436+ b. iter ( || {
437+ let df_clone = df. clone ( ) ;
438+ black_box (
439+ rt. block_on ( async {
440+ df_clone. into_optimized_plan ( ) . unwrap ( )
441+ } ) ,
442+ ) ;
443+ } )
444+ } ,
445+ ) ;
446+ control_group. bench_with_input (
447+ BenchmarkId :: new ( "without_push_down_filter" , & input_label) ,
448+ & without_push_down_filter,
449+ |b, df| {
450+ b. iter ( || {
451+ let df_clone = df. clone ( ) ;
452+ black_box (
453+ rt. block_on ( async {
454+ df_clone. into_optimized_plan ( ) . unwrap ( )
455+ } ) ,
456+ ) ;
457+ } )
458+ } ,
459+ ) ;
460+ }
461+ }
517462 control_group. finish ( ) ;
518463}
519464
0 commit comments