Skip to content

Commit 28c5c5a

Browse files
committed
feat: disable join dynamic filter pushdown by default
When a hash join's build-side dynamic filter contains a `hash_lookup` term, evaluating it on every probe-side row inside the scan duplicates the work the join's own probe is about to do. On TPC-H Q17 this doubles end-to-end query time (the equivalent of running the hash probe twice), and similar regressions of roughly 20-100% show up across TPC-H Q3/Q5/Q8/Q9/Q14/Q18 and many TPC-DS queries that join a small dimension table to a large fact table. Flip the default of `datafusion.optimizer.enable_join_dynamic_filter_pushdown` to `false` so users don't pay this cost on this common query shape; the option can still be enabled per query when the build-side filter is selective enough to make scan-level pruning worthwhile (e.g. a small dimension table that prunes most of a fact table's row groups / pages). The reset block at the end of dynamic_filter_pushdown_config.slt is switched from `SET ... = true` to `RESET ...` so the test ends in the configured default regardless of what that default is.
1 parent 50d74a7 commit 28c5c5a

11 files changed

Lines changed: 82 additions & 85 deletions

datafusion/common/src/config.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,15 @@ config_namespace! {
11261126

11271127
/// When set to true, the optimizer will attempt to push down Join dynamic filters
11281128
/// into the file scan phase.
1129-
pub enable_join_dynamic_filter_pushdown: bool, default = true
1129+
///
1130+
/// Disabled by default: when a join's build-side dynamic filter contains a
1131+
/// `hash_lookup` term, evaluating it on every probe-side row inside the scan
1132+
/// duplicates the work the join's own probe is about to do, which on benchmarks
1133+
/// like TPC-H Q17 doubles the query time (the equivalent of running the probe
1134+
/// twice). Re-enable per-query when the build-side filter is selective enough
1135+
/// to make scan-level pruning worthwhile (e.g. a small dimension table that
1136+
/// prunes most of a large fact table's row groups / pages).
1137+
pub enable_join_dynamic_filter_pushdown: bool, default = false
11301138

11311139
/// When set to true, the optimizer will attempt to push down Aggregate dynamic filters
11321140
/// into the file scan phase.

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ logical_plan
157157
physical_plan
158158
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
159159
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
160-
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
160+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
161161

162162
# Disable Join dynamic filter pushdown
163163
statement ok
@@ -857,16 +857,16 @@ DROP TABLE agg_parquet;
857857

858858
# Config reset
859859
statement ok
860-
SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true;
860+
RESET datafusion.optimizer.enable_topk_dynamic_filter_pushdown;
861861

862862
statement ok
863-
SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true;
863+
RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown;
864864

865865
statement ok
866-
SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
866+
RESET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown;
867867

868868
statement ok
869-
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
869+
RESET datafusion.optimizer.enable_dynamic_filter_pushdown;
870870

871871
statement ok
872872
RESET datafusion.execution.parquet.max_row_group_size;

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ physical_plan
339339
17)-----------------------------│ files: 1 ││ partition_count(in->out): │
340340
18)-----------------------------│ format: parquet ││ 1 -> 4 │
341341
19)-----------------------------│ ││ │
342-
20)-----------------------------│ predicate: ││ partitioning_scheme: │
343-
21)-----------------------------│ DynamicFilter [ empty ] ││ RoundRobinBatch(4) │
342+
20)-----------------------------│ ││ partitioning_scheme: │
343+
21)-----------------------------│ ││ RoundRobinBatch(4) │
344344
22)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘
345345
23)----------------------------------------------------------┌─────────────┴─────────────┐
346346
24)----------------------------------------------------------│ DataSourceExec │

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ datafusion.optimizer.default_filter_selectivity 20
301301
datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true
302302
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
303303
datafusion.optimizer.enable_dynamic_filter_pushdown true
304-
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
304+
datafusion.optimizer.enable_join_dynamic_filter_pushdown false
305305
datafusion.optimizer.enable_leaf_expression_pushdown true
306306
datafusion.optimizer.enable_piecewise_merge_join false
307307
datafusion.optimizer.enable_round_robin_repartition true
@@ -451,7 +451,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
451451
datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase.
452452
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
453453
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
454-
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
454+
datafusion.optimizer.enable_join_dynamic_filter_pushdown false When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. Disabled by default: when a join's build-side dynamic filter contains a `hash_lookup` term, evaluating it on every probe-side row inside the scan duplicates the work the join's own probe is about to do, which on benchmarks like TPC-H Q17 doubles the query time (the equivalent of running the probe twice). Re-enable per-query when the build-side filter is selective enough to make scan-level pruning worthwhile (e.g. a small dimension table that prunes most of a large fact table's row groups / pages).
455455
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
456456
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
457457
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
@@ -895,7 +895,7 @@ show functions
895895
statement ok
896896
reset datafusion.catalog.information_schema;
897897

898-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
898+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
899899
# reset it explicitly.
900900
statement ok
901901
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2865,9 +2865,8 @@ physical_plan
28652865
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
28662866
03)----DataSourceExec: partitions=1, partition_sizes=[1]
28672867
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2868-
05)------FilterExec: DynamicFilter [ empty ]
2869-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2870-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
2868+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2869+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
28712870

28722871
query IT rowsort
28732872
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2901,9 +2900,8 @@ physical_plan
29012900
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29022901
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29032902
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2904-
05)------FilterExec: DynamicFilter [ empty ]
2905-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2906-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
2903+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2904+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
29072905

29082906
query IT
29092907
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -2958,9 +2956,8 @@ physical_plan
29582956
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29592957
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29602958
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2961-
05)------FilterExec: DynamicFilter [ empty ]
2962-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2963-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
2959+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2960+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
29642961

29652962
query IT rowsort
29662963
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2994,9 +2991,8 @@ physical_plan
29942991
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29952992
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29962993
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2997-
05)------FilterExec: DynamicFilter [ empty ]
2998-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2999-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
2994+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2995+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
30002996

30012997
query IT
30022998
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -3052,9 +3048,8 @@ physical_plan
30523048
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
30533049
03)----DataSourceExec: partitions=1, partition_sizes=[1]
30543050
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3055-
05)------FilterExec: DynamicFilter [ empty ]
3056-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3057-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
3051+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3052+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
30583053

30593054
query ITI rowsort
30603055
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3069,9 +3064,8 @@ physical_plan
30693064
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
30703065
03)----DataSourceExec: partitions=1, partition_sizes=[1]
30713066
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3072-
05)------FilterExec: DynamicFilter [ empty ]
3073-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3074-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
3067+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3068+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
30753069

30763070
query ITI rowsort
30773071
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3124,9 +3118,8 @@ physical_plan
31243118
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
31253119
03)----DataSourceExec: partitions=1, partition_sizes=[1]
31263120
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3127-
05)------FilterExec: DynamicFilter [ empty ]
3128-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3129-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
3121+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3122+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
31303123

31313124
query ITI rowsort
31323125
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3141,9 +3134,8 @@ physical_plan
31413134
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
31423135
03)----DataSourceExec: partitions=1, partition_sizes=[1]
31433136
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3144-
05)------FilterExec: DynamicFilter [ empty ]
3145-
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3146-
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
3137+
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3138+
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
31473139

31483140
query ITI rowsort
31493141
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -4149,8 +4141,7 @@ physical_plan
41494141
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, y@1)], filter=a@0 < x@1
41504142
02)--DataSourceExec: partitions=1, partition_sizes=[0]
41514143
03)--SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false]
4152-
04)----FilterExec: DynamicFilter [ empty ]
4153-
05)------DataSourceExec: partitions=1, partition_sizes=[0]
4144+
04)----DataSourceExec: partitions=1, partition_sizes=[0]
41544145

41554146
# Test full join with limit
41564147
statement ok
@@ -4452,8 +4443,7 @@ physical_plan
44524443
04)------FilterExec: b@1 > 3, projection=[a@0]
44534444
05)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44544445
06)----SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
4455-
07)------FilterExec: DynamicFilter [ empty ]
4456-
08)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
4446+
07)------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44574447

44584448
query TT
44594449
explain select * from test where a in (select a from test where b > 3) order by c desc nulls last;
@@ -4473,8 +4463,7 @@ physical_plan
44734463
04)------FilterExec: b@1 > 3, projection=[a@0]
44744464
05)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44754465
06)----SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
4476-
07)------FilterExec: DynamicFilter [ empty ]
4477-
08)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
4466+
07)------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44784467

44794468
query III
44804469
select * from test where a in (select a from test where b > 3) order by c desc nulls first;

0 commit comments

Comments
 (0)