Commit ba7e30e
Add EnsureRequirements: idempotent merged EnforceDistribution + EnforceSorting
## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure.
2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans.
3. **Lost fetch values** (#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)

- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)

- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when the parent requires `SinglePartition` and the input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (#14150)

- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies the saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:

- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR #53/#54 regression scenarios
- #14150 fetch preservation across passes

Closes: #14150
Part of: #21973
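To make the composed rule concrete, here is a minimal sketch of how chaining the two existing passes could look. It assumes DataFusion's public `PhysicalOptimizerRule` trait and the existing `EnforceDistribution`/`EnforceSorting` rules; the actual struct layout, constructor, and module paths in this commit may differ.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

/// Sketch of the combined rule: distribution first, then sorting, so the
/// sort pass always sees the final partitioning.
#[derive(Debug, Default)]
pub struct EnsureRequirements;

impl PhysicalOptimizerRule for EnsureRequirements {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Satisfy distribution requirements (repartition/merge insertion) first ...
        let plan = EnforceDistribution::new().optimize(plan, config)?;
        // ... then satisfy ordering requirements on the distributed plan.
        EnforceSorting::new().optimize(plan, config)
    }

    fn name(&self) -> &str {
        "EnsureRequirements"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
```

Running sorting only after distribution is settled is one reason the composition can be made idempotent: by the time the sort pass runs, every exchange the plan will have is already in place.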
1 parent bb86364 · commit ba7e30e

10 files changed: 2812 additions & 68 deletions

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 15 additions & 16 deletions
```diff
@@ -74,20 +74,19 @@ in multiple phases.
 | 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
 | 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
 | 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
-| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
+| 6 | `EnsureRequirements` | - | Enforces both distribution and sorting requirements in a single idempotent rule (replaces EnforceDistribution + EnforceSorting). |
 | 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
-| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
-| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
-| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
-| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
-| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
-| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
-| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
-| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
-| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
-| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
-| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
-| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| 8 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
+| 9 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
+| 10 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
+| 11 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
+| 12 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
+| 13 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
+| 14 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
+| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
+| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
+| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
+| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
+| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
```
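The "single idempotent rule" wording in the new row above is a fixed-point property: re-running the rule on an already-optimized plan must change nothing. A hedged test sketch of that check follows; `assert_idempotent` is an illustrative helper reusing the `EnsureRequirements` sketch shown earlier, not code from this commit.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::{displayable, ExecutionPlan};

/// Assert that one more optimizer pass over an optimized plan is a no-op.
fn assert_idempotent(
    rule: &EnsureRequirements, // hypothetical combined rule from the earlier sketch
    plan: Arc<dyn ExecutionPlan>,
) -> Result<()> {
    let config = ConfigOptions::new();
    let once = rule.optimize(plan, &config)?;
    let twice = rule.optimize(Arc::clone(&once), &config)?;
    // A fixed point means the rendered plan trees are identical.
    assert_eq!(
        displayable(once.as_ref()).indent(true).to_string(),
        displayable(twice.as_ref()).indent(true).to_string(),
    );
    Ok(())
}
```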

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 50 additions & 15 deletions
````diff
@@ -970,7 +970,10 @@ fn preserving_order_enables_streaming(
 ///
 /// Updated node with an execution plan, where the desired single distribution
 /// requirement is satisfied.
-fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+fn add_merge_on_top(
+    input: DistributionContext,
+    fetch: Option<usize>,
+) -> DistributionContext {
     // Apply only when the partition count is larger than one.
     if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
@@ -979,14 +982,20 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
         //   (determined by flag `config.optimizer.prefer_existing_sort`)
-        let new_plan = if let Some(req) = input.plan.output_ordering() {
-            Arc::new(SortPreservingMergeExec::new(
-                req.clone(),
-                Arc::clone(&input.plan),
-            )) as _
+        let new_plan: Arc<dyn ExecutionPlan> = if let Some(req) =
+            input.plan.output_ordering()
+        {
+            let mut spm =
+                SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan));
+            if let Some(f) = fetch {
+                spm = spm.with_fetch(Some(f));
+            }
+            Arc::new(spm)
         } else {
             // If there is no input order, we can simply coalesce partitions:
-            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
+            Arc::new(
+                CoalescePartitionsExec::new(Arc::clone(&input.plan)).with_fetch(fetch),
+            )
         };

         DistributionContext::new(new_plan, true, vec![input])
@@ -1012,20 +1021,41 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
 /// ```text
 /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
 /// ```
+/// Returned by [`remove_dist_changing_operators`] to carry the fetch value
+/// that may have been on a removed `SortPreservingMergeExec` or `CoalescePartitionsExec`.
+struct RemovedDistOps {
+    context: DistributionContext,
+    /// The fetch value from the removed SPM/Coalesce, if any.
+    /// Must be re-applied when distribution operators are re-inserted.
+    removed_fetch: Option<usize>,
+}
+
 fn remove_dist_changing_operators(
     mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<RemovedDistOps> {
+    let mut removed_fetch = None;
     while is_repartition(&distribution_context.plan)
         || is_coalesce_partitions(&distribution_context.plan)
         || is_sort_preserving_merge(&distribution_context.plan)
     {
+        // Preserve fetch from SPM or CoalescePartitions before removing (#14150).
+        if let Some(fetch) = distribution_context.plan.fetch() {
+            removed_fetch = Some(
+                removed_fetch
+                    .map(|existing: usize| existing.min(fetch))
+                    .unwrap_or(fetch),
+            );
+        }
         // All of above operators have a single child. First child is only child.
         // Remove any distribution changing operators at the beginning:
         distribution_context = distribution_context.children.swap_remove(0);
         // Note that they will be re-inserted later on if necessary or helpful.
     }

-    Ok(distribution_context)
+    Ok(RemovedDistOps {
+        context: distribution_context,
+        removed_fetch,
+    })
 }

 /// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1219,11 +1249,16 @@ pub fn ensure_distribution(
     let order_preserving_variants_desirable =
         unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

-    // Remove unnecessary repartition from the physical plan if any
-    let DistributionContext {
-        mut plan,
-        data,
-        children,
+    // Remove unnecessary repartition from the physical plan if any.
+    // Preserve fetch from removed SPM/Coalesce (#14150).
+    let RemovedDistOps {
+        context:
+            DistributionContext {
+                mut plan,
+                data,
+                children,
+            },
+        removed_fetch,
     } = remove_dist_changing_operators(dist_context)?;

     if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
@@ -1359,7 +1394,7 @@ pub fn ensure_distribution(
     // Satisfy the distribution requirement if it is unmet.
     match &requirement {
         Distribution::SinglePartition => {
-            child = add_merge_on_top(child);
+            child = add_merge_on_top(child, removed_fetch);
         }
         Distribution::HashPartitioned(exprs) => {
             // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
````
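To see the fetch fix end to end, here is a sketch of the #14150 scenario: a `SortPreservingMergeExec` carrying a fetch is stripped and rebuilt by the pass above, and the rebuilt merge operator must still report the fetch. `multi_partition_input` is a stand-in for any plan with more than one output partition, and the combined rule is the hypothetical sketch from earlier; neither is an API from this commit.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::ExecutionPlan;

fn fetch_survives_redistribution(
    multi_partition_input: Arc<dyn ExecutionPlan>, // hypothetical multi-partition source
    ordering: LexOrdering,
) -> Result<()> {
    // SPM with fetch=10. Before the fix, remove_dist_changing_operators()
    // dropped this node and add_merge_on_top() rebuilt it without the fetch.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(
        SortPreservingMergeExec::new(ordering, multi_partition_input)
            .with_fetch(Some(10)),
    );

    let optimized =
        EnsureRequirements::default().optimize(plan, &ConfigOptions::new())?;

    // Whatever merge operator ends up at the root must still carry fetch=10.
    assert_eq!(optimized.fetch(), Some(10));
    Ok(())
}
```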

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -262,7 +262,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
 /// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
 /// Otherwise, by checking the requirement satisfaction searches for a replacement chance.
 /// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`]
-fn replace_with_partial_sort(
+pub fn replace_with_partial_sort(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
```
