Skip to content

Commit 1f47d46

Browse files
committed
fix fetch add back with new lex order
1 parent cefa63a commit 1f47d46

1 file changed

Lines changed: 6 additions & 11 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -949,10 +949,10 @@ fn add_merge_on_top(
949949
// - Usage of order preserving variants is not desirable
950950
// (determined by flag `config.optimizer.prefer_existing_sort`)
951951
let new_plan = if let Some(req) = input.plan.output_ordering() {
952-
Arc::new(SortPreservingMergeExec::new(
953-
req.clone(),
954-
Arc::clone(&input.plan),
955-
).with_fetch(*fetch)) as _
952+
Arc::new(
953+
SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
954+
.with_fetch(*fetch),
955+
) as _
956956
} else {
957957
// If there is no input order, we can simply coalesce partitions:
958958
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
@@ -1406,13 +1406,8 @@ pub fn ensure_distribution(
14061406
// It was removed by `remove_dist_changing_operators`
14071407
// and we need to add it back.
14081408
if fetch.is_some() {
1409-
let ordering = plan
1410-
.output_ordering()
1411-
.cloned()
1412-
.unwrap_or_else(LexOrdering::default);
1413-
let plan = Arc::new(
1414-
SortPreservingMergeExec::new(ordering, plan).with_fetch(fetch.take()),
1415-
);
1409+
// It's safe to unwrap because `spm` is set only if `fetch` is set.
1410+
let plan = spm.unwrap().with_fetch(fetch.take()).unwrap();
14161411
optimized_distribution_ctx =
14171412
DistributionContext::new(plan, data, vec![optimized_distribution_ctx]);
14181413
}

0 commit comments

Comments
 (0)