Skip to content

Enhance sort pushdown logic to include distribution requirements for …#54

Merged
wyatt-herkamp merged 5 commits intobranch-52from
fix-sort-issue
May 1, 2026
Merged

Enhance sort pushdown logic to include distribution requirements for …#54
wyatt-herkamp merged 5 commits intobranch-52from
fix-sort-issue

Conversation

@wyatt-herkamp
Copy link
Copy Markdown

@wyatt-herkamp wyatt-herkamp commented Apr 30, 2026

Summary

Mirror PR #53's distribution-aware add_sort_above pattern at the two pushdown_sorts call sites it missed (sort_pushdown.rs lines ~153 and ~207). Without this, EnforceSorting::pushdown_sorts can flip a SortExec's preserve_partitioning from false to true over a multi-partition input without inserting the SortPreservingMergeExec that a SinglePartition-requiring parent (e.g. GlobalLimitExec) needs — producing a plan that fails SanityCheckPlan.

Visible symptom (prod)

futures/vX/quotes 502s for tickers UX and HRC on 2026-04-29:

Internal server error: SanityCheckPlan
caused by
Error during planning: Plan: ["GlobalLimitExec: skip=0, fetch=21",
  "  SortExec: expr=[timestamp@1 DESC, report_sequence@3 DESC], preserve_partitioning=[true]",
  "    ProjectionE..."]
does not satisfy distribution requirements: SinglePartition.
Child-0 output partitioning: UnknownPartitioning(2)

CL/ES and other tickers were unaffected.

Why the bug only hit UX/HRC

UX/HRC users have entitlements over 32 channels. The live tables for futures quotes have in_list_partitioning = true, which turns a 32-element channel_id IN (…) filter into 32 parallel StorageExec partitions. CL users have far fewer channel entitlements, so StorageExec produces 1 partition and a single-partition SortExec is trivially valid.

Why PR #53 didn't fix it

PR #53 introduced add_sort_above_with_distribution and wired it into 2 of the 4 add_sort_above call sites (in ensure_sorting and adjust_window_sort_removal). The two call sites in sort_pushdown.rs were not migrated. pushdown_sorts is a top-down pass that removes the existing SortExec, walks toward the leaves, and reinserts a SortExec via add_sort_above at the deepest point it can. When the input has multiple partitions, the old add_sort_above sets preserve_partitioning=true (correct) but doesn't wrap the new sort in SortPreservingMergeExec (the bug).

Per-pass diagnostic

From a synthetic minimal repro (UnionExec over two DataSource partitions, wrapped in OutputRequirementExec(SinglePartition, fetch=21)):

After ensure_sorting:                             preserve_partitioning=[false]   OK
After parallelize_sorts:                          preserve_partitioning=[false]   OK
After replace_with_order_preserving_variants:     preserve_partitioning=[false]   OK
After pushdown_sorts:                             preserve_partitioning=[true]    BROKEN

After this fix:

After pushdown_sorts:
  OutputRequirementExec (SinglePartition)
    SortPreservingMergeExec               ← inserted by the fix
      SortExec(preserve_partitioning=true)
        UnionExec
          DataSourceExec (partition 0)
          DataSourceExec (partition 1)

What changed

ParentRequirements (sort_pushdown.rs): added a distribution_requirement: Distribution field. Initialised in assign_initial_requirements from plan.required_input_distribution(). Propagated through the recursion the same way ordering_requirement is — preserving the grandparent's distribution requirement when an existing SortExec is removed, and re-deriving from the current node's required_input_distribution() when descending into non-sort children.

pushdown_sorts_helper: switched the two add_sort_above(...) calls to add_sort_above_with_distribution(..., &parent_distribution). The helper already knows to insert SortPreservingMergeExec when the parent requires SinglePartition and the input has multiple partitions.

Regression fix (introduced by this PR's initial 45620e982 commit): the distribution_requirement was incorrectly propagated through partition-merging nodes like SortPreservingMergeExec. Two fixes applied:

  1. In the "no ordering to push" branch: use only the node's own required_input_distribution() rather than combining it with the accumulated parent_distribution.
  2. In the satisfy_parent branch: when the current node already outputs a single partition, reset effective_parent_dist to UnspecifiedDistribution before propagating to children.

No changes to public API surface.

Tests

Added to datafusion/core/tests/physical_optimizer/enforce_sorting.rs:

  • output_requirement_adds_merge_after_partition_preserving_sort (PR Fix sort enforcement for single-partition output #53) — end-to-end EnforceSorting + SanityCheckPlan on OutputRequirementExec(SinglePartition, fetch=21) over UnionExec; verifies that SortPreservingMergeExec is correctly inserted.
  • test_sort_pushdown_adds_spm_for_single_partition_requirement — positive test that directly calls pushdown_sorts on OutputRequirementExec(SinglePartition) → ProjectionExec → RepartitionExec(10), verifying that both SortExec(preserve_partitioning=true) and SortPreservingMergeExec are inserted when the sort is pushed through the projection.
  • test_no_extra_spm_from_output_requirement_single_partition — regression test that directly calls pushdown_sorts on an already-optimal OutputRequirementExec(SinglePartition) → SortPreservingMergeExec → SortExec(preserve_partitioning=true) → RepartitionExec(10), verifying that no extra SortPreservingMergeExec is added.

@github-actions github-actions Bot added the core label May 1, 2026
Copy link
Copy Markdown
Collaborator

@zhuqi-lucas zhuqi-lucas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix looks correct , sort_pushdown was the missing code path from PR #53.

One suggestion: the two new tests only exercise pushdown_sorts in isolation. Would be good to add an end-to-end test that runs the full EnforceSorting::optimize (which calls ensure_sorting + pushdown_sorts + replace_with_order_preserving_variants sequentially) to make sure the interaction between all three stages doesn't regress. That's what bit us with PR #53 — the ensure_sorting path was fixed and tested, but the pushdown_sorts path was missed.

But we can do this as follow-up.

@wyatt-herkamp wyatt-herkamp merged commit 7768d24 into branch-52 May 1, 2026
58 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants