Skip to content

Commit cefa63a

Browse files
committed
fix fetch with new order lex
1 parent 5a99099 commit cefa63a

1 file changed

Lines changed: 8 additions & 15 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -940,28 +940,21 @@ fn add_merge_on_top(
940940
input: DistributionContext,
941941
fetch: &mut Option<usize>,
942942
) -> DistributionContext {
943-
// Add SortPreservingMerge only when partition count is larger than 1.
943+
// Apply only when the partition count is larger than one.
944944
if input.plan.output_partitioning().partition_count() > 1 {
945945
// When there is an existing ordering, we preserve ordering
946946
// when decreasing partitions. This will be un-done in the future
947947
// if any of the following conditions is true
948948
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
949949
// - Usage of order preserving variants is not desirable
950-
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
951-
let should_preserve_ordering = input.plan.output_ordering().is_some();
952-
953-
let ordering = input
954-
.plan
955-
.output_ordering()
956-
.cloned()
957-
.unwrap_or_else(LexOrdering::default);
958-
959-
let new_plan = if should_preserve_ordering {
960-
Arc::new(
961-
SortPreservingMergeExec::new(ordering, Arc::clone(&input.plan))
962-
.with_fetch(fetch.take()),
963-
) as _
950+
// (determined by flag `config.optimizer.prefer_existing_sort`)
951+
let new_plan = if let Some(req) = input.plan.output_ordering() {
952+
Arc::new(SortPreservingMergeExec::new(
953+
req.clone(),
954+
Arc::clone(&input.plan),
955+
).with_fetch(*fetch)) as _
964956
} else {
957+
// If there is no input order, we can simply coalesce partitions:
965958
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
966959
};
967960

0 commit comments

Comments
 (0)