Skip to content

Commit 1e4706e

Browse files
Fix tests
1 parent 72c8723 commit 1e4706e

1 file changed

Lines changed: 21 additions & 13 deletions

File tree

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,20 +172,18 @@ fn pushdown_sorts_helper(
172172
return pushdown_sorts_helper(sort_push_down);
173173
}
174174
sort_push_down.plan = plan;
175-
// No ordering work to do at this node, but we still need to propagate
176-
// the distribution requirement to children before transform_down
177-
// descends. Otherwise, when we eventually reach a node where a sort
178-
// must be added, `parent_distribution` has decayed to
179-
// `UnspecifiedDistribution` and `add_sort_above_with_distribution`
180-
// skips the wrapping `SortPreservingMergeExec`.
175+
// No ordering is being pushed down here, so only use the node's own
176+
// distribution requirement. Do NOT propagate parent_distribution
177+
// through partition-merging nodes (e.g. SortPreservingMergeExec):
178+
// those nodes already satisfy SinglePartition themselves, so the
179+
// children below them should not be forced to also produce a single
180+
// partition.
181181
let dists = sort_push_down.plan.required_input_distribution();
182182
for (idx, child) in sort_push_down.children.iter_mut().enumerate() {
183-
child.data.distribution_requirement = stronger_distribution(
184-
&parent_distribution,
185-
dists
186-
.get(idx)
187-
.unwrap_or(&Distribution::UnspecifiedDistribution),
188-
);
183+
child.data.distribution_requirement = dists
184+
.get(idx)
185+
.cloned()
186+
.unwrap_or(Distribution::UnspecifiedDistribution);
189187
}
190188
return Ok(Transformed::no(sort_push_down));
191189
};
@@ -261,6 +259,16 @@ fn pushdown_sorts_helper(
261259
let reqs = sort_push_down.plan.required_input_ordering();
262260
let dists = sort_push_down.plan.required_input_distribution();
263261

262+
// If this node already produces a single partition it has absorbed any
263+
// SinglePartition requirement from the consumer above. Don't push
264+
// that requirement down into children that live below the merge point.
265+
let effective_parent_dist =
266+
if sort_push_down.plan.output_partitioning().partition_count() == 1 {
267+
Distribution::UnspecifiedDistribution
268+
} else {
269+
parent_distribution.clone()
270+
};
271+
264272
for (idx, (child, order)) in
265273
sort_push_down.children.iter_mut().zip(reqs).enumerate()
266274
{
@@ -273,7 +281,7 @@ fn pushdown_sorts_helper(
273281
// from a higher consumer must propagate, not get reset to this
274282
// node's own (often `UnspecifiedDistribution`) input requirement.
275283
child.data.distribution_requirement = stronger_distribution(
276-
&parent_distribution,
284+
&effective_parent_dist,
277285
dists
278286
.get(idx)
279287
.unwrap_or(&Distribution::UnspecifiedDistribution),

0 commit comments

Comments
 (0)