Commit 1875365

zhuqi-lucas, adriangb, and claude authored
[branch-54] Cherry-pick #22493: restore SortExec elimination after stats-based file reorder (#22501)
## Which issue does this PR close?

Cherry-pick of #22493 onto `branch-54`.

## Rationale for this change

`branch-54` includes #21956 (`feat: globally reorder files and row groups by statistics for TopK queries`), which introduced a regression: for plain-column, multi-file scans where the on-disk file order does not match the declared sort order, `SortExec` was no longer eliminated even when stats-based reorder produced non-overlapping file groups whose declared ordering re-validated.

#22493 restores the pre-#21956 sort-elimination behaviour by re-validating `output_ordering` after `rebuild_with_source` reorders files, and (per @adriangb's correctness follow-up) restoring the original hint-free `file_source` on the Inexact→Exact upgrade so leftover `reverse_row_groups` / `sort_order_for_reorder` hints don't mis-order row groups within a single file once the `SortExec` safety net is gone.

## What changes are included in this PR?

Straight cherry-pick of merge commit `94c58d086`. Includes:

- `FileScanConfig::try_pushdown_sort` Inexact arm: re-validate, upgrade to Exact (with file_source restore), guard with NULL safety + early-return
- `rebuild_with_source`: `match (all_non_overlapping, is_exact)` decision table for keep_ordering
- SLT updates restoring `SortExec` elimination expectations + Tests 5b/5c/8b for the NULL-safety and same-min row-group edge cases

## Are these changes tested?

Cherry-picked cleanly (auto-merge in `sort_pushdown.rs`).

- `cargo build -p datafusion-datasource`: passes.
- `cargo test -p datafusion-sqllogictest --test sqllogictests -- sort_pushdown`: passes.

## Are there any user-facing changes?

Same as #22493: plain-column wrong-order-files cases regain SortExec elimination when files happen to be non-overlapping by statistics. No new API.

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a4a0804 commit 1875365

3 files changed

Lines changed: 343 additions & 49 deletions

datafusion/datasource/src/file_scan_config/mod.rs

Lines changed: 45 additions & 6 deletions
@@ -937,14 +937,19 @@ impl DataSource for FileScanConfig {
     /// │ → SortExec removed, fetch (LIMIT) pushed to DataSourceExec
     /// │
     /// ├─► FileSource returns Inexact
-    /// │ (reverse_row_groups=true)
-    /// │ → SortExec kept, scan optimized
+    /// │ (e.g. column_in_file_schema: opener will reorder RGs at runtime)
+    /// │ → rebuild_with_source: sort files by stats; if the post-sort
+    /// │ file groups are non-overlapping AND the request now validates
+    /// │ AND no NULLs sit in the sort columns of non-last files,
+    /// │ upgrade back to Exact (SortExec removed). Otherwise stays
+    /// │ Inexact and SortExec is kept while the scan is still
+    /// │ optimised via `sort_order_for_reorder` / `reverse_row_groups`.
     /// │
     /// └─► FileSource returns Unsupported
-    /// (ordering stripped because files in wrong order)
+    /// (e.g. expression sort key or partition column)
     /// → try_sort_file_groups_by_statistics():
     /// 1. Sort files within each group by min/max statistics
-    /// 2. Re-check: non-overlapping + ordering valid?
+    /// 2. Re-check: non-overlapping + ordering valid + no NULLs?
     /// YES → Exact → SortExec removed
     /// NO → Inexact (files reordered, Sort stays)
     /// ```
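
The "no NULLs in the sort columns of non-last files" condition in the doc comment above can be illustrated with a small standalone sketch. It assumes a single nullable `i32` sort column under `ASC NULLS LAST`; `cmp_asc_nulls_last`, `is_sorted_asc_nulls_last`, `non_last_file_has_nulls`, and `concat` are hypothetical helpers written for this illustration, not DataFusion APIs.

```rust
use std::cmp::Ordering;

/// Compare two nullable values under `ASC NULLS LAST` (illustrative only).
fn cmp_asc_nulls_last(a: &Option<i32>, b: &Option<i32>) -> Ordering {
    match (a, b) {
        (Some(x), Some(y)) => x.cmp(y),
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    }
}

/// True if `rows` is ordered under `ASC NULLS LAST`.
fn is_sorted_asc_nulls_last(rows: &[Option<i32>]) -> bool {
    rows.windows(2)
        .all(|w| cmp_asc_nulls_last(&w[0], &w[1]) != Ordering::Greater)
}

/// Hypothetical stand-in for the NULL guard: true if any file other than
/// the last one contains a NULL in the sort column.
fn non_last_file_has_nulls(files: &[Vec<Option<i32>>]) -> bool {
    match files.split_last() {
        Some((_, rest)) => rest.iter().any(|f| f.iter().any(Option::is_none)),
        None => false,
    }
}

/// Concatenate files in group order, as one partition would stream them.
fn concat(files: &[Vec<Option<i32>>]) -> Vec<Option<i32>> {
    files.iter().flatten().copied().collect()
}
```

With files `[1, 2, NULL]` and `[3, 4]`, each file is sorted on its own and the non-null ranges do not overlap, yet the concatenated stream places `NULL` before `3`, so the scan cannot claim the requested ordering. Moving the NULL into the last file removes the problem.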
@@ -973,8 +978,42 @@ impl DataSource for FileScanConfig {
                 }
             }
             SortOrderPushdownResult::Inexact { inner } => {
-                Ok(SortOrderPushdownResult::Inexact {
-                    inner: Arc::new(self.rebuild_with_source(inner, false, order)?),
+                let mut config = self.rebuild_with_source(inner, false, order)?;
+                // `rebuild_with_source` reorders files by stats; if the
+                // post-sort files are non-overlapping AND the request now
+                // validates against the new file groups, `output_ordering`
+                // is preserved and we can upgrade back to Exact. This
+                // restores the sort-elimination behaviour that lived in
+                // the `Unsupported` → `try_sort_file_groups_by_statistics`
+                // path before #21956 routed `column_in_file_schema` cases
+                // here.
+                if config.output_ordering.is_empty() {
+                    return Ok(SortOrderPushdownResult::Inexact {
+                        inner: Arc::new(config),
+                    });
+                }
+                // Upgrading to Exact: the post-sort file groups are
+                // non-overlapping and each file's declared ordering
+                // re-validates, so reading the files in their natural
+                // (declared-sorted) order already yields the requested
+                // ordering — exactly like the `Unsupported` → Exact path,
+                // which reads files in natural order too.
+                //
+                // Drop the runtime row-group reorder hints the Inexact
+                // source carried (`sort_order_for_reorder` /
+                // `reverse_row_groups`) by restoring the original,
+                // hint-free source. With the `SortExec` removed those
+                // hints are not just redundant but unsafe: for a DESC
+                // request the opener sorts row groups ASC-by-min and then
+                // reverses them, which reorders two row groups within a
+                // single file that share the same `min` incorrectly
+                // (e.g. a file `[10,8,8,8]` whose row groups are
+                // `[10,8]` and `[8,8]` would stream as `8,8,10,8`).
+                // The `SortExec` used to mask this; once it is gone the
+                // reordered stream is the final, wrong answer.
+                config.file_source = Arc::clone(&self.file_source);
+                Ok(SortOrderPushdownResult::Exact {
+                    inner: Arc::new(config),
                 })
             }
             SortOrderPushdownResult::Unsupported => {
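
The same-`min` hazard described in the comment above can be reproduced with a toy model. This is only a sketch: a row group is modelled as a plain `Vec<i32>`, and `reorder_for_desc` is a hypothetical stand-in for the hint-driven path (stable sort ascending by `min`, then reverse). The real opener may differ in detail.

```rust
/// A row group, modelled as the values it holds (hypothetical simplification).
type RowGroup = Vec<i32>;

/// Mimic the hint path the comment describes for a DESC request:
/// stable-sort row groups ascending by their minimum, then reverse them.
fn reorder_for_desc(mut row_groups: Vec<RowGroup>) -> Vec<i32> {
    // `sort_by_key` is stable, so row groups with equal `min` keep their order.
    row_groups.sort_by_key(|rg| rg.iter().copied().min());
    row_groups.reverse();
    row_groups.into_iter().flatten().collect()
}

/// True if `rows` is in non-increasing (DESC) order.
fn is_sorted_desc(rows: &[i32]) -> bool {
    rows.windows(2).all(|w| w[0] >= w[1])
}
```

For the file `[10,8,8,8]` split into row groups `[10,8]` and `[8,8]`, reading in natural order is already DESC, while `reorder_for_desc` yields `8,8,10,8`. That is why the upgrade to `Exact` restores the hint-free `file_source`.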

datafusion/datasource/src/file_scan_config/sort_pushdown.rs

Lines changed: 69 additions & 24 deletions
@@ -138,31 +138,76 @@ impl FileScanConfig {
             false
         };
 
-        if is_exact && all_non_overlapping {
-            // Truly exact: within-file ordering guaranteed and files are non-overlapping.
-            // Keep output_ordering so SortExec can be eliminated for each partition.
-            //
-            // We intentionally do NOT redistribute files across groups here.
-            // The planning-phase bin-packing may interleave file ranges across groups:
-            //
-            // Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1
-            // Group 1: [f2(11-20), f4(31-40)]
-            //
-            // This interleaving is actually beneficial because SPM pulls from both
-            // partitions concurrently, keeping parallel I/O active:
-            //
-            // SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40]
-            // ^^^^^^^^^^^^ ^^^^^^^^^^^^
-            // both partitions scanning files simultaneously
-            //
-            // If we were to redistribute files consecutively:
-            // Group 0: [f1(1-10), f2(11-20)] ← all values < group 1
-            // Group 1: [f3(21-30), f4(31-40)]
+        // Decide whether to keep `output_ordering` (i.e. let the outer
+        // pushdown report `Exact` and drop `SortExec`).
+        //
+        // Two paths can produce a keep:
+        //
+        // 1. `is_exact && all_non_overlapping`: the source already had
+        // validated ordering and the post-sort files still don't
+        // overlap — Exact carries through unchanged.
+        //
+        // 2. `!is_exact && all_non_overlapping`: source returned
+        // `Inexact` because pre-sort `validated_output_ordering()`
+        // stripped the declaration (files were listed out of order
+        // on disk). After our stats-based sort the files are now
+        // non-overlapping — re-validate against the new file
+        // groups and, if it passes, upgrade back to Exact so the
+        // outer wrapper drops the `SortExec`. Without this, the
+        // `Inexact` branch stayed Inexact even when reorder
+        // restored a perfectly valid ordering, leaving an
+        // unnecessary `SortExec` above the source (regression
+        // after #21956's `column_in_file_schema` signal pushed
+        // this scenario into the Inexact branch instead of the
+        // `try_sort_file_groups_by_statistics` fallback).
+        //
+        // We intentionally do NOT redistribute files across groups here.
+        // The planning-phase bin-packing may interleave file ranges across groups:
+        //
+        // Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1
+        // Group 1: [f2(11-20), f4(31-40)]
+        //
+        // This interleaving is actually beneficial because SPM pulls from both
+        // partitions concurrently, keeping parallel I/O active.
+        let keep_ordering = match (all_non_overlapping, is_exact) {
+            // Files still overlap after the stats sort — the combined
+            // stream isn't ordered, so `output_ordering` must be dropped.
+            (false, _) => false,
+            // Source already had validated ordering and the post-sort
+            // files still don't overlap — Exact carries through.
+            (true, true) => true,
+            // Source returned `Inexact`; re-validate against the
+            // reordered file groups to decide whether to upgrade.
             //
-            // SPM would read ALL of group 0 first (values always smaller), then group 1.
-            // This degrades to single-threaded sequential I/O — the other partition
-            // sits idle the entire time, losing the parallelism benefit.
-        } else {
+            // Same NULL guard as `try_sort_file_groups_by_statistics`:
+            // we cannot claim Exact if any non-last file contains
+            // NULLs in the sort columns. With NULLS LAST those
+            // NULLs sit after all non-null rows in the file, so
+            // when the next file's non-nulls are smaller than the
+            // previous file's max, they'd appear *after* the NULLs
+            // in the concatenated stream — breaking the ordering.
+            (true, false) => {
+                let projected_schema = new_config.projected_schema()?;
+                let projection_indices = new_config
+                    .file_source
+                    .projection()
+                    .as_ref()
+                    .and_then(|p| ordered_column_indices_from_projection(p));
+                if any_file_has_nulls_in_sort_columns(
+                    &new_config.file_groups,
+                    order,
+                    &projected_schema,
+                    projection_indices.as_deref(),
+                ) {
+                    false
+                } else {
+                    let new_eq_props = new_config.eq_properties();
+                    new_eq_props.ordering_satisfy(order.iter().cloned())?
+                }
+            }
+        };
+
+        if !keep_ordering {
             new_config.output_ordering = vec![];
         }
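
The decision table in this hunk can be restated as a dependency-free function, which makes the four outcomes easy to check in isolation. This is a sketch rather than the code in the commit: the two extra boolean parameters are assumptions standing in for the results of `any_file_has_nulls_in_sort_columns(...)` and `ordering_satisfy(...)`, which the real code computes lazily inside the `(true, false)` arm.

```rust
/// Hypothetical restatement of the `keep_ordering` match from the diff.
/// `has_nulls_in_sort_columns` and `ordering_revalidates` are assumed inputs
/// replacing the NULL guard and the re-validation call, respectively.
fn keep_ordering(
    all_non_overlapping: bool,
    is_exact: bool,
    has_nulls_in_sort_columns: bool,
    ordering_revalidates: bool,
) -> bool {
    match (all_non_overlapping, is_exact) {
        // Files still overlap after the stats sort: drop the ordering.
        (false, _) => false,
        // Source ordering was already validated and files do not overlap.
        (true, true) => true,
        // Inexact source: keep only if the NULL guard passes and the
        // ordering re-validates against the reordered file groups.
        (true, false) => !has_nulls_in_sort_columns && ordering_revalidates,
    }
}
```

Only the `(true, false)` arm consults the two extra inputs. Overlapping files always lose `output_ordering`, and an already-exact source with non-overlapping files always keeps it.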
