
Commit b99edb9

Add EnsureRequirements: idempotent merged EnforceDistribution + EnforceSorting
## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without adding a `SortPreservingMergeExec`, violating the `SinglePartition` requirement from `GlobalLimitExec` → `SanityCheckPlan` failure.
2. **Non-idempotent composition**: running the rules multiple times produced different (sometimes invalid) plans.
3. **Lost fetch values** (#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules; Spark (`EnsureRequirements`) and Presto (`AddExchanges`) each use a single rule.
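The partitioning bug above can be modeled with a tiny standalone sketch (toy functions, not DataFusion types): a sort that preserves partitioning keeps all input partitions, so a parent requiring a single partition fails validation unless a merge operator is inserted above it.

```rust
// Toy model of the 502 bug. These functions are illustrative stand-ins,
// not DataFusion APIs.

// A SortExec with preserve_partitioning=true keeps its input's partition
// count; with preserve_partitioning=false it merges down to one partition.
fn sort_output_partitions(input_partitions: usize, preserve_partitioning: bool) -> usize {
    if preserve_partitioning {
        input_partitions
    } else {
        1
    }
}

// A SinglePartition distribution requirement is met only by one partition.
fn satisfies_single_partition(partitions: usize) -> bool {
    partitions == 1
}

fn main() {
    // pushdown_sorts set preserve_partitioning=true without adding a merge,
    // so the parent's SinglePartition requirement is violated.
    let partitions = sort_output_partitions(8, true);
    assert!(!satisfies_single_partition(partitions));

    // The fix inserts a SortPreservingMergeExec above the sort, collapsing
    // the output back to a single partition.
    let merged_partitions = 1;
    assert!(satisfies_single_partition(merged_partitions));
}
```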
## Changes

### `EnsureRequirements` rule (new)

- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)

- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when the parent requires `SinglePartition` and the input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (#14150)

- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies the saved fetch to new operators

## Testing

| Suite                           | Result                                 |
| ------------------------------- | -------------------------------------- |
| EnsureRequirements (new)        | 53 passed                              |
| enforce_sorting (existing)      | 124 passed, 0 regressions              |
| enforce_distribution (existing) | 66 passed, 0 regressions               |
| SLT (465 files)                 | 1 pre-existing failure only            |
| **Total**                       | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:

- All partition counts 1-64
- Triple and 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR #53/#54 regression scenarios
- #14150 fetch preservation across passes

Closes: #14150
Part of: #21973
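The idempotency goal can be sketched without DataFusion: compose the two rewrites into one rule and check that a second pass is a no-op. This is a conceptual toy (a `Plan` of operator names stands in for an `ExecutionPlan` tree; the real rules are far richer).

```rust
// Toy sketch of the EnsureRequirements composition and its idempotency
// property. Types and rewrites are simplified stand-ins.

#[derive(Clone, PartialEq, Debug)]
struct Plan(Vec<&'static str>);

// Stand-in for EnforceDistribution: ensure a merge node sits at the root.
fn enforce_distribution(mut p: Plan) -> Plan {
    if p.0.first() != Some(&"SortPreservingMergeExec") {
        p.0.insert(0, "SortPreservingMergeExec");
    }
    p
}

// Stand-in for EnforceSorting: ensure a sort node exists below the merge.
fn enforce_sorting(mut p: Plan) -> Plan {
    if !p.0.contains(&"SortExec") {
        p.0.insert(1, "SortExec");
    }
    p
}

// EnsureRequirements-style composition: distribution first, then sorting.
fn ensure_requirements(p: Plan) -> Plan {
    enforce_sorting(enforce_distribution(p))
}

fn main() {
    let plan = Plan(vec!["DataSourceExec"]);
    let once = ensure_requirements(plan.clone());
    let twice = ensure_requirements(once.clone());
    // Idempotency: a second optimization pass must not change the plan.
    assert_eq!(once, twice);
}
```

This fixpoint check mirrors what the new test suite verifies across partition counts and join/window/aggregate topologies.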
1 parent bb86364 commit b99edb9

10 files changed

Lines changed: 2820 additions & 76 deletions

File tree

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 23 additions & 24 deletions
````diff
@@ -67,27 +67,26 @@ Rule order matters. The default pipeline may change between releases.
 The same rule name may appear more than once when the default pipeline runs it
 in multiple phases.
 
-| order | rule | phase | summary |
-| ----- | ---- | ----- | ------- |
-| 1 | `OutputRequirements` | add phase | Adds helper nodes so output requirements survive later physical rewrites. |
-| 2 | `aggregate_statistics` | - | Uses exact source statistics to answer some aggregates without scanning data. |
-| 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
-| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
-| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
-| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
-| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
-| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
-| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
-| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
-| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
-| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
-| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
-| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
-| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
-| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
-| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
-| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
-| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| order | rule | phase | summary |
+| ----- | ---- | ----- | ------- |
+| 1 | `OutputRequirements` | add phase | Adds helper nodes so output requirements survive later physical rewrites. |
+| 2 | `aggregate_statistics` | - | Uses exact source statistics to answer some aggregates without scanning data. |
+| 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
+| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
+| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
+| 6 | `EnsureRequirements` | - | Enforces both distribution and sorting requirements in a single idempotent rule (replaces EnforceDistribution + EnforceSorting). |
+| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
+| 8 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
+| 9 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
+| 10 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
+| 11 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
+| 12 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
+| 13 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
+| 14 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
+| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
+| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
+| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
+| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
+| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
````

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 50 additions & 15 deletions
````diff
@@ -970,7 +970,10 @@ fn preserving_order_enables_streaming(
 ///
 /// Updated node with an execution plan, where the desired single distribution
 /// requirement is satisfied.
-fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+fn add_merge_on_top(
+    input: DistributionContext,
+    fetch: Option<usize>,
+) -> DistributionContext {
     // Apply only when the partition count is larger than one.
     if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
@@ -979,14 +982,20 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
         //   (determined by flag `config.optimizer.prefer_existing_sort`)
-        let new_plan = if let Some(req) = input.plan.output_ordering() {
-            Arc::new(SortPreservingMergeExec::new(
-                req.clone(),
-                Arc::clone(&input.plan),
-            )) as _
+        let new_plan: Arc<dyn ExecutionPlan> = if let Some(req) =
+            input.plan.output_ordering()
+        {
+            let mut spm =
+                SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan));
+            if let Some(f) = fetch {
+                spm = spm.with_fetch(Some(f));
+            }
+            Arc::new(spm)
         } else {
             // If there is no input order, we can simply coalesce partitions:
-            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
+            Arc::new(
+                CoalescePartitionsExec::new(Arc::clone(&input.plan)).with_fetch(fetch),
+            )
         };
 
         DistributionContext::new(new_plan, true, vec![input])
````
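The branch above can be summarized with a small standalone sketch (toy enum, not DataFusion's operators): an ordered input gets a sort-preserving merge, an unordered one a plain coalesce, and in both cases the saved fetch is carried onto the new operator.

```rust
// Toy sketch of add_merge_on_top's choice of merge operator. MergeOp is a
// simplified stand-in for the real ExecutionPlan nodes.

#[derive(Debug, PartialEq)]
enum MergeOp {
    // Merges partitions while keeping rows in sorted order.
    SortPreservingMerge { fetch: Option<usize> },
    // Concatenates partitions without any ordering guarantee.
    CoalescePartitions { fetch: Option<usize> },
}

// Pick the merge operator based on whether the input has an output ordering,
// re-applying the fetch saved from any removed operator (#14150).
fn add_merge_on_top(has_ordering: bool, fetch: Option<usize>) -> MergeOp {
    if has_ordering {
        MergeOp::SortPreservingMerge { fetch }
    } else {
        MergeOp::CoalescePartitions { fetch }
    }
}

fn main() {
    // Ordered input with a saved fetch: SPM keeps the limit.
    assert_eq!(
        add_merge_on_top(true, Some(10)),
        MergeOp::SortPreservingMerge { fetch: Some(10) }
    );
    // Unordered input without a fetch: plain coalesce.
    assert_eq!(
        add_merge_on_top(false, None),
        MergeOp::CoalescePartitions { fetch: None }
    );
}
```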
````diff
@@ -1012,20 +1021,41 @@ fn add_merge_on_top(
 /// ```text
 /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
 /// ```
+/// Returned by [`remove_dist_changing_operators`] to carry the fetch value
+/// that may have been on a removed `SortPreservingMergeExec` or `CoalescePartitionsExec`.
+struct RemovedDistOps {
+    context: DistributionContext,
+    /// The fetch value from the removed SPM/Coalesce, if any.
+    /// Must be re-applied when distribution operators are re-inserted.
+    removed_fetch: Option<usize>,
+}
+
 fn remove_dist_changing_operators(
     mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<RemovedDistOps> {
+    let mut removed_fetch = None;
     while is_repartition(&distribution_context.plan)
         || is_coalesce_partitions(&distribution_context.plan)
         || is_sort_preserving_merge(&distribution_context.plan)
     {
+        // Preserve fetch from SPM or CoalescePartitions before removing (#14150).
+        if let Some(fetch) = distribution_context.plan.fetch() {
+            removed_fetch = Some(
+                removed_fetch
+                    .map(|existing: usize| existing.min(fetch))
+                    .unwrap_or(fetch),
+            );
+        }
         // All of above operators have a single child. First child is only child.
         // Remove any distribution changing operators at the beginning:
         distribution_context = distribution_context.children.swap_remove(0);
         // Note that they will be re-inserted later on if necessary or helpful.
     }
 
-    Ok(distribution_context)
+    Ok(RemovedDistOps {
+        context: distribution_context,
+        removed_fetch,
+    })
 }
 
 /// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
````
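The fetch-accumulation step above follows a simple rule: when several stripped operators each carry a fetch, the tightest (minimum) limit wins, since the original plan could never have emitted more rows than its smallest fetch. A standalone sketch of that merge logic (toy function mirroring the diff, not DataFusion's API):

```rust
// Sketch of the fetch-preservation rule in remove_dist_changing_operators:
// accumulate the minimum fetch across all removed SPM/Coalesce operators.

fn merge_fetch(removed_fetch: Option<usize>, fetch: Option<usize>) -> Option<usize> {
    match (removed_fetch, fetch) {
        // Two limits: the smaller one bounds the output.
        (Some(existing), Some(f)) => Some(existing.min(f)),
        // First limit seen: adopt it.
        (None, Some(f)) => Some(f),
        // Operator without a fetch: keep the accumulated limit unchanged.
        (acc, None) => acc,
    }
}

fn main() {
    // Stripping SPM(fetch=10), then CoalescePartitions(fetch=5), keeps 5.
    let acc = merge_fetch(None, Some(10));
    let acc = merge_fetch(acc, Some(5));
    assert_eq!(acc, Some(5));

    // A fetch-less operator leaves the accumulated limit untouched.
    assert_eq!(merge_fetch(Some(5), None), Some(5));
}
```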
````diff
@@ -1219,11 +1249,16 @@ pub fn ensure_distribution(
     let order_preserving_variants_desirable =
         unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
 
-    // Remove unnecessary repartition from the physical plan if any
-    let DistributionContext {
-        mut plan,
-        data,
-        children,
+    // Remove unnecessary repartition from the physical plan if any.
+    // Preserve fetch from removed SPM/Coalesce (#14150).
+    let RemovedDistOps {
+        context:
+            DistributionContext {
+                mut plan,
+                data,
+                children,
+            },
+        removed_fetch,
     } = remove_dist_changing_operators(dist_context)?;
 
     if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
@@ -1359,7 +1394,7 @@ pub fn ensure_distribution(
             // Satisfy the distribution requirement if it is unmet.
             match &requirement {
                 Distribution::SinglePartition => {
-                    child = add_merge_on_top(child);
+                    child = add_merge_on_top(child, removed_fetch);
                 }
                 Distribution::HashPartitioned(exprs) => {
                     // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
````

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 1 addition & 1 deletion
````diff
@@ -262,7 +262,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
 /// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
 /// Otherwise, by checking the requirement satisfaction searches for a replacement chance.
 /// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`]
-fn replace_with_partial_sort(
+pub fn replace_with_partial_sort(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
````
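The partial-sort idea behind `replace_with_partial_sort` can be illustrated standalone (a toy over tuples, not DataFusion's `PartialSortExec`): when the input is already ordered on a prefix of the requested sort keys, only rows sharing the same prefix value need re-sorting, so the sort can stream group by group instead of buffering everything.

```rust
// Toy illustration of partial sort. Assumes the input is already sorted by
// the first column (the satisfied prefix); each equal-prefix run is then
// sorted by the second column independently.

fn partial_sort(rows: &mut [(i32, i32)]) {
    let mut start = 0;
    while start < rows.len() {
        let key = rows[start].0;
        // Find the end of the run sharing this prefix value.
        let mut end = start;
        while end < rows.len() && rows[end].0 == key {
            end += 1;
        }
        // Sort only within the run; runs never interleave, so the full
        // ordering (col0, col1) holds afterwards.
        rows[start..end].sort_by_key(|r| r.1);
        start = end;
    }
}

fn main() {
    let mut rows = vec![(1, 9), (1, 3), (2, 7), (2, 1), (3, 5)];
    partial_sort(&mut rows);
    assert_eq!(rows, vec![(1, 3), (1, 9), (2, 1), (2, 7), (3, 5)]);
}
```

Making the function `pub` lets the merged `EnsureRequirements` rule reach this rewrite from outside the `enforce_sorting` module.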
