Skip to content

Commit 689bc94

Browse files
authored
Fix sort enforcement for single-partition output (#53)
1 parent c7ba34f commit 689bc94

3 files changed

Lines changed: 108 additions & 12 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
5757
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
5858
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
5959
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
60+
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
6061
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6162
use datafusion::prelude::*;
6263
use arrow::array::{Int32Array, RecordBatch};
@@ -417,6 +418,46 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
417418
Ok(())
418419
}
419420

421+
/// Checks that `EnforceSorting` satisfies an `OutputRequirementExec` that asks
/// for both an ordering and `Distribution::SinglePartition` by placing a
/// `SortPreservingMergeExec` above the partition-preserving `SortExec`, and
/// that the resulting plan is accepted by `SanityCheckPlan`.
#[tokio::test]
422+
async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Result<()> {
423+
let schema = create_test_schema()?;
424+
// Union of two in-memory sources; each one is rendered as `partitions=1`
// in the snapshot below.
let input = union_exec(vec![memory_exec(&schema), memory_exec(&schema)]);
425+
// Ordering requirement on `nullable_col`; it is rendered as ascending in
// the expected plan.
let requirement = [PhysicalSortRequirement::new(
426+
col("nullable_col", &schema)?,
427+
Some(SortOptions::new(false, true)),
428+
)]
429+
.into();
430+
// Root node carrying the ordering requirement, the single-partition
// requirement and a limit of 21 (it surfaces as `fetch=21` in the
// optimized plan).
let physical_plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
431+
input,
432+
Some(OrderingRequirements::new(requirement)),
433+
Distribution::SinglePartition,
434+
Some(21),
435+
));
436+
437+
let config = ConfigOptions::new();
438+
// Run the rule directly and feed its output to the sanity checker; the
// `?` operators fail the test if either step returns an error.
let optimized_plan =
439+
EnforceSorting::new().optimize(Arc::clone(&physical_plan), &config)?;
440+
SanityCheckPlan::new().optimize(optimized_plan, &config)?;
441+
442+
// Snapshot of the input and optimized plans as produced by the shared
// test harness.
// NOTE(review): presumably `run()` re-applies the optimizer to the
// original plan rather than reusing `optimized_plan` - confirm.
let test = EnforceSortingTest::new(physical_plan);
443+
assert_snapshot!(test.run(), @r"
444+
Input Plan:
445+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
446+
UnionExec
447+
DataSourceExec: partitions=1, partition_sizes=[0]
448+
DataSourceExec: partitions=1, partition_sizes=[0]
449+
450+
Optimized Plan:
451+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
452+
SortPreservingMergeExec: [nullable_col@0 ASC], fetch=21
453+
SortExec: TopK(fetch=21), expr=[nullable_col@0 ASC], preserve_partitioning=[true]
454+
UnionExec
455+
DataSourceExec: partitions=1, partition_sizes=[0]
456+
DataSourceExec: partitions=1, partition_sizes=[0]
457+
");
458+
Ok(())
459+
}
460+
420461
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
421462
repartition_sorts: bool,
422463
) -> Result<String> {

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ use crate::enforce_sorting::sort_pushdown::{
4949
};
5050
use crate::output_requirements::OutputRequirementExec;
5151
use crate::utils::{
52-
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
53-
is_repartition, is_sort, is_sort_preserving_merge, is_window,
52+
add_sort_above_with_check, add_sort_above_with_distribution, is_coalesce_partitions,
53+
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_window,
5454
};
5555

5656
use datafusion_common::Result;
@@ -489,6 +489,7 @@ pub fn ensure_sorting(
489489
};
490490

491491
let plan = &requirements.plan;
492+
let required_distributions = plan.required_input_distribution();
492493
let mut updated_children = vec![];
493494
for (idx, (required_ordering, mut child)) in plan
494495
.required_input_ordering()
@@ -506,13 +507,14 @@ pub fn ensure_sorting(
506507
if physical_ordering.is_some() {
507508
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
508509
}
509-
child = add_sort_above(
510+
child = add_sort_above_with_distribution(
510511
child,
511512
req,
512513
plan.as_any()
513514
.downcast_ref::<OutputRequirementExec>()
514515
.map(|output| output.fetch())
515516
.unwrap_or(None),
517+
&required_distributions[idx],
516518
);
517519
child = update_sort_ctx_children_data(child, true)?;
518520
}
@@ -609,13 +611,17 @@ fn analyze_immediate_sort_removal(
609611
fn adjust_window_sort_removal(
610612
mut window_tree: PlanWithCorrespondingSort,
611613
) -> Result<PlanWithCorrespondingSort> {
614+
let required_distribution = window_tree
615+
.plan
616+
.required_input_distribution()
617+
.swap_remove(0);
618+
let requires_single_partition =
619+
matches!(required_distribution, Distribution::SinglePartition);
620+
612621
// Window operators have a single child we need to adjust:
613622
let child_node = remove_corresponding_sort_from_sub_plan(
614623
window_tree.children.swap_remove(0),
615-
matches!(
616-
window_tree.plan.required_input_distribution()[0],
617-
Distribution::SinglePartition
618-
),
624+
requires_single_partition,
619625
)?;
620626
window_tree.children.push(child_node);
621627

@@ -647,7 +653,12 @@ fn adjust_window_sort_removal(
647653
// Satisfy the ordering requirement so that the window can run:
648654
let mut child_node = window_tree.children.swap_remove(0);
649655
if let Some(reqs) = reqs {
650-
child_node = add_sort_above(child_node, reqs.into_single(), None);
656+
child_node = add_sort_above_with_distribution(
657+
child_node,
658+
reqs.into_single(),
659+
None,
660+
&required_distribution,
661+
);
651662
}
652663
let child_plan = Arc::clone(&child_node.plan);
653664
window_tree.children.push(child_node);

datafusion/physical-optimizer/src/utils.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use datafusion_common::Result;
21-
use datafusion_physical_expr::{LexOrdering, LexRequirement};
21+
use datafusion_physical_expr::{Distribution, LexOrdering, LexRequirement};
2222
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2323
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2424
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -39,6 +39,33 @@ pub fn add_sort_above<T: Clone + Default>(
3939
node: PlanContext<T>,
4040
sort_requirements: LexRequirement,
4141
fetch: Option<usize>,
42+
) -> PlanContext<T> {
43+
add_sort_above_impl(node, sort_requirements, fetch, false)
44+
}
45+
46+
/// Adds a `SortExec` above `node` according to the given ordering
47+
/// requirements and, when the parent needs a single input partition, a
48+
/// `SortPreservingMergeExec` on top of that sort.
49+
///
/// The merge is inserted only when `required_distribution` is
/// `Distribution::SinglePartition` *and* `node` outputs more than one
/// partition, which is the case where the sort preserves partitioning. In
/// every other case the result is the same as calling `add_sort_above`.
/// `fetch` is forwarded to both the sort and the merge.
50+
pub fn add_sort_above_with_distribution<T: Clone + Default>(
51+
node: PlanContext<T>,
52+
sort_requirements: LexRequirement,
53+
fetch: Option<usize>,
54+
required_distribution: &Distribution,
55+
) -> PlanContext<T> {
56+
add_sort_above_impl(
57+
node,
58+
sort_requirements,
59+
fetch,
60+
// Only `SinglePartition` asks the shared helper for a merge; any other
// distribution keeps the plain sort.
matches!(required_distribution, Distribution::SinglePartition),
61+
)
62+
}
63+
64+
fn add_sort_above_impl<T: Clone + Default>(
65+
node: PlanContext<T>,
66+
sort_requirements: LexRequirement,
67+
fetch: Option<usize>,
68+
requires_single_partition: bool,
4269
) -> PlanContext<T> {
4370
let mut sort_reqs: Vec<_> = sort_requirements.into();
4471
sort_reqs.retain(|sort_expr| {
@@ -51,11 +78,28 @@ pub fn add_sort_above<T: Clone + Default>(
5178
let Some(ordering) = LexOrdering::new(sort_exprs) else {
5279
return node;
5380
};
54-
let mut new_sort = SortExec::new(ordering, Arc::clone(&node.plan)).with_fetch(fetch);
55-
if node.plan.output_partitioning().partition_count() > 1 {
81+
let input_has_multiple_partitions =
82+
node.plan.output_partitioning().partition_count() > 1;
83+
84+
let mut new_sort =
85+
SortExec::new(ordering.clone(), Arc::clone(&node.plan)).with_fetch(fetch);
86+
if input_has_multiple_partitions {
5687
new_sort = new_sort.with_preserve_partitioning(true);
5788
}
58-
PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
89+
90+
let sort_node = PlanContext::new(Arc::new(new_sort), T::default(), vec![node]);
91+
if !(requires_single_partition && input_has_multiple_partitions) {
92+
return sort_node;
93+
}
94+
95+
PlanContext::new(
96+
Arc::new(
97+
SortPreservingMergeExec::new(ordering, Arc::clone(&sort_node.plan))
98+
.with_fetch(fetch),
99+
),
100+
T::default(),
101+
vec![sort_node],
102+
)
59103
}
60104

61105
/// This utility function adds a `SortExec` above an operator according to the

0 commit comments

Comments
 (0)