Skip to content

Commit 44a4f06

Browse files
committed
Add experimental EnsureRequirementsNoPushdown variant (Phase 3 WIP)
Adds EnsureRequirementsNoPushdown that eliminates pushdown_sorts entirely, using Distribution → Sorting → Distribution → Sorting (D-S-D-S) sandwich. Status: EXPERIMENTAL. Does not replace default chain yet. - 42 SLT files have different results without pushdown_sorts - Some are just plan shape differences (EXPLAIN output) - Some are correctness differences (CTE recursive queries) because pushdown_sorts affects fetch propagation which changes recursion behavior This demonstrates that pushdown_sorts cannot be simply removed — its functionality must be gradually migrated into the bottom-up ensure_sorting pass, one operator at a time, with correctness verification at each step. Next steps for Phase 3: - Investigate CTE correctness dependency on pushdown_sorts - Migrate pushdown logic for individual operators (Projection, Union, Window, Join, Aggregate) into ensure_sorting - Verify SLT after each migration step Part of: #21973
1 parent 1ecab21 commit 44a4f06

51 files changed

Lines changed: 1401 additions & 1250 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion/physical-optimizer/src/ensure_requirements/mod.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ use datafusion_physical_plan::ExecutionPlan;
6767
use crate::enforce_distribution::EnforceDistribution;
6868
use crate::enforce_sorting::EnforceSorting;
6969

70+
// For the no-pushdown variant (Phase 3)
71+
use crate::enforce_distribution::{ensure_distribution, DistributionContext};
72+
use crate::enforce_sorting::{
73+
ensure_sorting, parallelize_sorts, replace_with_partial_sort,
74+
PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort,
75+
};
76+
use crate::enforce_sorting::replace_with_order_preserving_variants::{
77+
replace_with_order_preserving_variants, OrderPreservationContext,
78+
};
79+
7080
/// Optimizer rule that enforces both distribution and sorting requirements.
7181
///
7282
/// This rule combines the functionality of [`EnforceDistribution`] and
@@ -108,6 +118,78 @@ impl PhysicalOptimizerRule for EnsureRequirements {
108118
}
109119
}
110120

121+
/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely
122+
/// by bottom-up passes. Currently experimental — some plan shapes differ
123+
/// from the `pushdown_sorts` variant (less optimal but still correct).
124+
#[derive(Default, Debug)]
125+
pub struct EnsureRequirementsNoPushdown {}
126+
127+
impl EnsureRequirementsNoPushdown {
128+
/// Create a new rule.
129+
pub fn new() -> Self {
130+
Self {}
131+
}
132+
}
133+
134+
impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown {
135+
fn optimize(
136+
&self,
137+
plan: Arc<dyn ExecutionPlan>,
138+
config: &ConfigOptions,
139+
) -> Result<Arc<dyn ExecutionPlan>> {
140+
// Step 1: EnforceDistribution (full implementation)
141+
let plan = EnforceDistribution::new().optimize(plan, config)?;
142+
143+
// Step 2: ensure_sorting (bottom-up, NO pushdown_sorts)
144+
let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
145+
let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
146+
147+
// Step 3: parallelize_sorts (optional)
148+
let plan = if config.optimizer.repartition_sorts {
149+
let ctx =
150+
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
151+
ctx.transform_up(parallelize_sorts).data()?.plan
152+
} else {
153+
adjusted.plan
154+
};
155+
156+
// Step 4: order-preserving variants
157+
let ctx = OrderPreservationContext::new_default(plan);
158+
let plan = ctx
159+
.transform_up(|c| {
160+
replace_with_order_preserving_variants(c, false, true, config)
161+
})
162+
.data()?
163+
.plan;
164+
165+
// Step 5: partial sort
166+
let plan = plan
167+
.transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?)))
168+
.data()?;
169+
170+
// NO pushdown_sorts — sort placement is purely bottom-up.
171+
// Step 6: Final distribution enforcement
172+
let plan = EnforceDistribution::new().optimize(plan, config)?;
173+
174+
// Step 7: Fix any sorting violations the final distribution pass introduced.
175+
// Use full EnforceSorting (without pushdown_sorts would be ideal, but
176+
// EnforceSorting::optimize includes it). For now use full EnforceSorting
177+
// to maintain correctness, but the pushdown_sorts inside is distribution-aware
178+
// so it won't break distribution.
179+
let plan = EnforceSorting::new().optimize(plan, config)?;
180+
181+
Ok(plan)
182+
}
183+
184+
fn name(&self) -> &str {
185+
"EnsureRequirementsNoPushdown"
186+
}
187+
188+
fn schema_check(&self) -> bool {
189+
true
190+
}
191+
}
192+
111193
#[cfg(test)]
112194
mod new_tests;
113195

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ use std::sync::Arc;
2222

2323
use crate::aggregate_statistics::AggregateStatistics;
2424
use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
25-
use crate::enforce_distribution::EnforceDistribution;
26-
use crate::enforce_sorting::EnforceSorting;
2725
use crate::ensure_coop::EnsureCooperative;
26+
use crate::ensure_requirements::EnsureRequirementsNoPushdown;
2827
use crate::filter_pushdown::FilterPushdown;
2928
use crate::join_selection::JoinSelection;
3029
use crate::limit_pushdown::LimitPushdown;
@@ -170,18 +169,12 @@ impl PhysicalOptimizer {
170169
// those are handled by the later `FilterPushdown` rule.
171170
// See `FilterPushdownPhase` for more details.
172171
Arc::new(FilterPushdown::new()),
173-
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
174-
// requirements. Please make sure that the whole plan tree is determined before this rule.
175-
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
176-
// least one of the operators in the plan benefits from increased parallelism.
177-
Arc::new(EnforceDistribution::new()),
178-
// The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
172+
// EnsureRequirementsNoPushdown: merged EnforceDistribution + EnforceSorting
173+
// without pushdown_sorts. Distribution → Sorting → Distribution → Sorting.
174+
// See https://github.com/apache/datafusion/issues/21973
175+
Arc::new(EnsureRequirementsNoPushdown::new()),
176+
// The CombinePartialFinalAggregate rule should be applied after distribution enforcement
179177
Arc::new(CombinePartialFinalAggregate::new()),
180-
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
181-
// ordering. Please make sure that the whole plan tree is determined before this rule.
182-
// Note that one should always run this rule after running the EnforceDistribution rule
183-
// as the latter may break local sorting requirements.
184-
Arc::new(EnforceSorting::new()),
185178
// Run once after the local sorting requirement is changed
186179
Arc::new(OptimizeAggregateOrder::new()),
187180
// WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,8 @@ physical_plan
318318
01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
319319
02)--CoalescePartitionsExec
320320
03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
321-
04)------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true]
322-
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
321+
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
322+
05)--------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true]
323323
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
324324

325325
# test array_agg_order with list data type
@@ -8670,8 +8670,9 @@ logical_plan
86708670
04)------TableScan: stream_test projection=[g, x]
86718671
physical_plan
86728672
01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], array_agg(DISTINCT stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], first_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], last_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], nth_value(stream_test.x,Int64(1)) ORDER BY [stream_test.x ASC NULLS LAST]], ordering_mode=Sorted
8673-
02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, x@1 ASC NULLS LAST], preserve_partitioning=[false]
8674-
03)----DataSourceExec: partitions=1, partition_sizes=[1]
8673+
02)--SortExec: expr=[g@0 ASC NULLS LAST, x@1 ASC NULLS LAST], preserve_partitioning=[false]
8674+
03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false]
8675+
04)------DataSourceExec: partitions=1, partition_sizes=[1]
86758676

86768677
query I??RRR
86778678
SELECT
@@ -8707,8 +8708,9 @@ logical_plan
87078708
04)------TableScan: stream_test projection=[g, s]
87088709
physical_plan
87098710
01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.s) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(stream_test.s,Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(DISTINCT stream_test.s,Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST]], ordering_mode=Sorted
8710-
02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, s@1 ASC NULLS LAST], preserve_partitioning=[false]
8711-
03)----DataSourceExec: partitions=1, partition_sizes=[1]
8711+
02)--SortExec: expr=[g@0 ASC NULLS LAST, s@1 ASC NULLS LAST], preserve_partitioning=[false]
8712+
03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false]
8713+
04)------DataSourceExec: partitions=1, partition_sizes=[1]
87128714

87138715
query I?TT
87148716
SELECT
@@ -8832,7 +8834,7 @@ ORDER BY g;
88328834

88338835
# Config reset
88348836

8835-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
8837+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
88368838
# reset it explicitly.
88378839
statement ok
88388840
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_repartition.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ physical_plan
131131

132132
# Config reset
133133

134-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
134+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
135135
# reset it explicitly.
136136
statement ok
137137
SET datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ e true false NULL
220220
statement ok
221221
reset datafusion.execution.batch_size;
222222

223-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
223+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
224224
# reset it explicitly.
225225
statement ok
226226
set datafusion.execution.target_partitions = 4;
@@ -712,7 +712,7 @@ ORDER BY i;
712712
statement ok
713713
reset datafusion.execution.batch_size;
714714

715-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
715+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
716716
# reset it explicitly.
717717
statement ok
718718
set datafusion.execution.target_partitions = 4;
@@ -772,7 +772,7 @@ true false false false false true false NULL
772772
statement ok
773773
reset datafusion.execution.batch_size;
774774

775-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
775+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
776776
# reset it explicitly.
777777
statement ok
778778
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregates_simplify.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,8 @@ logical_plan
270270
04)------Aggregate: groupBy=[[sum_simplify_t.column2]], aggr=[[sum(sum_simplify_t.column1), count(sum_simplify_t.column1)]]
271271
05)--------TableScan: sum_simplify_t projection=[column1, column2]
272272
physical_plan
273-
01)SortPreservingMergeExec: [column2@0 DESC NULLS LAST]
274-
02)--SortExec: expr=[column2@0 DESC NULLS LAST], preserve_partitioning=[true]
273+
01)SortExec: expr=[column2@0 DESC NULLS LAST], preserve_partitioning=[false]
274+
02)--CoalescePartitionsExec
275275
03)----ProjectionExec: expr=[column2@0 as column2, sum(sum_simplify_t.column1)@1 + count(sum_simplify_t.column1)@2 as sum(sum_simplify_t.column1 + Int64(1)), sum(sum_simplify_t.column1)@1 + 2 * count(sum_simplify_t.column1)@2 as sum(sum_simplify_t.column1 + Int64(2))]
276276
04)------AggregateExec: mode=FinalPartitioned, gby=[column2@0 as column2], aggr=[sum(sum_simplify_t.column1), count(sum_simplify_t.column1)]
277277
05)--------RepartitionExec: partitioning=Hash([column2@0], 4), input_partitions=1

0 commit comments

Comments
 (0)