Skip to content

Commit 45620e9

Browse files
committed
Enhance sort pushdown logic to include distribution requirements for SortExec
1 parent 689bc94 commit 45620e9

1 file changed

Lines changed: 70 additions & 10 deletions

File tree

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use std::fmt::Debug;
1919
use std::sync::Arc;
2020

2121
use crate::utils::{
22-
add_sort_above, is_sort, is_sort_preserving_merge, is_union, is_window,
22+
add_sort_above_with_distribution, is_sort, is_sort_preserving_merge, is_union,
23+
is_window,
2324
};
2425

2526
use arrow::datatypes::SchemaRef;
@@ -29,7 +30,7 @@ use datafusion_expr::JoinType;
2930
use datafusion_physical_expr::expressions::Column;
3031
use datafusion_physical_expr::utils::collect_columns;
3132
use datafusion_physical_expr::{
32-
EquivalenceProperties, add_offset_to_physical_sort_exprs,
33+
Distribution, EquivalenceProperties, add_offset_to_physical_sort_exprs,
3334
};
3435
use datafusion_physical_expr_common::sort_expr::{
3536
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
@@ -55,23 +56,46 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
5556
/// of the parent node as its data.
5657
///
5758
/// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting
58-
#[derive(Default, Clone, Debug)]
59+
#[derive(Clone, Debug)]
5960
pub struct ParentRequirements {
6061
ordering_requirement: Option<OrderingRequirements>,
6162
fetch: Option<usize>,
63+
/// The distribution required by whatever consumer will sit above any
64+
/// `SortExec` we materialise here. When a sort is added by `add_sort_above_with_distribution`
65+
/// over a multi-partition input, we use this to decide whether the new
66+
/// sort needs a `SortPreservingMergeExec` wrapper to produce a single
67+
/// partition.
68+
distribution_requirement: Distribution,
69+
}
70+
71+
impl Default for ParentRequirements {
72+
fn default() -> Self {
73+
Self {
74+
ordering_requirement: None,
75+
fetch: None,
76+
distribution_requirement: Distribution::UnspecifiedDistribution,
77+
}
78+
}
6279
}
6380

6481
pub type SortPushDown = PlanContext<ParentRequirements>;
6582

6683
/// Assigns the ordering requirement of the root node to its children.
6784
pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
6885
let reqs = sort_push_down.plan.required_input_ordering();
69-
for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
86+
let dists = sort_push_down.plan.required_input_distribution();
87+
for (idx, (child, requirement)) in
88+
sort_push_down.children.iter_mut().zip(reqs).enumerate()
89+
{
7090
child.data = ParentRequirements {
7191
ordering_requirement: requirement,
7292
// If the parent has a fetch value, assign it to the children;
7393
// otherwise, use the fetch value of the child.
7494
fetch: child.plan.fetch(),
95+
distribution_requirement: dists
96+
.get(idx)
97+
.cloned()
98+
.unwrap_or(Distribution::UnspecifiedDistribution),
7599
};
76100
}
77101
}
@@ -97,6 +121,12 @@ fn pushdown_sorts_helper(
97121
) -> Result<Transformed<SortPushDown>> {
98122
let plan = sort_push_down.plan;
99123
let parent_fetch = sort_push_down.data.fetch;
124+
// The distribution required by whatever sits above any new sort we add
125+
// here. When this node is a SortExec we are about to remove or replace,
126+
// the new sort takes the removed sort's slot, so its consumer is the
127+
// grandparent — i.e. the same distribution requirement that flowed into
128+
// this call.
129+
let parent_distribution = sort_push_down.data.distribution_requirement.clone();
100130

101131
let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone()
102132
else {
@@ -116,6 +146,10 @@ fn pushdown_sorts_helper(
116146
sort_push_down.data.fetch = fetch;
117147
sort_push_down.data.ordering_requirement =
118148
Some(OrderingRequirements::from(sort_ordering));
149+
// The new context now sits where the SortExec was; preserve the
150+
// grandparent's distribution requirement so a subsequent
151+
// `add_sort_above_with_distribution` knows whether to wrap in `SortPreservingMergeExec`.
152+
sort_push_down.data.distribution_requirement = parent_distribution;
119153
// Recursive call to helper, so it doesn't transform_down and miss
120154
// the new node (previous child of sort):
121155
return pushdown_sorts_helper(sort_push_down);
@@ -149,16 +183,21 @@ fn pushdown_sorts_helper(
149183
// The sort was imposing a different ordering than the one being
150184
// pushed down. Replace it with a sort that matches the pushed-down
151185
// ordering, and continue the pushdown.
152-
// Add back the sort:
153-
sort_push_down = add_sort_above(
186+
// Add back the sort. The new sort sits where the old one did, so
187+
// its consumer is the grandparent and we must respect that
188+
// distribution requirement (otherwise a multi-partition input
189+
// produces preserve_partitioning=true with no SPM above).
190+
sort_push_down = add_sort_above_with_distribution(
154191
sort_push_down,
155192
parent_requirement.into_single(),
156193
parent_fetch,
194+
&parent_distribution,
157195
);
158196
// Update pushdown requirements:
159197
sort_push_down.children[0].data = ParentRequirements {
160198
ordering_requirement: Some(OrderingRequirements::from(sort_ordering)),
161199
fetch: sort_fetch,
200+
distribution_requirement: Distribution::UnspecifiedDistribution,
162201
};
163202
return Ok(Transformed::yes(sort_push_down));
164203
} else {
@@ -174,6 +213,10 @@ fn pushdown_sorts_helper(
174213
} else {
175214
Some(parent_requirement)
176215
};
216+
// The sort was removed; carry the grandparent's distribution
217+
// requirement so any sort we materialise deeper down still
218+
// satisfies it.
219+
sort_push_down.data.distribution_requirement = parent_distribution;
177220
// Recursive call to helper, so it doesn't transform_down and miss
178221
// the new node (previous child of sort):
179222
return pushdown_sorts_helper(sort_push_down);
@@ -184,10 +227,17 @@ fn pushdown_sorts_helper(
184227
if satisfy_parent {
185228
// For non-sort operators which satisfy ordering:
186229
let reqs = sort_push_down.plan.required_input_ordering();
230+
let dists = sort_push_down.plan.required_input_distribution();
187231

188-
for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
232+
for (idx, (child, order)) in
233+
sort_push_down.children.iter_mut().zip(reqs).enumerate()
234+
{
189235
child.data.ordering_requirement = order;
190236
child.data.fetch = min_fetch(parent_fetch, child.data.fetch);
237+
child.data.distribution_requirement = dists
238+
.get(idx)
239+
.cloned()
240+
.unwrap_or(Distribution::UnspecifiedDistribution);
191241
}
192242
} else if let Some(adjusted) = pushdown_requirement_to_children(
193243
&sort_push_down.plan,
@@ -197,17 +247,27 @@ fn pushdown_sorts_helper(
197247
// For operators that can take a sort pushdown, continue with updated
198248
// requirements:
199249
let current_fetch = sort_push_down.plan.fetch();
200-
for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
250+
let dists = sort_push_down.plan.required_input_distribution();
251+
for (idx, (child, order)) in
252+
sort_push_down.children.iter_mut().zip(adjusted).enumerate()
253+
{
201254
child.data.ordering_requirement = order;
202255
child.data.fetch = min_fetch(current_fetch, parent_fetch);
256+
child.data.distribution_requirement = dists
257+
.get(idx)
258+
.cloned()
259+
.unwrap_or(Distribution::UnspecifiedDistribution);
203260
}
204261
sort_push_down.data.ordering_requirement = None;
205262
} else {
206-
// Can not push down requirements, add new `SortExec`:
207-
sort_push_down = add_sort_above(
263+
// Cannot push down requirements; add a new `SortExec`. The new sort sits
264+
// between this node and its parent, so its consumer's distribution
265+
// requirement is the one carried in `parent_distribution`.
266+
sort_push_down = add_sort_above_with_distribution(
208267
sort_push_down,
209268
parent_requirement.into_single(),
210269
parent_fetch,
270+
&parent_distribution,
211271
);
212272
assign_initial_requirements(&mut sort_push_down);
213273
}

0 commit comments

Comments
 (0)