Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
4130ce4
feat: sort file groups by statistics during sort pushdown
zhuqi-lucas Mar 26, 2026
e2ac228
address review: remove unnecessary stats computation in reverse path,…
zhuqi-lucas Mar 26, 2026
21da48f
add test: reverse path preserves file order, does not apply stats sor…
zhuqi-lucas Mar 26, 2026
25938bb
restore doc comment with ASCII diagrams for get_projected_output_orde…
zhuqi-lucas Mar 27, 2026
29c526b
docs: add non-overlapping exception to partition ordering diagram
zhuqi-lucas Mar 27, 2026
aa290d4
remove redistribute_files_across_groups_by_statistics
zhuqi-lucas Mar 28, 2026
b3ed744
fix: re-check ordering after statistics-based file sorting
zhuqi-lucas Mar 31, 2026
5718f8a
fix: preserve fetch (LIMIT) when eliminating SortExec via Exact pushdown
zhuqi-lucas Apr 2, 2026
28661d4
docs: accurate PR description and code comments for sort pushdown
zhuqi-lucas Apr 3, 2026
911f0dd
docs: fix PR description and code comments to accurately describe sor…
zhuqi-lucas Apr 3, 2026
35a581c
perf: increase SortPreservingMergeExec prefetch buffer from 1 to 16
zhuqi-lucas Apr 3, 2026
59e99a4
fix: prevent sort elimination when files have NULLs in sort columns
zhuqi-lucas Apr 4, 2026
c1d0a33
fix comments
zhuqi-lucas Apr 4, 2026
7cadeca
refactor: scope SPM prefetch buffer to sort elimination path only
zhuqi-lucas Apr 4, 2026
63fd896
fmt
zhuqi-lucas Apr 4, 2026
eb1af95
refactor: replace SPM prefetch with BufferExec for sort elimination
zhuqi-lucas Apr 6, 2026
790cbce
perf: increase BufferExec capacity from 8MB to 64MB for sort elimination
zhuqi-lucas Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ fn test_prefix_match_through_transparent_nodes() {
}

#[test]
fn test_no_prefix_match_wrong_direction() {
// Test that prefix matching does NOT work if the direction is wrong
fn test_exact_prefix_match_same_direction() {
// Test that when the requested sort [a DESC] matches a prefix of the source's
// natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown).
let schema = schema();

// Source has [a DESC, b ASC] ordering
Expand All @@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() {
let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Request [a DESC] - same direction as source, NOT a reverse prefix
// Request [a DESC] - same direction as source prefix, Sort should be eliminated
let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
let plan = sort_exec(same_direction, source);

Expand All @@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() {
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
"
);
}
Expand Down
35 changes: 19 additions & 16 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,19 +743,17 @@ impl FileSource for ParquetSource {
///
/// With both pieces of information, ParquetSource can decide what optimizations to apply.
///
/// # Phase 1 Behavior (Current)
/// Returns `Inexact` when reversing the row group scan order would help satisfy the
/// requested ordering. We still need a Sort operator at a higher level because:
/// - We only reverse row group read order, not rows within row groups
/// - This provides approximate ordering that benefits limit pushdown
///
/// # Phase 2 (Future)
/// Could return `Exact` when we can guarantee perfect ordering through techniques like:
/// - File reordering based on statistics
/// - Detecting already-sorted data
/// This would allow removing the Sort operator entirely.
/// # Behavior
/// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already
/// satisfies the requested ordering. This allows the Sort operator to be eliminated
/// if the files within each group are also non-overlapping (checked by FileScanConfig).
/// - Returns `Inexact` when reversing the row group scan order would help satisfy the
/// requested ordering. We still need a Sort operator at a higher level because:
/// - We only reverse row group read order, not rows within row groups
/// - This provides approximate ordering that benefits limit pushdown
///
/// # Returns
/// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed)
/// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
/// - `Unsupported`: Cannot optimize for this ordering
fn try_pushdown_sort(
Expand All @@ -767,6 +765,16 @@ impl FileSource for ParquetSource {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Check if the natural (non-reversed) ordering already satisfies the request.
// Parquet metadata guarantees within-file ordering, so if the ordering matches
// we can return Exact. FileScanConfig will verify that files within each group
// are non-overlapping before declaring the entire scan as Exact.
if eq_properties.ordering_satisfy(order.iter().cloned())? {
return Ok(SortOrderPushdownResult::Exact {
inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
});
}

// Build new equivalence properties with the reversed ordering.
// This allows us to check if the reversed ordering satisfies the request
// by leveraging:
Expand Down Expand Up @@ -811,11 +819,6 @@ impl FileSource for ParquetSource {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})

// TODO Phase 2: Add support for other optimizations:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think there's one more trick we could have up our sleeves: instead of only reversing row group orders we could pass the desired sort order into the opener and have it re-sort the row groups based on stats to try to match the scan's desired ordering. This might be especially effective once we have morselized scans since we could terminate after a single row group for TopK queries.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea! Row-group-level statistics reordering would be a natural extension of our file-level reordering but at finer granularity. Especially powerful with morselized scans where TopK could terminate after a single row group. Will track as a follow-up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're able to open a followup issue and link it to the PR and code that would be great!

Copy link
Copy Markdown
Contributor Author

@zhuqi-lucas zhuqi-lucas Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb Created follow-up issue: #21317

Great idea! Row-group-level statistics reordering would be a natural extension of our file-level reordering but at finer granularity. Especially powerful with morselized scans where TopK could terminate after a single row group.

// - File reordering based on min/max statistics
// - Detection of exact ordering (return Exact to remove Sort operator)
// - Partial sort pushdown for prefix matches
}

fn apply_expressions(
Expand Down
Loading
Loading