
[EPIC] Merge EnforceDistribution + EnforceSorting into a single EnsureRequirements rule for correctness and idempotency #21973

@zhuqi-lucas


Summary

EnforceDistribution and EnforceSorting are currently implemented as two separate physical optimizer rules that run independently. This design is unique among major query engines: Spark (EnsureRequirements), Presto/Trino (AddExchanges), and others handle distribution and sorting in a single combined rule. The separation in DataFusion leads to a class of correctness bugs where one rule undoes the invariants established by the other, because sorting and distribution are fundamentally coupled (preserve_partitioning on SortExec directly determines the output partition count).

This epic tracks the work to merge these two rules into a single EnsureRequirements-style rule that handles both distribution and sorting in one pass, eliminating the non-idempotent composition and the recurring bugs it causes.

Motivation

The fundamental coupling

SortExec has a preserve_partitioning flag that determines whether it outputs one partition (merging all inputs) or N partitions (sorting each independently). This means every sorting decision is also a distribution decision, and vice versa. Handling them in separate rules creates a semantic gap where each rule makes locally correct decisions that are globally incorrect.
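The coupling can be captured in a toy model (illustrative only, not DataFusion's actual API): the flag alone decides whether the node's output is one partition or N.

```rust
// Toy model of the coupling: a sort node's preserve_partitioning flag
// determines its output partition count, so every sorting decision is
// also a distribution decision. Names are illustrative, not DataFusion's.

#[derive(Debug, Clone, Copy)]
pub struct SortNode {
    pub input_partitions: usize,
    pub preserve_partitioning: bool,
}

pub fn output_partitions(sort: SortNode) -> usize {
    if sort.preserve_partitioning {
        // Sorts each input partition independently.
        sort.input_partitions
    } else {
        // Merges all inputs into a single globally sorted partition.
        1
    }
}

fn main() {
    let merged = SortNode { input_partitions: 32, preserve_partitioning: false };
    let preserved = SortNode { input_partitions: 32, preserve_partitioning: true };
    println!("merged: {}", output_partitions(merged));
    println!("preserved: {}", output_partitions(preserved));
}
```

A rule that flips the flag without consulting distribution requirements silently changes the partition count seen by every ancestor.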

Non-idempotent composition

Running EnforceDistribution followed by EnforceSorting does not produce a stable plan. Running the pair again can produce a different (and sometimes invalid) plan:

Round 1: EnforceDistribution -> fixes distribution
         EnforceSorting      -> pushdown_sorts breaks distribution
         
Round 2: EnforceDistribution -> fixes the NEW distribution violation
         EnforceSorting      -> pushdown_sorts breaks it AGAIN (different location)

This is particularly problematic for downstream projects that run custom optimizer rules between or after these passes (e.g., remote execution, materialized view selection), since they then need additional rounds of EnforceSorting.

Real-world impact

We maintain a production system (Polygon.io Atlas) serving financial market data APIs built on DataFusion. The separation of these rules has caused multiple production incidents over the past months:

  1. SanityCheckPlan failures on multi-partition StorageExec + GlobalLimitExec (April 2026): EnforceSorting's pushdown_sorts pushed a SortExec through an intermediate node onto a 32-partition input, setting preserve_partitioning=true without inserting SortPreservingMergeExec. GlobalLimitExec requires SinglePartition -> 502 errors for specific API users. Root cause: pushdown_sorts has no knowledge of distribution requirements.

  2. InterleaveExec::with_new_children panics (InterleaveExec::with_new_children panics when optimizer rewrites change children's partitioning #21826, April 2026): Running EnforceDistribution twice (which our custom optimizer chain does) causes InterleaveExec created in the first pass to panic when the second pass changes children's partitioning.

  3. Planning time explosion with materialized views (April 2026): EnforceDistribution's adjust_input_keys_ordering returns Transformed::yes unconditionally (EnforceDistribution: adjust_input_keys_ordering returns Transformed::yes unconditionally for non-join plans #21946), causing unnecessary plan tree rebuilds that trigger expensive cost re-evaluation in OneOfExec (materialized view candidate selection). Each EnforceSorting + EnforceDistribution round compounds this cost.

Existing upstream issues (same root cause)

All of these share the same root cause: two separate rules making independent decisions about coupled concerns.

How other engines solve this

Apache Spark: EnsureRequirements

Spark handles both in a single rule using transformUp:

// For each operator, in a single pass:
// 1. Check requiredChildDistribution -> add ShuffleExchangeExec if needed
// 2. Check requiredChildOrdering -> add SortExec if needed
// Distribution is always resolved before sorting for the same operator.

Key design: sorting decisions are made AFTER distribution is finalized for each operator. SortExec(global=false) preserves partition boundaries. No separate "sort pushdown" pass exists.

Presto/Trino: AddExchanges

Presto's AddExchanges rule similarly handles both distribution and sorting properties in a single rule, using a PreferredProperties structure that carries both distribution and ordering preferences through the plan tree.

Proposed approach

Phase 1: Make pushdown_sorts distribution-aware (short-term fix)

Add a distribution_requirement field to ParentRequirements in pushdown_sorts, so that add_sort_above knows when to insert SortPreservingMergeExec. This is a targeted fix for the most critical bug.

Status: Implemented in our internal fork, ready to port upstream as part of this epic.
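The shape of the fix can be sketched with a toy plan model (hypothetical names, not the actual DataFusion types or the fork's implementation): the sort-pushdown context carries the parent's distribution requirement, so adding a sort above a multi-partition input also inserts a merge when the parent needs a single partition.

```rust
// Sketch of the Phase 1 idea: make add_sort_above distribution-aware.
// All types here are illustrative stand-ins for DataFusion's.

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DistributionReq {
    SinglePartition,
    Unspecified,
}

#[derive(Debug, PartialEq)]
pub enum Plan {
    Source { partitions: usize },
    Sort { preserve_partitioning: bool, input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
}

pub fn partitions(p: &Plan) -> usize {
    match p {
        Plan::Source { partitions } => *partitions,
        Plan::Sort { preserve_partitioning, input } => {
            if *preserve_partitioning { partitions(input) } else { 1 }
        }
        Plan::SortPreservingMerge { .. } => 1,
    }
}

// Distribution-aware add_sort_above: without the SinglePartition arm
// (the pre-fix behavior), a 32-partition sorted stream would reach a
// parent such as GlobalLimitExec that requires exactly one partition.
pub fn add_sort_above(input: Plan, parent_dist: DistributionReq) -> Plan {
    let multi = partitions(&input) > 1;
    let sorted = Plan::Sort { preserve_partitioning: multi, input: Box::new(input) };
    if parent_dist == DistributionReq::SinglePartition && partitions(&sorted) > 1 {
        Plan::SortPreservingMerge { input: Box::new(sorted) }
    } else {
        sorted
    }
}

fn main() {
    let plan = add_sort_above(Plan::Source { partitions: 32 }, DistributionReq::SinglePartition);
    println!("output partitions: {}", partitions(&plan));
}
```

This reproduces incident 1 above in miniature: the pre-fix path is the `else` branch taken regardless of `parent_dist`.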

Phase 2: Add idempotency tests (validation)

Add tests that verify EnforceDistribution -> EnforceSorting produces a stable plan:

fn assert_idempotent(plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions) -> Result<()> {
    let p1 = EnforceDistribution::new().optimize(plan, config)?;
    let p1 = EnforceSorting::new().optimize(p1, config)?;
    let p2 = EnforceDistribution::new().optimize(p1.clone(), config)?;
    let p2 = EnforceSorting::new().optimize(p2, config)?;
    assert_eq!(
        displayable(p1.as_ref()).indent(true).to_string(),
        displayable(p2.as_ref()).indent(true).to_string(),
    );
    Ok(())
}

Test against various plan topologies: multi-partition sorts with limits, unions with mixed partition counts, projections over multi-partition sources, window functions, etc.

Phase 3: Merge into EnsureRequirements (architectural fix)

Create a new EnsureRequirements rule that replaces both EnforceDistribution and EnforceSorting:

pub struct EnsureRequirements;

impl PhysicalOptimizerRule for EnsureRequirements {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>> {
        // Single bottom-up pass:
        // For each operator, ensure children satisfy both
        // requiredChildDistribution AND requiredChildOrdering.
        // Distribution is resolved before ordering for each operator.
        plan.transform_up(|node| ensure_requirements(node, config))
    }
}

struct Requirements {
    distribution: Distribution,
    ordering: Option<OrderingRequirements>,
    fetch: Option<usize>,
}

fn ensure_requirements(node: PlanContext<Requirements>) -> Result<Transformed<...>> {
    let mut new_children = vec![];
    for (mut child, required_dist, required_ordering) in zip(children, distributions, orderings) {
        // Step 1: Ensure distribution
        if !child.output_partitioning().satisfies(&required_dist) {
            child = add_exchange(child, required_dist);
        }
        // Step 2: Ensure ordering (distribution is already correct)
        if !child.output_ordering().satisfies(&required_ordering) {
            child = add_sort(child, required_ordering, required_dist); // dist-aware!
        }
        new_children.push(child);
    }
    // Rebuild the node (and report Transformed::yes) only if a child
    // actually changed, avoiding the unconditional-rebuild cost of #21946.
}

Key properties:

  • Single pass: No separate pushdown_sorts that can undo distribution work
  • Distribution before sorting: For each operator, distribution is settled before sorting decisions, like Spark
  • Naturally idempotent: Running it twice produces the same plan because each operator's children are already correct after the first pass
  • Sort pushdown integrated: Instead of a separate top-down pass, sort pushdown is handled by the bottom-up pass recognizing when a child already satisfies ordering (no sort needed)
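The idempotency property can be demonstrated on a toy plan model (illustrative names, not DataFusion's API): after the first bottom-up pass every child already satisfies its parent's distribution and ordering, so a second pass finds nothing to change.

```rust
// Toy single-pass enforcement showing natural idempotency. A Limit
// requires a single sorted partition; the pass fixes distribution
// first, then ordering, for each operator bottom-up.

#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Source { partitions: usize, sorted: bool },
    Coalesce(Box<Plan>), // merges to one partition; order not preserved
    Sort(Box<Plan>),     // sorts without changing partitioning
    Limit(Box<Plan>),    // requires a single sorted partition
}

pub fn partitions(p: &Plan) -> usize {
    match p {
        Plan::Source { partitions, .. } => *partitions,
        Plan::Coalesce(_) => 1,
        Plan::Sort(input) | Plan::Limit(input) => partitions(input),
    }
}

pub fn is_sorted(p: &Plan) -> bool {
    match p {
        Plan::Source { sorted, .. } => *sorted,
        Plan::Coalesce(_) => false,
        Plan::Sort(_) => true,
        Plan::Limit(input) => is_sorted(input),
    }
}

pub fn ensure(p: Plan) -> Plan {
    match p {
        Plan::Limit(input) => {
            let mut child = ensure(*input);
            // Step 1: distribution is settled before sorting decisions.
            if partitions(&child) > 1 {
                child = Plan::Coalesce(Box::new(child));
            }
            // Step 2: ordering, on the already-correct distribution.
            if !is_sorted(&child) {
                child = Plan::Sort(Box::new(child));
            }
            Plan::Limit(Box::new(child))
        }
        Plan::Coalesce(i) => Plan::Coalesce(Box::new(ensure(*i))),
        Plan::Sort(i) => Plan::Sort(Box::new(ensure(*i))),
        leaf => leaf,
    }
}

fn main() {
    let plan = Plan::Limit(Box::new(Plan::Source { partitions: 32, sorted: false }));
    let once = ensure(plan);
    let twice = ensure(once.clone());
    assert_eq!(once, twice); // second pass is a no-op
    println!("{once:?}");
}
```

Because requirement checks are satisfied after pass one, there is no separate top-down rewrite left to undo them, which is exactly the property the two-rule composition lacks.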

Migration path

  1. EnsureRequirements can coexist with the old rules during development
  2. Add a feature flag to switch between old and new behavior
  3. Validate with the full DataFusion test suite + sqllogictest
  4. Deprecate EnforceDistribution + EnforceSorting after stabilization

Sub-tasks

  • Phase 1: Port pushdown_sorts distribution fix upstream (will be submitted as upstream PR)
  • Phase 1: Port ensure_sorting distribution fix upstream (will be submitted as upstream PR)
  • Phase 2: Add idempotency test framework for physical optimizer rules
  • Phase 2: Add idempotency tests for EnforceDistribution + EnforceSorting composition
  • Phase 3: Design EnsureRequirements API and Requirements structure
  • Phase 3: Implement combined distribution + ordering enforcement in single bottom-up pass
  • Phase 3: Integrate sort pushdown into the bottom-up pass
  • Phase 3: Handle parallelize_sorts and replace_with_order_preserving_variants optimizations
  • Phase 3: Migrate OutputRequirements (add/remove) into the new rule
  • Phase 3: Feature flag and migration path
  • Phase 3: Validate against full test suite

References
