Skip to content

Commit 76075e2

Browse files
authored
Preserve SPM when parent maintains input order (#21097)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21096 ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Updates `enforce_distribution.rs` - Adds tests ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> N/A
1 parent 98defe6 commit 76075e2

File tree

2 files changed

+181
-2
lines changed

2 files changed

+181
-2
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use datafusion_physical_plan::aggregates::{
5959
AggregateExec, AggregateMode, PhysicalGroupBy,
6060
};
6161

62+
use datafusion_physical_expr::Distribution;
6263
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
6364
use datafusion_physical_plan::execution_plan::ExecutionPlan;
6465
use datafusion_physical_plan::expressions::col;
@@ -227,6 +228,106 @@ impl ExecutionPlan for SortRequiredExec {
227228
}
228229
}
229230

231+
#[derive(Debug)]
232+
struct SinglePartitionMaintainsOrderExec {
233+
input: Arc<dyn ExecutionPlan>,
234+
cache: Arc<PlanProperties>,
235+
}
236+
237+
impl SinglePartitionMaintainsOrderExec {
238+
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
239+
let cache = Self::compute_properties(&input);
240+
Self {
241+
input,
242+
cache: Arc::new(cache),
243+
}
244+
}
245+
246+
fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
247+
PlanProperties::new(
248+
input.equivalence_properties().clone(),
249+
input.output_partitioning().clone(),
250+
input.pipeline_behavior(),
251+
input.boundedness(),
252+
)
253+
}
254+
}
255+
256+
impl DisplayAs for SinglePartitionMaintainsOrderExec {
257+
fn fmt_as(
258+
&self,
259+
t: DisplayFormatType,
260+
f: &mut std::fmt::Formatter,
261+
) -> std::fmt::Result {
262+
match t {
263+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
264+
write!(f, "SinglePartitionMaintainsOrderExec")
265+
}
266+
DisplayFormatType::TreeRender => write!(f, ""),
267+
}
268+
}
269+
}
270+
271+
impl ExecutionPlan for SinglePartitionMaintainsOrderExec {
272+
fn name(&self) -> &'static str {
273+
"SinglePartitionMaintainsOrderExec"
274+
}
275+
276+
fn as_any(&self) -> &dyn std::any::Any {
277+
self
278+
}
279+
280+
fn properties(&self) -> &Arc<PlanProperties> {
281+
&self.cache
282+
}
283+
284+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
285+
vec![&self.input]
286+
}
287+
288+
fn required_input_distribution(&self) -> Vec<Distribution> {
289+
vec![Distribution::SinglePartition]
290+
}
291+
292+
fn maintains_input_order(&self) -> Vec<bool> {
293+
vec![true]
294+
}
295+
296+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
297+
vec![false]
298+
}
299+
300+
fn with_new_children(
301+
self: Arc<Self>,
302+
mut children: Vec<Arc<dyn ExecutionPlan>>,
303+
) -> Result<Arc<dyn ExecutionPlan>> {
304+
assert_eq!(children.len(), 1);
305+
let child = children.pop().unwrap();
306+
Ok(Arc::new(Self::new(child)))
307+
}
308+
309+
fn apply_expressions(
310+
&self,
311+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
312+
) -> Result<TreeNodeRecursion> {
313+
Ok(TreeNodeRecursion::Continue)
314+
}
315+
316+
fn execute(
317+
&self,
318+
_partition: usize,
319+
_context: Arc<datafusion::execution::context::TaskContext>,
320+
) -> Result<datafusion_physical_plan::SendableRecordBatchStream> {
321+
unreachable!();
322+
}
323+
}
324+
325+
fn single_partition_maintains_order_exec(
326+
input: Arc<dyn ExecutionPlan>,
327+
) -> Arc<dyn ExecutionPlan> {
328+
Arc::new(SinglePartitionMaintainsOrderExec::new(input))
329+
}
330+
230331
fn parquet_exec() -> Arc<DataSourceExec> {
231332
parquet_exec_with_sort(schema(), vec![])
232333
}
@@ -3681,3 +3782,76 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
36813782

36823783
Ok(())
36833784
}
3785+
3786+
/// When a parent requires SinglePartition and maintains input order, order-preserving
3787+
/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
3788+
/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy
3789+
/// ordering and force unnecessary sorts later.
3790+
#[test]
3791+
fn maintains_order_preserves_spm_for_single_partition() -> Result<()> {
3792+
let schema = schema();
3793+
let sort_key: LexOrdering = [PhysicalSortExpr {
3794+
expr: col("c", &schema)?,
3795+
options: SortOptions::default(),
3796+
}]
3797+
.into();
3798+
3799+
// GlobalLimitExec -> LocalLimitExec -> sorted multi-partition parquet
3800+
let plan: Arc<dyn ExecutionPlan> =
3801+
limit_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
3802+
3803+
// Test EnforceDistribution in isolation: SPM should be preserved because
3804+
// GlobalLimitExec maintains input order.
3805+
let result = ensure_distribution_helper(plan, 10, false)?;
3806+
assert_plan!(result,
3807+
@r"
3808+
GlobalLimitExec: skip=0, fetch=100
3809+
SortPreservingMergeExec: [c@2 ASC]
3810+
LocalLimitExec: fetch=100
3811+
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
3812+
");
3813+
3814+
Ok(())
3815+
}
3816+
3817+
/// Tests the cascading effect through a UnionExec with the full optimizer
3818+
/// pipeline and `prefer_existing_sort=true`. Each Union branch has an operator
3819+
/// that requires SinglePartition and maintains input order. SortPreservingMergeExec
3820+
/// should be preserved in each branch, allowing ordering to flow through to the
3821+
/// ancestor SortRequiredExec.
3822+
#[test]
3823+
fn maintains_order_preserves_spm_through_union_with_prefer_existing_sort() -> Result<()> {
3824+
let schema = schema();
3825+
let sort_key: LexOrdering = [PhysicalSortExpr {
3826+
expr: col("c", &schema)?,
3827+
options: SortOptions::default(),
3828+
}]
3829+
.into();
3830+
3831+
let branch1 =
3832+
single_partition_maintains_order_exec(parquet_exec_multiple_sorted(vec![
3833+
sort_key.clone(),
3834+
]));
3835+
let branch2 =
3836+
single_partition_maintains_order_exec(parquet_exec_multiple_sorted(vec![
3837+
sort_key.clone(),
3838+
]));
3839+
let plan = sort_required_exec_with_req(union_exec(vec![branch1, branch2]), sort_key);
3840+
3841+
let test_config = TestConfig::default().with_prefer_existing_sort();
3842+
3843+
let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT);
3844+
assert_plan!(plan_distrib,
3845+
@r"
3846+
SortRequiredExec: [c@2 ASC]
3847+
UnionExec
3848+
SinglePartitionMaintainsOrderExec
3849+
SortPreservingMergeExec: [c@2 ASC]
3850+
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
3851+
SinglePartitionMaintainsOrderExec
3852+
SortPreservingMergeExec: [c@2 ASC]
3853+
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
3854+
");
3855+
3856+
Ok(())
3857+
}

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,8 +1376,13 @@ pub fn ensure_distribution(
13761376
match requirement {
13771377
// Operator requires specific distribution.
13781378
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1379-
// Since there is no ordering requirement, preserving ordering is pointless
1380-
child = replace_order_preserving_variants(child)?;
1379+
// If the parent doesn't maintain input order, preserving
1380+
// ordering is pointless. However, if it does maintain
1381+
// input order, we keep order-preserving variants so
1382+
// ordering can flow through to ancestors that need it.
1383+
if !maintains {
1384+
child = replace_order_preserving_variants(child)?;
1385+
}
13811386
}
13821387
Distribution::UnspecifiedDistribution => {
13831388
// Since ordering is lost, trying to preserve ordering is pointless

0 commit comments

Comments
 (0)