Skip to content

Commit 7768d24

Browse files
Enhance sort pushdown logic to include distribution requirements for … (#54)
Co-authored-by: Matthew Turner <matthew.m.turner@outlook.com>
1 parent 689bc94 commit 7768d24

2 files changed

Lines changed: 218 additions & 11 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use crate::physical_optimizer::test_utils::{
2424
coalesce_partitions_exec, create_test_schema, create_test_schema2,
2525
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
2626
local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec,
27-
repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options,
27+
repartition_exec, simple_projection_exec, sort_exec, sort_exec_with_fetch,
28+
sort_exec_with_preserve_partitioning, sort_expr, sort_expr_options,
2829
sort_merge_join_exec, sort_preserving_merge_exec,
2930
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
3031
union_exec,
@@ -458,6 +459,102 @@ async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Resu
458459
Ok(())
459460
}
460461

462+
/// Regression test: when `OutputRequirementExec(SinglePartition)` wraps a plan
463+
/// that already contains `SortPreservingMergeExec`, sort pushdown must not add
464+
/// a second `SortPreservingMergeExec` below the existing one.
465+
#[test]
466+
fn test_no_extra_spm_from_output_requirement_single_partition() -> Result<()> {
467+
let schema = create_test_schema()?;
468+
let sort_exprs: LexOrdering = [sort_expr("nullable_col", &schema)].into();
469+
let requirement = [PhysicalSortRequirement::new(
470+
col("nullable_col", &schema)?,
471+
Some(SortOptions::new(false, true)),
472+
)]
473+
.into();
474+
475+
// Plan entering pushdown_sorts:
476+
// OutputRequirementExec (dist=SinglePartition)
477+
// SortPreservingMergeExec [nullable_col@0]
478+
// SortExec [nullable_col@0] (preserve_partitioning=true)
479+
// RepartitionExec (10 partitions)
480+
// DataSource
481+
let source = memory_exec(&schema);
482+
let repartitioned = repartition_exec(source);
483+
let sorted = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartitioned);
484+
let merged = sort_preserving_merge_exec(sort_exprs.clone(), sorted);
485+
let plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
486+
merged,
487+
Some(OrderingRequirements::new(requirement)),
488+
Distribution::SinglePartition,
489+
None,
490+
));
491+
492+
let mut sort_pushdown = SortPushDown::new_default(Arc::clone(&plan));
493+
assign_initial_requirements(&mut sort_pushdown);
494+
let result = pushdown_sorts(sort_pushdown)?;
495+
496+
// The plan is already optimal; no extra SortPreservingMergeExec should appear.
497+
assert_snapshot!(
498+
displayable(result.plan.as_ref()).indent(true).to_string(),
499+
@r"
500+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
501+
SortPreservingMergeExec: [nullable_col@0 ASC]
502+
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
503+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
504+
DataSourceExec: partitions=1, partition_sizes=[0]
505+
"
506+
);
507+
Ok(())
508+
}
509+
510+
/// Positive test: when `OutputRequirementExec` carries `SinglePartition` and
511+
/// sort pushdown reaches a multi-partition node through a projection, it must
512+
/// insert both `SortExec(preserve_partitioning=true)` AND
513+
/// `SortPreservingMergeExec` — the core behaviour added by commit 45620e982.
514+
#[test]
515+
fn test_sort_pushdown_adds_spm_for_single_partition_requirement() -> Result<()> {
516+
let schema = create_test_schema()?;
517+
let requirement = [PhysicalSortRequirement::new(
518+
col("nullable_col", &schema)?,
519+
Some(SortOptions::new(false, true)),
520+
)]
521+
.into();
522+
523+
// Plan entering pushdown_sorts:
524+
// OutputRequirementExec (dist=SinglePartition, order=[nullable_col@0])
525+
// ProjectionExec (identity)
526+
// RepartitionExec (10 partitions)
527+
// DataSource
528+
let source = memory_exec(&schema);
529+
let repartitioned = repartition_exec(source);
530+
let projected = simple_projection_exec(repartitioned, vec![0, 1]);
531+
let plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
532+
projected,
533+
Some(OrderingRequirements::new(requirement)),
534+
Distribution::SinglePartition,
535+
None,
536+
));
537+
538+
let mut sort_pushdown = SortPushDown::new_default(Arc::clone(&plan));
539+
assign_initial_requirements(&mut sort_pushdown);
540+
let result = pushdown_sorts(sort_pushdown)?;
541+
542+
// Sort is pushed through the projection; because SinglePartition is
543+
// required, add_sort_above_with_distribution wraps it in SPM.
544+
assert_snapshot!(
545+
displayable(result.plan.as_ref()).indent(true).to_string(),
546+
@r"
547+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
548+
SortPreservingMergeExec: [nullable_col@0 ASC]
549+
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
550+
ProjectionExec: expr=[nullable_col@0 as nullable_col, non_nullable_col@1 as non_nullable_col]
551+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
552+
DataSourceExec: partitions=1, partition_sizes=[0]
553+
"
554+
);
555+
Ok(())
556+
}
557+
461558
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
462559
repartition_sorts: bool,
463560
) -> Result<String> {

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 120 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use std::fmt::Debug;
1919
use std::sync::Arc;
2020

2121
use crate::utils::{
22-
add_sort_above, is_sort, is_sort_preserving_merge, is_union, is_window,
22+
add_sort_above_with_distribution, is_sort, is_sort_preserving_merge, is_union,
23+
is_window,
2324
};
2425

2526
use arrow::datatypes::SchemaRef;
@@ -29,7 +30,7 @@ use datafusion_expr::JoinType;
2930
use datafusion_physical_expr::expressions::Column;
3031
use datafusion_physical_expr::utils::collect_columns;
3132
use datafusion_physical_expr::{
32-
EquivalenceProperties, add_offset_to_physical_sort_exprs,
33+
Distribution, EquivalenceProperties, add_offset_to_physical_sort_exprs,
3334
};
3435
use datafusion_physical_expr_common::sort_expr::{
3536
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
@@ -55,23 +56,46 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
5556
/// of the parent node as its data.
5657
///
5758
/// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting
58-
#[derive(Default, Clone, Debug)]
59+
#[derive(Clone, Debug)]
5960
pub struct ParentRequirements {
6061
ordering_requirement: Option<OrderingRequirements>,
6162
fetch: Option<usize>,
63+
/// The distribution required by whatever consumer will sit above any
64+
/// `SortExec` we materialise here. When `add_sort_above_with_distribution` adds a sort
65+
/// over a multi-partition input, we use this to decide whether the new
66+
/// sort needs a `SortPreservingMergeExec` wrapper to produce a single
67+
/// partition.
68+
distribution_requirement: Distribution,
69+
}
70+
71+
impl Default for ParentRequirements {
72+
fn default() -> Self {
73+
Self {
74+
ordering_requirement: None,
75+
fetch: None,
76+
distribution_requirement: Distribution::UnspecifiedDistribution,
77+
}
78+
}
6279
}
6380

6481
pub type SortPushDown = PlanContext<ParentRequirements>;
6582

6683
/// Assigns the ordering and distribution requirements of the root node to its children.
6784
pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
6885
let reqs = sort_push_down.plan.required_input_ordering();
69-
for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
86+
let dists = sort_push_down.plan.required_input_distribution();
87+
for (idx, (child, requirement)) in
88+
sort_push_down.children.iter_mut().zip(reqs).enumerate()
89+
{
7090
child.data = ParentRequirements {
7191
ordering_requirement: requirement,
7292
// Seed the fetch from the child's own plan; a fetch carried by the parent
7393
// is combined with it later through `min_fetch` during pushdown.
7494
fetch: child.plan.fetch(),
95+
distribution_requirement: dists
96+
.get(idx)
97+
.cloned()
98+
.unwrap_or(Distribution::UnspecifiedDistribution),
7599
};
76100
}
77101
}
@@ -92,11 +116,34 @@ fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
92116
}
93117
}
94118

119+
/// Returns the stricter of two distribution requirements when propagating
120+
/// `parent_distribution` down through pass-through operators.
121+
///
122+
/// `SinglePartition` is the strictest requirement we care about for the
123+
/// purposes of inserting `SortPreservingMergeExec` above a partition-
124+
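/// preserving `SortExec`. If either side requests it, we keep that. Otherwise a
/// `HashPartitioned` side wins (`a` first), falling back to `UnspecifiedDistribution`.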
125+
fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution {
126+
match (a, b) {
127+
(Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => {
128+
Distribution::SinglePartition
129+
}
130+
(Distribution::HashPartitioned(_), _) => a.clone(),
131+
(_, Distribution::HashPartitioned(_)) => b.clone(),
132+
_ => Distribution::UnspecifiedDistribution,
133+
}
134+
}
135+
95136
fn pushdown_sorts_helper(
96137
mut sort_push_down: SortPushDown,
97138
) -> Result<Transformed<SortPushDown>> {
98139
let plan = sort_push_down.plan;
99140
let parent_fetch = sort_push_down.data.fetch;
141+
// The distribution required by whatever sits above any new sort we add
142+
// here. When this node is a SortExec we are about to remove or replace,
143+
// the new sort takes the removed sort's slot, so its consumer is the
144+
// grandparent — i.e. the same distribution requirement that flowed into
145+
// this call.
146+
let parent_distribution = sort_push_down.data.distribution_requirement.clone();
100147

101148
let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone()
102149
else {
@@ -116,11 +163,28 @@ fn pushdown_sorts_helper(
116163
sort_push_down.data.fetch = fetch;
117164
sort_push_down.data.ordering_requirement =
118165
Some(OrderingRequirements::from(sort_ordering));
166+
// The new context now sits where the SortExec was; preserve the
167+
// grandparent's distribution requirement so a subsequent
168+
// `add_sort_above_with_distribution` knows whether to wrap in SortPreservingMergeExec.
169+
sort_push_down.data.distribution_requirement = parent_distribution;
119170
// Recursive call to helper, so it doesn't transform_down and miss
120171
// the new node (previous child of sort):
121172
return pushdown_sorts_helper(sort_push_down);
122173
}
123174
sort_push_down.plan = plan;
175+
// No ordering is being pushed down here, so only use the node's own
176+
// distribution requirement. Do NOT propagate parent_distribution
177+
// through partition-merging nodes (e.g. SortPreservingMergeExec):
178+
// those nodes already satisfy SinglePartition themselves, so the
179+
// children below them should not be forced to also produce a single
180+
// partition.
181+
let dists = sort_push_down.plan.required_input_distribution();
182+
for (idx, child) in sort_push_down.children.iter_mut().enumerate() {
183+
child.data.distribution_requirement = dists
184+
.get(idx)
185+
.cloned()
186+
.unwrap_or(Distribution::UnspecifiedDistribution);
187+
}
124188
return Ok(Transformed::no(sort_push_down));
125189
};
126190

@@ -149,16 +213,21 @@ fn pushdown_sorts_helper(
149213
// The sort was imposing a different ordering than the one being
150214
// pushed down. Replace it with a sort that matches the pushed-down
151215
// ordering, and continue the pushdown.
152-
// Add back the sort:
153-
sort_push_down = add_sort_above(
216+
// Add back the sort. The new sort sits where the old one did, so
217+
// its consumer is the grandparent and we must respect that
218+
// distribution requirement (otherwise a multi-partition input
219+
// produces preserve_partitioning=true with no SPM above).
220+
sort_push_down = add_sort_above_with_distribution(
154221
sort_push_down,
155222
parent_requirement.into_single(),
156223
parent_fetch,
224+
&parent_distribution,
157225
);
158226
// Update pushdown requirements:
159227
sort_push_down.children[0].data = ParentRequirements {
160228
ordering_requirement: Some(OrderingRequirements::from(sort_ordering)),
161229
fetch: sort_fetch,
230+
distribution_requirement: Distribution::UnspecifiedDistribution,
162231
};
163232
return Ok(Transformed::yes(sort_push_down));
164233
} else {
@@ -174,6 +243,10 @@ fn pushdown_sorts_helper(
174243
} else {
175244
Some(parent_requirement)
176245
};
246+
// The sort was removed; carry the grandparent's distribution
247+
// requirement so any sort we materialise deeper down still
248+
// satisfies it.
249+
sort_push_down.data.distribution_requirement = parent_distribution;
177250
// Recursive call to helper, so it doesn't transform_down and miss
178251
// the new node (previous child of sort):
179252
return pushdown_sorts_helper(sort_push_down);
@@ -184,10 +257,35 @@ fn pushdown_sorts_helper(
184257
if satisfy_parent {
185258
// For non-sort operators which satisfy ordering:
186259
let reqs = sort_push_down.plan.required_input_ordering();
260+
let dists = sort_push_down.plan.required_input_distribution();
261+
262+
// If this node already produces a single partition it has absorbed any
263+
// SinglePartition requirement from the consumer above. Don't push
264+
// that requirement down into children that live below the merge point.
265+
let effective_parent_dist =
266+
if sort_push_down.plan.output_partitioning().partition_count() == 1 {
267+
Distribution::UnspecifiedDistribution
268+
} else {
269+
parent_distribution.clone()
270+
};
187271

188-
for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
272+
for (idx, (child, order)) in
273+
sort_push_down.children.iter_mut().zip(reqs).enumerate()
274+
{
189275
child.data.ordering_requirement = order;
190276
child.data.fetch = min_fetch(parent_fetch, child.data.fetch);
277+
// Any sort we materialise inside this child subtree must still
278+
// satisfy the strongest distribution requirement we've seen on
279+
// the way down. Pass-through operators (Projection, Filter, etc.)
280+
// don't change partitioning, so a `SinglePartition` requirement
281+
// from a higher consumer must propagate, not get reset to this
282+
// node's own (often `UnspecifiedDistribution`) input requirement.
283+
child.data.distribution_requirement = stronger_distribution(
284+
&effective_parent_dist,
285+
dists
286+
.get(idx)
287+
.unwrap_or(&Distribution::UnspecifiedDistribution),
288+
);
191289
}
192290
} else if let Some(adjusted) = pushdown_requirement_to_children(
193291
&sort_push_down.plan,
@@ -197,17 +295,29 @@ fn pushdown_sorts_helper(
197295
// For operators that can take a sort pushdown, continue with updated
198296
// requirements:
199297
let current_fetch = sort_push_down.plan.fetch();
200-
for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
298+
let dists = sort_push_down.plan.required_input_distribution();
299+
for (idx, (child, order)) in
300+
sort_push_down.children.iter_mut().zip(adjusted).enumerate()
301+
{
201302
child.data.ordering_requirement = order;
202303
child.data.fetch = min_fetch(current_fetch, parent_fetch);
304+
child.data.distribution_requirement = stronger_distribution(
305+
&parent_distribution,
306+
dists
307+
.get(idx)
308+
.unwrap_or(&Distribution::UnspecifiedDistribution),
309+
);
203310
}
204311
sort_push_down.data.ordering_requirement = None;
205312
} else {
206-
// Can not push down requirements, add new `SortExec`:
207-
sort_push_down = add_sort_above(
313+
// Can not push down requirements, add new `SortExec`. The new sort sits
314+
// between this node and its parent, so its consumer's distribution
315+
// requirement is the one carried in `parent_distribution`.
316+
sort_push_down = add_sort_above_with_distribution(
208317
sort_push_down,
209318
parent_requirement.into_single(),
210319
parent_fetch,
320+
&parent_distribution,
211321
);
212322
assign_initial_requirements(&mut sort_push_down);
213323
}

0 commit comments

Comments
 (0)