Skip to content

Commit 553a1b0

Browse files
committed
Fix sort enforcement for single-partition output
1 parent c7ba34f commit 553a1b0

3 files changed

Lines changed: 98 additions & 11 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
5757
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
5858
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
5959
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
60+
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
6061
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6162
use datafusion::prelude::*;
6263
use arrow::array::{Int32Array, RecordBatch};
@@ -417,6 +418,46 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
417418
Ok(())
418419
}
419420

421+
#[tokio::test]
422+
async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Result<()> {
423+
let schema = create_test_schema()?;
424+
let input = union_exec(vec![memory_exec(&schema), memory_exec(&schema)]);
425+
let requirement = [PhysicalSortRequirement::new(
426+
col("nullable_col", &schema)?,
427+
Some(SortOptions::new(false, true)),
428+
)]
429+
.into();
430+
let physical_plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
431+
input,
432+
Some(OrderingRequirements::new(requirement)),
433+
Distribution::SinglePartition,
434+
Some(21),
435+
));
436+
437+
let config = ConfigOptions::new();
438+
let optimized_plan =
439+
EnforceSorting::new().optimize(Arc::clone(&physical_plan), &config)?;
440+
SanityCheckPlan::new().optimize(optimized_plan, &config)?;
441+
442+
let test = EnforceSortingTest::new(physical_plan);
443+
assert_snapshot!(test.run(), @r"
444+
Input Plan:
445+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
446+
UnionExec
447+
DataSourceExec: partitions=1, partition_sizes=[0]
448+
DataSourceExec: partitions=1, partition_sizes=[0]
449+
450+
Optimized Plan:
451+
OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
452+
SortPreservingMergeExec: [nullable_col@0 ASC], fetch=21
453+
SortExec: TopK(fetch=21), expr=[nullable_col@0 ASC], preserve_partitioning=[true]
454+
UnionExec
455+
DataSourceExec: partitions=1, partition_sizes=[0]
456+
DataSourceExec: partitions=1, partition_sizes=[0]
457+
");
458+
Ok(())
459+
}
460+
420461
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
421462
repartition_sorts: bool,
422463
) -> Result<String> {

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ use crate::enforce_sorting::sort_pushdown::{
4949
};
5050
use crate::output_requirements::OutputRequirementExec;
5151
use crate::utils::{
52-
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
53-
is_repartition, is_sort, is_sort_preserving_merge, is_window,
52+
add_sort_above_with_check, add_sort_above_with_required_single,
53+
is_coalesce_partitions, is_limit, is_repartition, is_sort, is_sort_preserving_merge,
54+
is_window,
5455
};
5556

5657
use datafusion_common::Result;
@@ -489,6 +490,7 @@ pub fn ensure_sorting(
489490
};
490491

491492
let plan = &requirements.plan;
493+
let required_distributions = plan.required_input_distribution();
492494
let mut updated_children = vec![];
493495
for (idx, (required_ordering, mut child)) in plan
494496
.required_input_ordering()
@@ -506,13 +508,14 @@ pub fn ensure_sorting(
506508
if physical_ordering.is_some() {
507509
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
508510
}
509-
child = add_sort_above(
511+
child = add_sort_above_with_required_single(
510512
child,
511513
req,
512514
plan.as_any()
513515
.downcast_ref::<OutputRequirementExec>()
514516
.map(|output| output.fetch())
515517
.unwrap_or(None),
518+
matches!(required_distributions[idx], Distribution::SinglePartition),
516519
);
517520
child = update_sort_ctx_children_data(child, true)?;
518521
}
@@ -609,13 +612,15 @@ fn analyze_immediate_sort_removal(
609612
fn adjust_window_sort_removal(
610613
mut window_tree: PlanWithCorrespondingSort,
611614
) -> Result<PlanWithCorrespondingSort> {
615+
let requires_single_partition = matches!(
616+
window_tree.plan.required_input_distribution()[0],
617+
Distribution::SinglePartition
618+
);
619+
612620
// Window operators have a single child we need to adjust:
613621
let child_node = remove_corresponding_sort_from_sub_plan(
614622
window_tree.children.swap_remove(0),
615-
matches!(
616-
window_tree.plan.required_input_distribution()[0],
617-
Distribution::SinglePartition
618-
),
623+
requires_single_partition,
619624
)?;
620625
window_tree.children.push(child_node);
621626

@@ -647,7 +652,12 @@ fn adjust_window_sort_removal(
647652
// Satisfy the ordering requirement so that the window can run:
648653
let mut child_node = window_tree.children.swap_remove(0);
649654
if let Some(reqs) = reqs {
650-
child_node = add_sort_above(child_node, reqs.into_single(), None);
655+
child_node = add_sort_above_with_required_single(
656+
child_node,
657+
reqs.into_single(),
658+
None,
659+
requires_single_partition,
660+
);
651661
}
652662
let child_plan = Arc::clone(&child_node.plan);
653663
window_tree.children.push(child_node);

datafusion/physical-optimizer/src/utils.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,28 @@ pub fn add_sort_above<T: Clone + Default>(
3939
node: PlanContext<T>,
4040
sort_requirements: LexRequirement,
4141
fetch: Option<usize>,
42+
) -> PlanContext<T> {
43+
add_sort_above_impl(node, sort_requirements, fetch, false)
44+
}
45+
46+
/// This utility function adds a `SortExec` above an operator according to the
47+
/// given ordering requirements. If the parent also needs a single input
48+
/// partition, it adds a `SortPreservingMergeExec` above the partition-preserving
49+
/// sort.
50+
pub fn add_sort_above_with_required_single<T: Clone + Default>(
51+
node: PlanContext<T>,
52+
sort_requirements: LexRequirement,
53+
fetch: Option<usize>,
54+
requires_single_partition: bool,
55+
) -> PlanContext<T> {
56+
add_sort_above_impl(node, sort_requirements, fetch, requires_single_partition)
57+
}
58+
59+
fn add_sort_above_impl<T: Clone + Default>(
60+
node: PlanContext<T>,
61+
sort_requirements: LexRequirement,
62+
fetch: Option<usize>,
63+
requires_single_partition: bool,
4264
) -> PlanContext<T> {
4365
let mut sort_reqs: Vec<_> = sort_requirements.into();
4466
sort_reqs.retain(|sort_expr| {
@@ -51,11 +73,25 @@ pub fn add_sort_above<T: Clone + Default>(
5173
let Some(ordering) = LexOrdering::new(sort_exprs) else {
5274
return node;
5375
};
54-
let mut new_sort = SortExec::new(ordering, Arc::clone(&node.plan)).with_fetch(fetch);
55-
if node.plan.output_partitioning().partition_count() > 1 {
76+
let input_partition_count = node.plan.output_partitioning().partition_count();
77+
let mut new_sort =
78+
SortExec::new(ordering.clone(), Arc::clone(&node.plan)).with_fetch(fetch);
79+
if input_partition_count > 1 {
5680
new_sort = new_sort.with_preserve_partitioning(true);
5781
}
58-
PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
82+
let sort_node = PlanContext::new(Arc::new(new_sort), T::default(), vec![node]);
83+
if requires_single_partition && input_partition_count > 1 {
84+
PlanContext::new(
85+
Arc::new(
86+
SortPreservingMergeExec::new(ordering, Arc::clone(&sort_node.plan))
87+
.with_fetch(fetch),
88+
),
89+
T::default(),
90+
vec![sort_node],
91+
)
92+
} else {
93+
sort_node
94+
}
5995
}
6096

6197
/// This utility function adds a `SortExec` above an operator according to the

0 commit comments

Comments
 (0)