Skip to content

Commit 20a8f83

Browse files
physical_optimizer: preserve_file_partitions when num file groups < target_partitions
`datafusion.optimizer.preserve_file_partitions` would not actually preserve the file partitions when the number of file groups is less than the target_partitions. This is unexpected behavior. If a user wants to preserve file partitions, it is because they want to avoid repartitions. This change fixes that by updating 1 line in the `enforce_distribution` optimizer rule. It also adds a regression test. Before ``` ProjectionExec AggregateExec: mode=FinalPartitioned, gby=[f_dkey, timestamp] RepartitionExec: partitioning=Hash([f_dkey, timestamp], 4), input_partitions=3 AggregateExec: mode=Partial, gby=[f_dkey, timestamp] DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey] ``` After ``` ProjectionExec AggregateExec: mode=SinglePartitioned, gby=[f_dkey, timestamp] DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey] ```
1 parent e8d217a commit 20a8f83

2 files changed

Lines changed: 63 additions & 7 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1328,8 +1328,12 @@ pub fn ensure_distribution(
13281328
})
13291329
);
13301330

1331-
let allow_subset_satisfy_partitioning = current_partitions
1331+
let allow_subset_satisfy_partitioning = (current_partitions
13321332
>= subset_satisfaction_threshold
1333+
// `preserve_file_partitions` exposes existing file-group
1334+
// partitioning to the optimizer. Respect it even when the
1335+
// file-group count is below `target_partitions`.
1336+
|| config.optimizer.preserve_file_partitions > 0)
13331337
&& !is_partitioned_join
13341338
&& !requires_grouping_id;
13351339

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -681,12 +681,10 @@ logical_plan
681681
06)------SubqueryAlias: d
682682
07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey]
683683
physical_plan
684-
01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)]
685-
02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3
686-
03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
687-
04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0]
688-
05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
689-
06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet
684+
01)AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
685+
02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0]
686+
03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
687+
04)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet
690688

691689
query TTR rowsort
692690
SELECT f.f_dkey, d.env, sum(f.value)
@@ -698,6 +696,60 @@ A dev 772.4
698696
B prod 614.4
699697
C prod 2017.6
700698

699+
##########
700+
# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions
701+
# With preserve_file_partitions enabled, we should still avoid repartitioning
702+
##########
703+
704+
statement ok
705+
set datafusion.execution.target_partitions = 4;
706+
707+
statement ok
708+
set datafusion.optimizer.preserve_file_partitions = 1;
709+
710+
query TT
711+
EXPLAIN SELECT f_dkey, timestamp,
712+
COUNT(*), AVG(value)
713+
FROM fact_table
714+
GROUP BY f_dkey, timestamp;
715+
----
716+
logical_plan
717+
01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value)
718+
02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]]
719+
03)----TableScan: fact_table projection=[timestamp, value, f_dkey]
720+
physical_plan
721+
01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)]
722+
02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)]
723+
03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet
724+
725+
query TPIR rowsort
726+
SELECT f_dkey, timestamp,
727+
COUNT(*), AVG(value)
728+
FROM fact_table
729+
GROUP BY f_dkey, timestamp;
730+
----
731+
A 2023-01-01T09:00:00 1 95.5
732+
A 2023-01-01T09:00:10 1 102.3
733+
A 2023-01-01T09:00:20 1 98.7
734+
A 2023-01-01T09:12:20 1 105.1
735+
A 2023-01-01T09:12:30 1 100
736+
A 2023-01-01T09:12:40 1 150
737+
A 2023-01-01T09:12:50 1 120.8
738+
B 2023-01-01T09:00:00 1 75.2
739+
B 2023-01-01T09:00:10 1 82.4
740+
B 2023-01-01T09:00:20 1 78.9
741+
B 2023-01-01T09:00:30 1 85.6
742+
B 2023-01-01T09:12:30 1 80
743+
B 2023-01-01T09:12:40 1 120
744+
B 2023-01-01T09:12:50 1 92.3
745+
C 2023-01-01T09:00:00 1 300.5
746+
C 2023-01-01T09:00:10 1 285.7
747+
C 2023-01-01T09:00:20 1 310.2
748+
C 2023-01-01T09:00:30 1 295.8
749+
C 2023-01-01T09:00:40 1 300
750+
C 2023-01-01T09:12:40 1 250
751+
C 2023-01-01T09:12:50 1 275.4
752+
701753
##########
702754
# CLEANUP
703755
##########

0 commit comments

Comments
 (0)