Enhance sort pushdown logic to include distribution requirements for …#54
Merged
wyatt-herkamp merged 5 commits intobranch-52from May 1, 2026
Merged
Enhance sort pushdown logic to include distribution requirements for …#54wyatt-herkamp merged 5 commits intobranch-52from
wyatt-herkamp merged 5 commits intobranch-52from
Conversation
zhuqi-lucas
approved these changes
May 1, 2026
Collaborator
zhuqi-lucas
left a comment
There was a problem hiding this comment.
The fix looks correct , sort_pushdown was the missing code path from PR #53.
One suggestion: the two new tests only exercise pushdown_sorts in isolation. Would be good to add an end-to-end test that runs the full EnforceSorting::optimize (which calls ensure_sorting + pushdown_sorts + replace_with_order_preserving_variants sequentially) to make sure the interaction between all three stages doesn't regress. That's what bit us with PR #53 — the ensure_sorting path was fixed and tested, but the pushdown_sorts path was missed.
But we can do this as follow-up.
11 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Mirror PR #53's distribution-aware
add_sort_abovepattern at the twopushdown_sortscall sites it missed (sort_pushdown.rslines ~153 and ~207). Without this,EnforceSorting::pushdown_sortscan flip aSortExec'spreserve_partitioningfromfalsetotrueover a multi-partition input without inserting theSortPreservingMergeExecthat aSinglePartition-requiring parent (e.g.GlobalLimitExec) needs — producing a plan that failsSanityCheckPlan.Visible symptom (prod)
futures/vX/quotes502s for tickers UX and HRC on 2026-04-29:CL/ES and other tickers were unaffected.
Why the bug only hit UX/HRC
UX/HRC users have entitlements over 32 channels. The live tables for futures quotes have
in_list_partitioning = true, which turns a 32-elementchannel_id IN (…)filter into 32 parallelStorageExecpartitions. CL users have far fewer channel entitlements, soStorageExecproduces 1 partition and a single-partitionSortExecis trivially valid.Why PR #53 didn't fix it
PR #53 introduced
add_sort_above_with_distributionand wired it into 2 of the 4add_sort_abovecall sites (inensure_sortingandadjust_window_sort_removal). The two call sites insort_pushdown.rswere not migrated.pushdown_sortsis a top-down pass that removes the existingSortExec, walks toward the leaves, and reinserts aSortExecviaadd_sort_aboveat the deepest point it can. When the input has multiple partitions, the oldadd_sort_abovesetspreserve_partitioning=true(correct) but doesn't wrap the new sort inSortPreservingMergeExec(the bug).Per-pass diagnostic
From a synthetic minimal repro (
UnionExecover twoDataSourcepartitions, wrapped inOutputRequirementExec(SinglePartition, fetch=21)):After this fix:
What changed
ParentRequirements(sort_pushdown.rs): added adistribution_requirement: Distributionfield. Initialised inassign_initial_requirementsfromplan.required_input_distribution(). Propagated through the recursion the same wayordering_requirementis — preserving the grandparent's distribution requirement when an existingSortExecis removed, and re-deriving from the current node'srequired_input_distribution()when descending into non-sort children.pushdown_sorts_helper: switched the twoadd_sort_above(...)calls toadd_sort_above_with_distribution(..., &parent_distribution). The helper already knows to insertSortPreservingMergeExecwhen the parent requiresSinglePartitionand the input has multiple partitions.Regression fix (introduced by this PR's initial
45620e982commit): thedistribution_requirementwas incorrectly propagated through partition-merging nodes likeSortPreservingMergeExec. Two fixes applied:required_input_distribution()rather than combining it with the accumulatedparent_distribution.satisfy_parentbranch: when the current node already outputs a single partition, reseteffective_parent_disttoUnspecifiedDistributionbefore propagating to children.No changes to public API surface.
Tests
Added to
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:output_requirement_adds_merge_after_partition_preserving_sort(PR Fix sort enforcement for single-partition output #53) — end-to-endEnforceSorting+SanityCheckPlanonOutputRequirementExec(SinglePartition, fetch=21)overUnionExec; verifies thatSortPreservingMergeExecis correctly inserted.test_sort_pushdown_adds_spm_for_single_partition_requirement— positive test that directly callspushdown_sortsonOutputRequirementExec(SinglePartition) → ProjectionExec → RepartitionExec(10), verifying that bothSortExec(preserve_partitioning=true)andSortPreservingMergeExecare inserted when the sort is pushed through the projection.test_no_extra_spm_from_output_requirement_single_partition— regression test that directly callspushdown_sortson an already-optimalOutputRequirementExec(SinglePartition) → SortPreservingMergeExec → SortExec(preserve_partitioning=true) → RepartitionExec(10), verifying that no extraSortPreservingMergeExecis added.