Skip to content

Commit fb1c0f3

Browse files
authored
Add EnsureRequirements: merged EnforceDistribution + EnforceSorting with idempotent pushdown_sorts (apache#21976)
## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. Fix `pushdown_sorts` to be distribution-aware and fix the `SortPreservingMergeExec` / `CoalescePartitionsExec` fetch preservation issue from apache#14150, making the composition idempotent. **Epic**: apache#21973 Closes: apache#14150 ## Problem `EnforceDistribution` and `EnforceSorting` run as separate rules, but sorting and distribution are coupled through `SortExec.preserve_partitioning`. This caused: 1. **`SanityCheckPlan` validation failures on multi-partition sort + limit** — `pushdown_sorts` set `preserve_partitioning=true` on multi-partition input without inserting `SortPreservingMergeExec`, violating the `SinglePartition` requirement coming from `GlobalLimitExec`. 2. **Non-idempotent composition** — running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values (apache#14150)** — `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` / `CoalescePartitionsExec` when stripping and re-adding distribution operators. DataFusion was the only major query engine with separate rules — Spark (`EnsureRequirements`) and Presto/Trino (`AddExchanges`) handle both in a single rule. ## Changes ### 1. `EnsureRequirements` rule (new, replaces `EnforceDistribution` + `EnforceSorting` in the default chain) - Single `PhysicalOptimizerRule` that calls the distribution + sorting helpers in one coordinated bottom-up sequence. - Registered in place of `Arc::new(EnforceDistribution) + Arc::new(EnforceSorting)` in the default optimizer chain. - Comprehensive inline tests covering known bug topologies + idempotency verification. ### 2. Distribution-aware `pushdown_sorts` (`sort_pushdown.rs`) - Add `distribution_requirement: Distribution` field to `ParentRequirements`. - New `add_sort_above_with_distribution()` in `utils.rs` — inserts `SortPreservingMergeExec` when the parent requires `SinglePartition` and the input has multiple partitions. - Switch both `add_sort_above` call sites to the distribution-aware variant. - Propagate distribution through recursion with a `stronger_distribution()` helper. - Reset distribution below partition-merging nodes (SPM, single-partition outputs). ### 3. Fix fetch preservation in distribution enforcement (apache#14150) - `remove_dist_changing_operators()` now saves `fetch` from removed SPM / Coalesce nodes. - `add_merge_on_top()` re-applies the saved `fetch` to re-created operators. ### 4. Retire the old rule entry points; retarget existing tests After review feedback from @alamb (apache#21976 (comment)), the rule structs and their `impl PhysicalOptimizerRule` blocks have been deleted from `enforce_distribution.rs` and `enforce_sorting/mod.rs`. The internal helpers (`ensure_distribution`, `ensure_sorting`, the contexts, `parallelize_sorts`, `replace_with_order_preserving_variants`, `sort_pushdown`, …) stay in place — `EnsureRequirements` calls them directly. The existing integration tests in `core/tests/physical_optimizer/` now exercise `EnsureRequirements` instead of the deleted rules: - `enforce_distribution.rs` — `Run::Distribution` / `Run::Sorting` branches both call `EnsureRequirements::new()`. Legacy run sequences (`DISTRIB_DISTRIB_SORT`, `SORT_DISTRIB_DISTRIB`) are preserved verbatim; idempotency makes the previously-different orderings converge to the same plan. - `enforce_sorting.rs` — `EnforceSortingTest` drives `EnsureRequirements::new()` and pins `target_partitions = 10` so snapshots are deterministic across machines. The historical `[Dist, Sort]` vs `[Sort, Dist, Sort]` comparison is rewritten as "running `EnsureRequirements` N times == running it once". - `enforce_sorting_monotonicity.rs` / `replace_with_order_preserving_variants.rs` — driven through the same test framework; only snapshots updated. A previously-separate `ensure_requirements/new_tests.rs` (added in an earlier iteration of this PR) is removed; the same coverage lives in the inline tests in `ensure_requirements/mod.rs`. ### 5. Updated SLT - `explain.slt`: `EnforceDistribution` + `EnforceSorting` collapse to `EnsureRequirements` in `EXPLAIN VERBOSE` output. ## Snapshot drift ~78 snapshots in the retargeted tests refreshed. The consistent pattern is `SortExec + CoalescePartitionsExec` (blocking) → `SortPreservingMergeExec` (streaming), because `EnsureRequirements` now runs `parallelize_sorts` + `replace_with_order_preserving_variants` on plan shapes that the single-rule path used to miss. These are improvements, not regressions — but worth a careful look in review since they are visible in the diff. ## Testing | Suite | Result | |-------|--------| | `datafusion-physical-optimizer` (lib, inline tests) | **59 passed** | | `core_integration physical_optimizer::` | **454 passed** | | `cargo clippy --all-targets -- -D warnings` | clean | | `cargo fmt --all --check` | clean | ### Idempotency / regression coverage in the inline tests | Scenario | Covered | |----------|---------| | Multi-partition sort + limit (1-64 partitions) | yes | | Union with mixed partition counts | yes | | Projection over multi-partition | yes | | HashJoin (Partitioned) | yes | | SortMergeJoin | yes | | Window function partitioning + ordering | yes | | Aggregate (Partial + FinalPartitioned) | yes | | Nested sort + limit | yes | | Hash repartition + sort | yes | | CoalescePartitions + sort (`parallelize_sorts`) | yes | | SPM → Sort → multi-partition | yes | | `OutputRequirementExec` + `SinglePartition` over multi-partition source | yes | | `ProjectionExec` + multi-partition + `SinglePartition` requirement | yes | | apache#14150 fetch preservation across passes | yes | | Triple optimization convergence | yes | | 10× consecutive optimization stability | yes | ## Architecture ``` EnsureRequirements::optimize(plan) Phase 1: join key reordering (top-down) — adjust_input_keys_ordering or reorder_join_keys_to_inputs depending on config. Phase 2: distribution enforcement (bottom-up) — ensure_distribution Fetch is preserved across SPM/Coalesce strip/re-add (apache#14150 fix). Phase 3: sort enforcement (bottom-up) — ensure_sorting Phase 4: parallelize_sorts (bottom-up, when repartition_sorts is on) Phase 5: replace_with_order_preserving_variants (bottom-up) Phase 6: pushdown_sorts (top-down, distribution-aware) Phase 7: replace_with_partial_sort (bottom-up) ``` Idempotent because: - `pushdown_sorts` now carries `distribution_requirement` and uses `add_sort_above_with_distribution`, so the second pass never re-violates an earlier-established `SinglePartition` requirement. - Distribution enforcement preserves `fetch` across strip/re-add cycles. - Running `EnsureRequirements` repeatedly converges (verified across the partition-count sweep, hash-join, sort-merge join, window, projection, and apache#14150 regression tests). ## Next steps (future PRs) - Gradually fold `pushdown_sorts` work into the bottom-up `ensure_sorting` pass. - Eliminate the separate top-down `pushdown_sorts` traversal. - Single-pass architecture (one `transform_up` for both distribution + sorting, like Spark's `EnsureRequirements`).
1 parent e27b6c6 commit fb1c0f3

16 files changed

Lines changed: 2307 additions & 701 deletions

File tree

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,19 @@ in multiple phases.
7575
| 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
7676
| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
7777
| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
78-
| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
78+
| 6 | `EnsureRequirements` | - | Enforces both distribution and sorting requirements in a single idempotent rule. |
7979
| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
80-
| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
81-
| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
82-
| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
83-
| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
84-
| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
85-
| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
86-
| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
87-
| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
88-
| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
89-
| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
90-
| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
91-
| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
92-
| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
93-
| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
94-
| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
80+
| 8 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
81+
| 9 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
82+
| 10 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
83+
| 11 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
84+
| 12 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
85+
| 13 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
86+
| 14 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
87+
| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
88+
| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
89+
| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
90+
| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
91+
| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
92+
| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
93+
| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |

0 commit comments

Comments
 (0)