Commit eceb7fa

physical_optimizer: preserve_file_partitions when num file groups < target_partitions (apache#21533)
## Rationale for this change

`datafusion.optimizer.preserve_file_partitions` would not actually preserve the file partitions when the number of file groups is less than `target_partitions`. This is unexpected behavior: if a user wants to preserve file partitions, it is because they want to avoid repartitioning.

Before:

```
ProjectionExec
  AggregateExec: mode=FinalPartitioned, gby=[f_dkey, timestamp]
    RepartitionExec: partitioning=Hash([f_dkey, timestamp], 4), input_partitions=3
      AggregateExec: mode=Partial, gby=[f_dkey, timestamp]
        DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey]
```

After:

```
ProjectionExec
  AggregateExec: mode=SinglePartitioned, gby=[f_dkey, timestamp]
    DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey]
```

## What changes are included in this PR?

This change fixes the issue by updating a single condition in the `enforce_distribution` optimizer rule.

## Are these changes tested?

Yes, in the `preserve_file_partitioning` SLT.

## Are there any user-facing changes?

No.
1 parent: eae7bf4

2 files changed: 61 additions & 1 deletion

File tree:
- datafusion/physical-optimizer/src/enforce_distribution.rs
- datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

datafusion/physical-optimizer/src/enforce_distribution.rs (7 additions & 1 deletion)
```diff
@@ -1295,8 +1295,14 @@ pub fn ensure_distribution(
         })
     );

-    let allow_subset_satisfy_partitioning = current_partitions
+    let allow_subset_satisfy_partitioning = (current_partitions
         >= subset_satisfaction_threshold
+        // `preserve_file_partitions` exposes existing file-group
+        // partitioning to the optimizer. Respect it when the only
+        // reason to repartition would be to increase partition count
+        // beyond the preserved file-group count.
+        || (config.optimizer.preserve_file_partitions > 0
+            && current_partitions < target_partitions))
         && !is_partitioned_join
         && !requires_grouping_id;

```
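To make the patched predicate easier to read than the diff fragment allows, here is a minimal standalone sketch of the decision logic. The free function, its signature, and the sample threshold values are illustrative assumptions, not DataFusion's actual API; in the real code these are locals inside `ensure_distribution`.

```rust
/// Illustrative sketch (not DataFusion's API): decides whether an existing
/// partitioning that is a subset of the required one may be kept, mirroring
/// the patched predicate in `ensure_distribution`.
fn allow_subset_satisfy_partitioning(
    current_partitions: usize,            // partition count of the child plan
    subset_satisfaction_threshold: usize, // minimum count to accept a subset match
    preserve_file_partitions: usize,      // datafusion.optimizer.preserve_file_partitions
    target_partitions: usize,             // datafusion.execution.target_partitions
    is_partitioned_join: bool,
    requires_grouping_id: bool,
) -> bool {
    (current_partitions >= subset_satisfaction_threshold
        // New clause: when file-group partitioning is preserved, do not
        // repartition merely to raise the partition count toward the target.
        || (preserve_file_partitions > 0 && current_partitions < target_partitions))
        && !is_partitioned_join
        && !requires_grouping_id
}

fn main() {
    // 3 file groups, target of 4 partitions, preservation enabled
    // (threshold value of 8 chosen arbitrarily for illustration):
    // previously repartitioned; with the fix, the subset partitioning is kept.
    assert!(allow_subset_satisfy_partitioning(3, 8, 1, 4, false, false));
    // With preservation disabled, the old threshold check still governs.
    assert!(!allow_subset_satisfy_partitioning(3, 8, 0, 4, false, false));
}
```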

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt (54 additions & 0 deletions)
```diff
@@ -704,6 +704,60 @@ A dev 772.4
 B prod 614.4
 C prod 2017.6

+##########
+# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions
+# With preserve_file_partitions enabled, we should still avoid repartitioning
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp,
+COUNT(*), AVG(value)
+FROM fact_table
+GROUP BY f_dkey, timestamp;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]]
+03)----TableScan: fact_table projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet
+
+query TPIR rowsort
+SELECT f_dkey, timestamp,
+COUNT(*), AVG(value)
+FROM fact_table
+GROUP BY f_dkey, timestamp;
+----
+A 2023-01-01T09:00:00 1 95.5
+A 2023-01-01T09:00:10 1 102.3
+A 2023-01-01T09:00:20 1 98.7
+A 2023-01-01T09:12:20 1 105.1
+A 2023-01-01T09:12:30 1 100
+A 2023-01-01T09:12:40 1 150
+A 2023-01-01T09:12:50 1 120.8
+B 2023-01-01T09:00:00 1 75.2
+B 2023-01-01T09:00:10 1 82.4
+B 2023-01-01T09:00:20 1 78.9
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 1 80
+B 2023-01-01T09:12:40 1 120
+B 2023-01-01T09:12:50 1 92.3
+C 2023-01-01T09:00:00 1 300.5
+C 2023-01-01T09:00:10 1 285.7
+C 2023-01-01T09:00:20 1 310.2
+C 2023-01-01T09:00:30 1 295.8
+C 2023-01-01T09:00:40 1 300
+C 2023-01-01T09:12:40 1 250
+C 2023-01-01T09:12:50 1 275.4
+
 ##########
 # CLEANUP
 ##########
```
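For completeness, the same scenario can be exercised outside sqllogictest via the DataFusion Rust API. This is a minimal sketch under stated assumptions: it presumes a `fact_table` directory laid out with one Parquet file per `f_dkey` value, as in the test's scratch data, and a crate version that includes the `preserve_file_partitions` option; the path is hypothetical.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Mirror the settings from TEST 13 above.
    ctx.sql("SET datafusion.execution.target_partitions = 4").await?;
    ctx.sql("SET datafusion.optimizer.preserve_file_partitions = 1").await?;

    // Hypothetical path: one Parquet file per f_dkey partition.
    ctx.register_parquet("fact_table", "fact/", ParquetReadOptions::default())
        .await?;

    // With the fix, the plan keeps the 3 file-group partitions instead of
    // hash-repartitioning to 4.
    ctx.sql(
        "EXPLAIN SELECT f_dkey, timestamp, COUNT(*), AVG(value) \
         FROM fact_table GROUP BY f_dkey, timestamp",
    )
    .await?
    .show()
    .await?;

    Ok(())
}
```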
