feat: globally reorder files and row groups by statistics for TopK queries#21956
feat: globally reorder files and row groups by statistics for TopK queries#21956zhuqi-lucas wants to merge 5 commits intoapache:mainfrom
Conversation
864a3d3 to
6e56cae
Compare
There was a problem hiding this comment.
Pull request overview
This PR improves TopK (ORDER BY ... LIMIT K) performance for Parquet scans by using statistics to reorder work (files and row groups) so the dynamic filter threshold converges faster and more data can be pruned during execution.
Changes:
- Propagate TopK sort metadata (
sort_options,fetch) viaDynamicFilterPhysicalExprand fixSortExec::with_fetchordering so the dynamic filter sees the correct K. - Add file-level reordering hook (
FileSource::reorder_files) and implement statistics-based file reordering for Parquet. - Add row-group access plan optimization plumbing and statistics-based row-group reordering (composable with reverse scanning), plus SLT coverage.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT coverage for row-group/file reordering behavior across multiple TopK scenarios |
| datafusion/physical-plan/src/sorts/sort.rs | Passes sort options + fetch into dynamic filter and adjusts fetch/filter initialization order |
| datafusion/physical-expr/src/expressions/dynamic_filters.rs | Extends DynamicFilterPhysicalExpr with optional sort metadata and fetch limit |
| datafusion/datasource/src/file_stream/work_source.rs | Calls FileSource::reorder_files before queueing shared work |
| datafusion/datasource/src/file.rs | Introduces FileSource::reorder_files default extension point |
| datafusion/datasource-parquet/src/source.rs | Implements Parquet file reordering using column statistics; wires fallback sort info |
| datafusion/datasource-parquet/src/opener.rs | Applies row-group access plan optimizers (reorder + reverse) based on TopK metadata |
| datafusion/datasource-parquet/src/mod.rs | Registers new access plan optimizer module |
| datafusion/datasource-parquet/src/access_plan_optimizer.rs | Adds AccessPlanOptimizer trait plus ReverseRowGroups / ReorderByStatistics implementations |
| datafusion/datasource-parquet/src/access_plan.rs | Adds PreparedAccessPlan::reorder_by_statistics (min-stat based RG ordering) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
6e56cae to
f0f4058
Compare
587297d to
235c4e1
Compare
|
run benchmark sort_pushdown_inexact |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (235c4e1) to 0144570 (merge-base) diff using: sort_pushdown_inexact File an issue against this benchmark runner |
235c4e1 to
0c9c3d8
Compare
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesort_pushdown_inexact — base (merge-base)
sort_pushdown_inexact — branch
File an issue against this benchmark runner |
0c9c3d8 to
f7d9156
Compare
|
The benchmark results are expected — RG reorder alone doesn't skip any row groups, it only changes the read order so that TopK's dynamic filter threshold converges faster. The significant speedup (2-3x on
Without reorder, cumulative prune might truncate the wrong RGs. Reorder ensures the best RGs come first, making truncation safe and effective. |
|
Sorry @zhuqi-lucas - I am really struggling these days tor review all these large PRs. Last year it was very rare to see a more than 1000 line PR and now we get multiple such PRs a day. I will try and find the time to review this more carefuly |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (f7d9156) to 0144570 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Thanks @alamb, no rush at all, I really appreciate you taking the time. |
|
the benchmark results look mixed -- I'll see if that reproduces on a second run |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (f7d9156) to 0144570 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Thanks @alamb , i update the PR to skip reorder when overlap is heavy, and let me trigger again to see if it's the reason. |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (c8fd321) to 0c38ebb (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
When a parquet file has multiple row groups with out-of-order or overlapping statistics, TopK queries benefit from reading "best" row groups first so the dynamic filter threshold tightens quickly. This PR adds: 1. `reorder_by_statistics`: sorts row groups by min values (ASC) based on parquet column statistics. Direction (DESC) is handled by the existing `reverse()` applied after reorder. The two steps compose: - Sorted data: reorder is a no-op, reverse gives perfect DESC order - Unsorted data: reorder fixes the order, reverse flips for DESC 2. `AccessPlanOptimizer` trait: extensible interface for row group access plan optimizations (reorder, reverse) applied after pruning. 3. `DynamicFilterPhysicalExpr.sort_options/fetch`: SortExec now passes sort direction and fetch limit to the dynamic filter, enabling the parquet reader to determine reorder direction for any TopK query. 4. `FileSource::reorder_files`: file-level reordering in the shared work queue so multi-file TopK reads the most promising files first. 5. Fix `SortExec::with_fetch` ordering: fetch must be set before `create_filter()` so the DynamicFilter gets the correct K value.
Adds an overlap guard to `PreparedAccessPlan::reorder_by_statistics`: when sorted-by-min adjacent row-group `[min, max]` ranges overlap above 50%, reordering cannot enable RG-level pruning (every "later" RG still has values that could appear in TopK results) so the reorder cost — CPU sort, lost IO sequential locality, and parallel scheduling pessimization across workers all pulling "best" RGs first — dominates, producing a net regression. This addresses the ClickBench `hits_partitioned` regressions (Q24, Q25, Q26: 1.11x-1.44x slower) where `EventTime` and `SearchPhrase` are uniformly distributed across row groups via the user-id hash partitioning, so RG min/max ranges fully overlap and reorder cannot help. Sorted / non-overlapping data continues to take the reorder path — verified by `sort_pushdown.slt` Test H still passing. The overlap helper treats null mins/maxes (RGs without statistics) conservatively as overlaps, so missing stats discourage rather than silently disable the guard. Adds 7 unit tests covering: fully disjoint, disjoint after reorder, fully overlapping, partial overlap, null max in previous, null min in next, and the n<2 edge case.
Upstream `proto: serialize and dedupe dynamic filters v2 (apache#21807)` added a new `DynamicFilterPhysicalExpr::from_parts` constructor that builds the struct via a `Self { ... }` literal. After this PR's rebase brings that commit in, the new initializer is missing the `sort_options` and `fetch` fields this PR adds, breaking the build (E0063). Default both to `None` in `from_parts` — the proto wire format does not yet carry these fields, so reconstruction cannot recover them. Callers that need sort metadata still go through `new_with_sort_options`. Proto round-trip tests (162 in `datafusion-proto`) continue to pass.
c8fd321 to
ab09242
Compare
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (4fac37b) to 3b634aa (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Mirrors the row-group-level overlap guard at the file level. When adjacent file `[min, max]` ranges (in sorted-by-min order) overlap above 50%, file-level reorder cannot enable file pruning and the reorder cost (CPU sort + lost IO sequential locality + parallel work-stealing pessimization) dominates. This addresses the remaining ClickBench `hits_partitioned` regressions (Q24, Q25, Q26 still 1.10x-1.31x slower after the RG-level guard landed). `hits_partitioned` is partitioned by user-id hash, so each file covers the full range of `EventTime` and `SearchPhrase` — file ranges fully overlap and reorder cannot help. Files lacking statistics are conservatively counted as overlaps so missing stats discourage rather than silently disable the guard. Adds 6 unit tests (disjoint sorted, disjoint after reorder, fully overlapping, partial below threshold, missing stats, n<2).
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (6987aa2) to 3b634aa (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
The benchmark is ok now. |
Which issue does this PR close?
Rationale for this change
TopK queries (
ORDER BY col LIMIT K) on parquet files with multiple out-of-order row groups are suboptimal — the dynamic filter threshold converges slowly because row groups are read in arbitrary order. By reordering row groups so the "best" ones (containing optimal values) are read first, the threshold tightens quickly and subsequent row groups are pruned at runtime.What changes are included in this PR?
Row group reorder by statistics:
PreparedAccessPlan::reorder_by_statistics(): sorts row groups by min values (ASC) using parquet column statistics. Direction (DESC) is handled by existingreverse()applied after reorder. The two compose correctly for both sorted and unsorted data.AccessPlanOptimizertrait: extensible interface for row group access plan optimizations applied after pruning.DynamicFilter sort metadata:
DynamicFilterPhysicalExprgainssort_optionsandfetchfields, set bySortExec::create_filter(). This lets the parquet reader determine reorder direction for any TopK query (not just sort-pushdown path).SortExec::with_fetchnow sets fetch before callingcreate_filter()so the DynamicFilter gets the correct K value.File-level reorder in shared work queue:
FileSource::reorder_files()trait method + parquet implementation: reorders files in the shared work queue by statistics so multi-file TopK reads the most promising files first across all partitions.Are these changes tested?
Are there any user-facing changes?
No API changes. TopK queries on parquet with multiple row groups will automatically benefit from better row group ordering. This is a performance optimization only — query results are unchanged.