EnforceDistribution and EnforceSorting are currently implemented as two separate physical optimizer rules that run independently. This design is unique among major query engines -- Spark (EnsureRequirements), Presto/Trino (AddExchanges), and others all handle distribution and sorting in a single combined rule. The separation in DataFusion leads to a class of correctness bugs where one rule undoes the invariants established by the other, because sorting and distribution are fundamentally coupled (preserve_partitioning on SortExec directly affects output partition count).
This epic tracks the work to merge these two rules into a single EnsureRequirements-style rule that handles both distribution and sorting in one pass, eliminating the non-idempotent composition and the recurring bugs it causes.
Motivation
The fundamental coupling
SortExec has a preserve_partitioning flag that determines whether it outputs one partition (merging all inputs) or N partitions (sorting each independently). This means every sorting decision is also a distribution decision, and vice versa. Handling them in separate rules creates a semantic gap where each rule makes locally correct decisions that are globally incorrect.
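The coupling can be made concrete with a toy model (not DataFusion's actual API): the same flag that decides *how* a SortExec sorts also decides *how many* partitions it emits.

```rust
// Toy model of SortExec's preserve_partitioning flag: the sorting
// decision is inseparable from the distribution decision.
fn sort_output_partitions(input_partitions: usize, preserve_partitioning: bool) -> usize {
    if preserve_partitioning {
        input_partitions // sort each partition independently: N in, N out
    } else {
        1 // merge all inputs into one globally sorted partition
    }
}

fn main() {
    // Flipping the flag on a 32-partition input changes the plan's
    // distribution, not just its ordering:
    assert_eq!(sort_output_partitions(32, true), 32);
    assert_eq!(sort_output_partitions(32, false), 1);
    println!("preserve_partitioning changes partition count, not just order");
}
```

Any rule that toggles this flag without consulting distribution requirements can silently violate a parent's partitioning expectations.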
Non-idempotent composition
Running EnforceDistribution followed by EnforceSorting does not produce a stable plan. Running the pair again can produce a different (and sometimes invalid) plan:
Round 1: EnforceDistribution -> fixes distribution
EnforceSorting -> pushdown_sorts breaks distribution
Round 2: EnforceDistribution -> fixes the NEW distribution violation
EnforceSorting -> pushdown_sorts breaks it AGAIN (different location)
This is particularly problematic for downstream projects that run custom optimizer rules between or after these passes (e.g., remote execution, materialized view selection), which necessitate additional rounds of EnforceSorting.
Real-world impact
We maintain a production system (Polygon.io Atlas) serving financial market data APIs built on DataFusion. The separation of these rules has caused multiple production incidents over the past months:
SanityCheckPlan failures on multi-partition StorageExec + GlobalLimitExec (April 2026): EnforceSorting's pushdown_sorts pushed a SortExec through an intermediate node onto a 32-partition input, setting preserve_partitioning=true without inserting SortPreservingMergeExec. GlobalLimitExec requires SinglePartition -> 502 errors for specific API users. Root cause: pushdown_sorts has no knowledge of distribution requirements.
InterleaveExec::with_new_children panics (#21826, April 2026): Running EnforceDistribution twice (which our custom optimizer chain does) causes an InterleaveExec created in the first pass to panic when the second pass changes its children's partitioning.
Planning time explosion with materialized views (April 2026): EnforceDistribution's adjust_input_keys_ordering returns Transformed::yes unconditionally for non-join plans (#21946), causing unnecessary plan tree rebuilds that trigger expensive cost re-evaluation in OneOfExec (materialized view candidate selection). Each EnforceSorting + EnforceDistribution round compounds this cost.
Existing upstream issues (same root cause)
#14150: "Bug: applying EnforceDistribution multiple times generates invalid plan" -- running EnforceDistribution twice loses limit fetch values, producing wrong results.
EnforceDistribution fails to inject a necessary RepartitionExec between aggregate operations.
EnforceDistribution breaks ordering established by EnforceSorting.
"InterleaveExec::with_new_children panics when optimizer rewrites change children's partitioning" -- the second EnforceDistribution pass panics on a plan created by the first pass.
All of these share the same root cause: two separate rules making independent decisions about coupled concerns.
How other engines solve this
Apache Spark: EnsureRequirements
Spark handles both in a single rule using transformUp:
```scala
// For each operator, in a single pass:
// 1. Check requiredChildDistribution -> add ShuffleExchangeExec if needed
// 2. Check requiredChildOrdering     -> add SortExec if needed
// Distribution is always resolved before sorting for the same operator.
```
Key design: sorting decisions are made AFTER distribution is finalized for each operator. SortExec(global=false) preserves partition boundaries. No separate "sort pushdown" pass exists.
Presto/Trino: AddExchanges
Presto's AddExchanges rule similarly handles both distribution and sorting properties in a single rule, using a PreferredProperties structure that carries both distribution and ordering preferences through the plan tree.
Proposed approach
Phase 1: Make pushdown_sorts distribution-aware (short-term fix)
Add a distribution_requirement field to ParentRequirements in pushdown_sorts, so that add_sort_above knows when to insert SortPreservingMergeExec. This is a targeted fix for the most critical bug.
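The decision this fix encodes can be sketched as follows. This is an illustrative model, not DataFusion's API: `needs_merge_after_sort` and `DistributionReq` are hypothetical names for the check that `add_sort_above` would perform once it can see the parent's distribution requirement.

```rust
// Hypothetical sketch of the Phase 1 decision: after pushing a sort onto
// a multi-partition child with preserve_partitioning = true, decide
// whether a SortPreservingMergeExec must be inserted for the parent.
#[derive(PartialEq)]
enum DistributionReq {
    SinglePartition,
    Unspecified,
}

fn needs_merge_after_sort(
    parent_requirement: DistributionReq,
    child_partitions: usize,
    preserve_partitioning: bool,
) -> bool {
    parent_requirement == DistributionReq::SinglePartition
        && preserve_partitioning
        && child_partitions > 1
}

fn main() {
    // The incident scenario: 32-partition input, parent (GlobalLimitExec)
    // requires SinglePartition -- a merge is mandatory.
    assert!(needs_merge_after_sort(DistributionReq::SinglePartition, 32, true));
    // Parent has no single-partition requirement: no merge needed.
    assert!(!needs_merge_after_sort(DistributionReq::Unspecified, 32, true));
    // Single-partition child: nothing to merge.
    assert!(!needs_merge_after_sort(DistributionReq::SinglePartition, 1, false));
}
```

Without the parent's distribution requirement in scope, `pushdown_sorts` cannot make this determination, which is exactly how the SanityCheckPlan failure arose.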
Status: Implemented in our internal fork, ready to port upstream as part of this epic.
Phase 2: Add idempotency tests (validation)
Add tests that verify EnforceDistribution -> EnforceSorting produces a stable plan:
Test against various plan topologies: multi-partition sorts with limits, unions with mixed partition counts, projections over multi-partition sources, window functions, etc.
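A harness for such tests could look like the sketch below, shown here on a toy plan type; a real version would run the actual PhysicalOptimizerRule pair and compare displayable plan trees.

```rust
// Sketch of an idempotency (fixpoint) check for a plan rewrite.
// `Plan` is a toy stand-in for an ExecutionPlan tree.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    Sort { input: Box<Plan>, preserve_partitioning: bool },
    Merge { input: Box<Plan> },
}

/// Applying the rule twice must yield the same plan as applying it once.
fn assert_idempotent<F: Fn(Plan) -> Plan>(rule: F, plan: Plan) {
    let once = rule(plan.clone());
    let twice = rule(once.clone());
    assert_eq!(once, twice, "rule is not idempotent");
}

fn main() {
    // Example rule: cap a partition-preserving sort with a merge. After
    // one application the root is a Merge, which the rule leaves alone,
    // so the rule is a fixpoint and the check passes.
    let rule = |p: Plan| match p {
        Plan::Sort { input, preserve_partitioning: true } => Plan::Merge {
            input: Box::new(Plan::Sort { input, preserve_partitioning: true }),
        },
        other => other,
    };
    let plan = Plan::Sort {
        input: Box::new(Plan::Scan { partitions: 8 }),
        preserve_partitioning: true,
    };
    assert_idempotent(rule, plan);
}
```

The same `assert_idempotent` shape works for the composed `EnforceDistribution -> EnforceSorting` pipeline: run the pair once, run it again, and require the second run to be a no-op.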
Phase 3: Merge into EnsureRequirements (architectural fix)
Create a new EnsureRequirements rule that replaces both EnforceDistribution and EnforceSorting:
```rust
pub struct EnsureRequirements;

impl PhysicalOptimizerRule for EnsureRequirements {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Single bottom-up pass:
        // For each operator, ensure children satisfy both
        // requiredChildDistribution AND requiredChildOrdering.
        // Distribution is resolved before ordering for each operator.
        plan.transform_up(|node| ensure_requirements(node, config))
    }
}

struct Requirements {
    distribution: Distribution,
    ordering: Option<OrderingRequirements>,
    fetch: Option<usize>,
}

fn ensure_requirements(node: PlanContext<Requirements>) -> Result<Transformed<...>> {
    for (child, required_dist, required_ordering) in zip(children, distributions, orderings) {
        // Step 1: Ensure distribution
        if !child.output_partitioning().satisfies(&required_dist) {
            child = add_exchange(child, required_dist);
        }
        // Step 2: Ensure ordering (distribution is already correct)
        if !child.output_ordering().satisfies(&required_ordering) {
            child = add_sort(child, required_ordering, required_dist); // dist-aware!
        }
    }
}
```
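The distribution-then-ordering loop is what makes the pass a fixpoint. A minimal runnable model (toy types, not DataFusion's) shows why a second application changes nothing:

```rust
// Toy model of the per-child enforcement loop: distribution is fixed
// first, then ordering, so a second pass finds both requirements
// already satisfied and is a no-op.
#[derive(Clone, Debug, PartialEq)]
struct Child {
    partitions: usize,
    sorted: bool,
}

fn ensure_child(mut child: Child, require_single: bool, require_sorted: bool) -> Child {
    // Step 1: distribution first.
    if require_single && child.partitions > 1 {
        child.partitions = 1; // stand-in for inserting a merge/exchange
    }
    // Step 2: ordering, with distribution already settled.
    if require_sorted && !child.sorted {
        child.sorted = true; // stand-in for inserting a SortExec
    }
    child
}

fn main() {
    let raw = Child { partitions: 32, sorted: false };
    let once = ensure_child(raw, true, true);
    let twice = ensure_child(once.clone(), true, true);
    assert_eq!(once, twice); // naturally idempotent
    assert_eq!(once, Child { partitions: 1, sorted: true });
}
```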
Key properties:
Single pass: No separate pushdown_sorts that can undo distribution work
Distribution before sorting: For each operator, distribution is settled before sorting decisions, like Spark
Naturally idempotent: Running it twice produces the same plan because each operator's children are already correct after the first pass
Sort pushdown integrated: Instead of a separate top-down pass, sort pushdown is handled by the bottom-up pass recognizing when a child already satisfies ordering (no sort needed)
Migration path
EnsureRequirements can coexist with the old rules during development
Add a feature flag to switch between old and new behavior
Validate with the full DataFusion test suite + sqllogictest
Deprecate EnforceDistribution + EnforceSorting after stabilization
Sub-tasks
Phase 1: Port pushdown_sorts distribution fix upstream (will be submitted as upstream PR)
Phase 1: Port ensure_sorting distribution fix upstream (will be submitted as upstream PR)
Phase 2: Add idempotency test framework for physical optimizer rules
Phase 2: Add idempotency tests for EnforceDistribution + EnforceSorting composition
Phase 3: Design EnsureRequirements API and Requirements structure
Phase 3: Implement combined distribution + ordering enforcement in single bottom-up pass
Phase 3: Integrate sort pushdown into the bottom-up pass
Phase 3: Handle parallelize_sorts and replace_with_order_preserving_variants optimizations
Phase 3: Migrate OutputRequirements (add/remove) into the new rule
References
Spark EnsureRequirements: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
Presto AddExchanges: https://github.com/prestodb/presto/wiki/New-Optimizer
#14150: Multiple EnforceDistribution generates invalid plan
InterleaveExec::with_new_children panics from optimizer rewrites
adjust_input_keys_ordering returns Transformed::yes unconditionally