
Commit cdfade5

zhuqi-lucas and claude authored
feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2) (#21182)
## Which issue does this PR close?

- Closes #17348
- Closes #19329

## Rationale for this change

When a partition (file group) contains multiple files in wrong order, `validated_output_ordering()` strips the ordering and `EnforceSorting` inserts an unnecessary `SortExec`, even though the files are non-overlapping and internally sorted.

This PR fixes it by **sorting files within each group by min/max statistics** during sort pushdown. After sorting, the file order matches the sort key order, the ordering becomes valid, and `SortExec` can be eliminated. This works for both single-partition and multi-partition plans with multi-file groups.

## What changes are included in this PR?

### Core optimization

```text
Files in wrong order within a partition:       After statistics-based sorting:
[a_high(400k+), b_mid(200k), c_low(1)]         [c_low(1), b_mid(200k), a_high(400k+)]
→ ordering stripped                            → ordering valid, non-overlapping
→ SortExec stays                               → SortExec eliminated
```

When `PushdownSort` finds a `SortExec` above a file-based `DataSourceExec`:

1. **FileSource returns Exact** (natural ordering satisfies request):
   - Sort files within each group by statistics, verify non-overlapping
   - SortExec removed, fetch (LIMIT) pushed to DataSourceExec
2. **FileSource returns Unsupported** (ordering stripped due to wrong file order):
   - Sort files within each group by statistics
   - Re-check: if files are now non-overlapping and ordering is valid → upgrade to Exact
   - SortExec eliminated + fetch pushed down
3. **FileSource returns Inexact** (reverse scan):
   - SortExec kept, scan optimized with reverse_row_groups

### Files Changed

| File | Change |
|------|--------|
| `datasource-parquet/src/source.rs` | ParquetSource returns `Exact` when natural ordering satisfies request |
| `datasource/src/file_scan_config.rs` | Statistics-based file sorting, non-overlapping re-check, Unsupported→Exact upgrade |
| `physical-optimizer/src/pushdown_sort.rs` | Preserve fetch (LIMIT) when eliminating SortExec, module doc update |
| `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test |
| `sqllogictest/test_files/sort_pushdown.slt` | Updated existing tests + 5 new test groups (A-E) |

## Benchmark Results

Local release build, `--partitions 1`, 3 non-overlapping files with reversed naming (6M rows):

| Query | Description | Main (ms) | PR (ms) | Speedup |
|-------|-------------|-----------|---------|---------|
| Q1 | `ORDER BY ASC` (full scan) | 259 | 122 | **2.1x faster** |
| Q2 | `ORDER BY ASC LIMIT 100` | 80 | 3 | **27x faster** |
| Q3 | `SELECT * ORDER BY ASC` | 700 | 313 | **2.2x faster** |
| Q4 | `SELECT * LIMIT 100` | 342 | 7 | **49x faster** |

LIMIT queries benefit most because sort elimination + limit pushdown means only the first ~100 rows are read.

## Tests

- 13 new unit tests covering all sort pushdown paths
- 5 new SLT integration test groups (sort elimination, overlapping files, LIMIT, multi-partition, inferred ordering)
- All existing tests pass with no regressions

## Test plan

- [x] `cargo test -p datafusion-datasource`: all tests pass
- [x] `cargo test -p datafusion-datasource-parquet`: all tests pass
- [x] `cargo test -p datafusion-physical-optimizer`: all tests pass
- [x] `cargo test -p datafusion --test core_integration`: all tests pass
- [x] SLT sort/order/topk/window/union/joins tests pass (no regressions)
- [x] `cargo clippy`: 0 warnings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8a2d758 commit cdfade5
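
The statistics-based reordering described in the commit message can be sketched in isolation. This is a minimal illustration, not the code in `file_scan_config.rs`: `FileStats`, `sort_group_by_min`, `is_non_overlapping`, and `try_make_ordered` are hypothetical names, the sort key is assumed to be a single ascending integer column, and treating a shared boundary value (`prev.max == next.min`) as non-overlapping is an assumption of this sketch.

```rust
/// Hypothetical stand-in for one file's min/max statistics on the sort column.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: String,
    min: i64,
    max: i64,
}

/// Reorder the files of one group by their minimum value (ascending key),
/// using the maximum as a tie-breaker.
fn sort_group_by_min(group: &mut [FileStats]) {
    group.sort_by_key(|f| (f.min, f.max));
}

/// True when every file ends no later than the next one begins.
/// Assumption: a shared boundary value is tolerated for a single-column key.
fn is_non_overlapping(group: &[FileStats]) -> bool {
    group.windows(2).all(|pair| pair[0].max <= pair[1].min)
}

/// Sort the group, then report whether reading the files back to back
/// would already yield globally sorted output.
fn try_make_ordered(group: &mut [FileStats]) -> bool {
    sort_group_by_min(group);
    is_non_overlapping(group)
}
```

With the three files from the diagram (`a_high`, `b_mid`, `c_low`), `try_make_ordered` reorders them to `c_low, b_mid, a_high` and returns `true`; for two files whose ranges intersect it returns `false`, which corresponds to the case where the `SortExec` has to stay.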

6 files changed: +1302 -69 lines changed

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 5 additions & 5 deletions

@@ -255,8 +255,9 @@ fn test_prefix_match_through_transparent_nodes() {
 }
 
 #[test]
-fn test_no_prefix_match_wrong_direction() {
-    // Test that prefix matching does NOT work if the direction is wrong
+fn test_exact_prefix_match_same_direction() {
+    // Test that when the requested sort [a DESC] matches a prefix of the source's
+    // natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown).
     let schema = schema();
 
     // Source has [a DESC, b ASC] ordering
@@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() {
     let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
     let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
 
-    // Request [a DESC] - same direction as source, NOT a reverse prefix
+    // Request [a DESC] - same direction as source prefix, Sort should be eliminated
     let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
     let plan = sort_exec(same_direction, source);
 
@@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() {
           - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
       output:
         Ok:
-          - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
-            - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
+          - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
     "
     );
 }
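
The updated test relies on the fact that a request for `[a DESC]` is satisfied by a source ordered on `[a DESC, b ASC]`. A minimal sketch of that prefix rule follows; `Direction`, `SortKey`, and `satisfies_as_prefix` are hypothetical names for illustration, and the real check goes through DataFusion's equivalence properties, which also account for details this sketch ignores (null ordering, constant columns, equivalent expressions).

```rust
/// Hypothetical sort direction for one key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Direction {
    Asc,
    Desc,
}

/// Hypothetical sort key: a column name plus a direction.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SortKey {
    column: &'static str,
    direction: Direction,
}

/// A requested ordering is satisfied when it is a leading prefix of the
/// source ordering, with matching columns and matching directions.
fn satisfies_as_prefix(source: &[SortKey], requested: &[SortKey]) -> bool {
    source.starts_with(requested)
}
```

Under this rule `[a DESC]` matches a source ordered `[a DESC, b ASC]`, while `[a ASC]` (wrong direction) and `[b ASC]` (not a leading key) do not.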

datafusion/datasource-parquet/src/source.rs

Lines changed: 19 additions & 16 deletions

@@ -743,19 +743,17 @@ impl FileSource for ParquetSource {
     ///
     /// With both pieces of information, ParquetSource can decide what optimizations to apply.
     ///
-    /// # Phase 1 Behavior (Current)
-    /// Returns `Inexact` when reversing the row group scan order would help satisfy the
-    /// requested ordering. We still need a Sort operator at a higher level because:
-    /// - We only reverse row group read order, not rows within row groups
-    /// - This provides approximate ordering that benefits limit pushdown
-    ///
-    /// # Phase 2 (Future)
-    /// Could return `Exact` when we can guarantee perfect ordering through techniques like:
-    /// - File reordering based on statistics
-    /// - Detecting already-sorted data
-    /// This would allow removing the Sort operator entirely.
+    /// # Behavior
+    /// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already
+    ///   satisfies the requested ordering. This allows the Sort operator to be eliminated
+    ///   if the files within each group are also non-overlapping (checked by FileScanConfig).
+    /// - Returns `Inexact` when reversing the row group scan order would help satisfy the
+    ///   requested ordering. We still need a Sort operator at a higher level because:
+    ///   - We only reverse row group read order, not rows within row groups
+    ///   - This provides approximate ordering that benefits limit pushdown
     ///
     /// # Returns
+    /// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed)
     /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
     /// - `Unsupported`: Cannot optimize for this ordering
     fn try_pushdown_sort(
@@ -767,6 +765,16 @@ impl FileSource for ParquetSource {
             return Ok(SortOrderPushdownResult::Unsupported);
         }
 
+        // Check if the natural (non-reversed) ordering already satisfies the request.
+        // Parquet metadata guarantees within-file ordering, so if the ordering matches
+        // we can return Exact. FileScanConfig will verify that files within each group
+        // are non-overlapping before declaring the entire scan as Exact.
+        if eq_properties.ordering_satisfy(order.iter().cloned())? {
+            return Ok(SortOrderPushdownResult::Exact {
+                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
+            });
+        }
+
         // Build new equivalence properties with the reversed ordering.
         // This allows us to check if the reversed ordering satisfies the request
         // by leveraging:
@@ -811,11 +819,6 @@ impl FileSource for ParquetSource {
         Ok(SortOrderPushdownResult::Inexact {
             inner: Arc::new(new_source) as Arc<dyn FileSource>,
         })
-
-        // TODO Phase 2: Add support for other optimizations:
-        // - File reordering based on min/max statistics
-        // - Detection of exact ordering (return Exact to remove Sort operator)
-        // - Partial sort pushdown for prefix matches
     }
 
     fn apply_expressions(
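
How the three results from `try_pushdown_sort` combine with the file-group check can be summarized as a small decision table. The sketch below is an illustration built from the commit message, not the optimizer's actual code: `SourceResult`, `PlanAction`, and `decide` are hypothetical names, the per-group check is collapsed into one boolean, and keeping the `SortExec` when an `Exact` source has overlapping files is an assumption based on the doc comment above.

```rust
/// Hypothetical mirror of the three pushdown outcomes reported by a file source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceResult {
    Exact,
    Inexact,
    Unsupported,
}

/// Hypothetical summary of what the optimizer does with the SortExec.
#[derive(Debug, PartialEq, Eq)]
enum PlanAction {
    /// SortExec removed; any fetch (LIMIT) moves to the scan.
    RemoveSort,
    /// SortExec kept; scan reads row groups in reverse.
    KeepSortReverseScan,
    /// SortExec kept; scan unchanged.
    KeepSort,
}

/// `groups_ordered` is assumed to be true when, after sorting each group by
/// statistics, its files are non-overlapping and the ordering is valid.
fn decide(source: SourceResult, groups_ordered: bool) -> PlanAction {
    match (source, groups_ordered) {
        // Cases 1 and 2 of the commit message: Exact, or Unsupported upgraded to Exact.
        (SourceResult::Exact, true) | (SourceResult::Unsupported, true) => {
            PlanAction::RemoveSort
        }
        // Case 3: a reversed scan only approximates the order.
        (SourceResult::Inexact, _) => PlanAction::KeepSortReverseScan,
        // Assumption: overlapping files mean the sort must stay.
        _ => PlanAction::KeepSort,
    }
}
```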
