Epic: Dynamic row group pruning using TopK threshold during parquet scan
Is your feature request related to a problem or challenge?
DataFusion already supports file-level dynamic pruning — when TopK sets a threshold, `EarlyStoppingStream` can skip entire subsequent files. This is highly effective for multi-file scans.
However, for single large files (e.g., tens of GB with hundreds of row groups), this file-level pruning cannot help. Row group selection within a file is done upfront before any data is read, when the dynamic filter is still empty. All row groups are selected regardless, and TopK must iterate through every one of them.
Core insight: extend file-level pruning down to row-group level.
A single large file with 100 row groups is essentially the same as 100 small files — if we can prune files dynamically, we should be able to prune row groups the same way.
```
Current: file-level pruning             Proposed: row-group-level pruning
================================        ==================================
File 1 ──► read                         File (large, 100 RGs)
File 2 ──► read, TopK threshold set       RG1   ──► read (late materialization)
File 3 ──► PRUNED (skip entire file)      RG2   ──► threshold set, PRUNE! (no IO)
File 4 ──► PRUNED                         RG3   ──► PRUNE!
                                          ...
Works great for many small files          RG100 ─► PRUNE!
Useless for one large file
                                        Same pruning logic, finer granularity
                                        → single large file gets same benefit
```
Describe the solution you'd like
Re-evaluate the TopK dynamic filter between row groups during the parquet scan. When a row group's min/max statistics show it cannot contain qualifying rows, skip it entirely — no I/O, no decode, no decompress. Exactly like file-level `EarlyStoppingStream`, but at row-group granularity.
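The skip decision itself is a single comparison against statistics that already sit in the parquet footer. A minimal sketch of the idea in plain Rust, for a `ORDER BY col DESC LIMIT k` query — `RowGroupStats` is a hypothetical stand-in for the footer metadata, not a real arrow-rs type:

```rust
/// Hypothetical min/max statistics for one row group (illustrative only;
/// the real values come from the parquet footer metadata).
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// For `ORDER BY col DESC LIMIT k`, TopK's threshold is the smallest value
/// currently in its heap: a row group whose max is below the threshold
/// cannot contribute any qualifying row and can be skipped without IO.
fn can_skip_desc(stats: RowGroupStats, threshold: Option<i64>) -> bool {
    match threshold {
        Some(t) => stats.max < t, // no row in this RG can beat the heap
        None => false,            // no threshold yet: must read
    }
}

fn main() {
    let rgs = [
        RowGroupStats { min: 1, max: 100 },
        RowGroupStats { min: 101, max: 200 },
        RowGroupStats { min: 201, max: 300 },
    ];
    // After reading RG3 the threshold is 291; RG1 and RG2 become skippable.
    let threshold = Some(291);
    let skippable: Vec<bool> = rgs.iter().map(|s| can_skip_desc(*s, threshold)).collect();
    println!("{:?}", skippable); // [true, true, false]
}
```

The ASC case is symmetric (`stats.min > threshold`); the real implementation would evaluate a full `PruningPredicate` rather than a single comparison.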
Architecture: Current vs Proposed
```
CURRENT STATE
=============

TopK (threshold updates after each batch)
   │
   ▼
DataSourceExec ──► Parquet Reader
   │
   ├─ RG selection: UPFRONT (dynamic filter = empty)
   │     → All RGs selected regardless
   │
   ├─ Read RG3 ──► batch ──► TopK (threshold = 291)
   ├─ Read RG2 ──► batch ──► TopK (filtered at row level)
   └─ Read RG1 ──► batch ──► TopK (filtered at row level)
        ▲
        │
   All RGs read: IO + decode + decompress
   even though RG2, RG1 could be skipped

PROPOSED
========

TopK (threshold updates after each batch)
   │
   ▼
DataSourceExec ──► Parquet Reader
   │
   ├─ Read RG3 ──► batch ──► TopK (threshold = 291)
   │
   ├─ Before RG2: re-check dynamic filter
   │     rg2.max=200 < 291 → SKIP! (no IO)
   │
   ├─ Before RG1: re-check dynamic filter
   │     rg1.max=100 < 291 → SKIP! (no IO)
   │
   └─ Done. Only 1 of 3 RGs read!
      (same as if they were 3 separate files)
```
Phase 1: Reverse scan (ordered row groups)
Building on the TopK + reverse row group optimization (#19064), when `reverse_row_groups=true`, row groups are already in sorted order. After TopK reads the first row group and sets a tight threshold, all subsequent row groups are guaranteed to have smaller values — the first pruned row group means all remaining can be skipped (early termination, just like file-level pruning).
Example: File sorted ASC, `reverse_row_groups=true`

```
Query: ORDER BY id DESC LIMIT 10

File Layout (original):     Read Order (reversed):
┌──────────────────┐
│ RG1: id 1-100    │        RG3 ◄── read first (late materialization)
│ RG2: id 101-200  │        RG2 ◄── max=200 < 291 → PRUNE + EARLY STOP
│ RG3: id 201-300  │        RG1 ◄── skipped (all remaining pruned)
└──────────────────┘

Result: 1 of 3 RGs read (67% IO saved)
For 100 RGs: ~99% IO saved
```
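Under the Phase 1 assumption that the reverse scan delivers row groups already sorted DESC, the scan loop can stop at the first pruned row group. A hedged sketch of that early-termination behavior with toy types (the threshold update after each row group is a hypothetical stand-in for TopK's heap):

```rust
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Returns the indices (in read order) of row groups actually read.
/// Assumes `rgs` is in reverse (DESC) read order, as with reverse_row_groups.
fn scan_sorted_desc(rgs: &[RowGroupStats]) -> Vec<usize> {
    let mut threshold: Option<i64> = None;
    let mut read = Vec::new();
    for (i, rg) in rgs.iter().enumerate() {
        if let Some(t) = threshold {
            if rg.max < t {
                // Early termination: read order is sorted DESC, so every
                // remaining RG has an even smaller max — skip them all.
                break;
            }
        }
        read.push(i);
        // Hypothetical: after taking LIMIT 10 rows from this RG, the heap's
        // smallest value sits near the top of the RG's value range.
        threshold = Some(rg.max - 9);
    }
    read
}

fn main() {
    // Read order for the example above: RG3 (201-300), RG2 (101-200), RG1 (1-100)
    let rgs = [
        RowGroupStats { min: 201, max: 300 },
        RowGroupStats { min: 101, max: 200 },
        RowGroupStats { min: 1, max: 100 },
    ];
    let read = scan_sorted_desc(&rgs);
    assert_eq!(read, vec![0]); // only RG3 is read; RG2 and RG1 are early-stopped
    println!("read {} of {} row groups", read.len(), rgs.len());
}
```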
Why dynamic RG pruning over reversing batch rows?
An alternative approach is to reverse rows within each decoded batch (via `arrow::compute::take` with reversed indices), making the output globally sorted DESC. This lets the optimizer replace TopK with a simple `GlobalLimit` and push LIMIT into the source. However, dynamic RG pruning is strictly better:

| | Reverse batch rows | Dynamic RG pruning |
|---|---|---|
| Extra memory | Full batch materialized before reversal | Zero — skipped RGs never read |
| Cross-RG ordering | Must also ensure correct ordering across row group boundaries (last rows of RG_n vs first rows of RG_n+1) | Not needed — TopK handles global ordering |
| Sort operator | Must declare `Exact` (requires non-overlapping files) | TopK stays — works regardless of file layout |
| Correctness risk | Must guarantee global ordering across files and across row groups within each file | TopK guarantees correctness |
| Applicability | Only sorted, non-overlapping files with non-overlapping RGs | Any file, including unordered RGs (Phase 2) |
Dynamic RG pruning achieves the same I/O reduction (skip all but 1-2 RGs) without extra memory or correctness constraints.
Phase 2: Statistics-based row group reordering (#21317)
Extend to files where row groups are NOT in sorted order. Reorder row groups by statistics first, then apply the same pruning logic.
Example: File with unordered RGs
```
Query: ORDER BY price ASC LIMIT 10

Original Order:           Reordered by min(price):
┌──────────────────┐      ┌──────────────────┐
│ RG1: price 50-99 │      │ RG3: price 1-30  │ ◄── read, threshold=30
│ RG2: price 200+  │      │ RG1: price 50-99 │ ◄── min=50 > 30 → PRUNE
│ RG3: price 1-30  │      │ RG2: price 200+  │ ◄── min=200 > 30 → PRUNE
└──────────────────┘      └──────────────────┘

Result: 1 of 3 RGs read
```
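The Phase 2 planning step can be sketched with the same toy model: sort row-group indices by min(price) for an ASC query, then prune any remaining row group whose min exceeds the threshold. The `threshold_after_first` parameter (30 in the example) is a hypothetical simplification — in reality TopK sets and tightens the threshold as batches arrive:

```rust
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Reorder unordered row groups by min value (most promising first for ASC),
/// then prune with `rg.min > threshold`. Returns indices of RGs read.
fn plan_scan_asc(rgs: &[RowGroupStats], threshold_after_first: i64) -> Vec<usize> {
    let mut order: Vec<usize> = (0..rgs.len()).collect();
    order.sort_by_key(|&i| rgs[i].min); // statistics-based reordering

    let mut read = Vec::new();
    let mut threshold: Option<i64> = None;
    for &i in &order {
        if let Some(t) = threshold {
            if rgs[i].min > t {
                continue; // cannot contain any qualifying row: skip, no IO
            }
        }
        read.push(i);
        // Modeled: TopK sets its threshold after the first batch is read.
        threshold = Some(threshold_after_first);
    }
    read
}

fn main() {
    // Original order from the example: RG1 (50-99), RG2 (200+), RG3 (1-30)
    let rgs = [
        RowGroupStats { min: 50, max: 99 },
        RowGroupStats { min: 200, max: 900 },
        RowGroupStats { min: 1, max: 30 },
    ];
    let read = plan_scan_asc(&rgs, 30);
    assert_eq!(read, vec![2]); // only RG3 is read; RG1 and RG2 are pruned
    println!("read row groups: {:?}", read);
}
```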
Data flow
```
┌─────────┐   threshold    ┌──────────────────────┐
│  TopK   │ ─────────────► │  DynamicFilterExpr   │
│ Operator│                │ (generation counter) │
└────┬────┘                └──────────┬───────────┘
     │                                │
     │ pulls batches                  │ generation changed?
     │                                ▼
┌────┴─────────────────────────────────────────────┐
│ PushDecoderStreamState::transition()             │
│                                                  │
│   loop {                                         │
│     // NEW: check before each RG                 │
│     peek_next_row_group()                        │
│       → get RG statistics                        │
│       → evaluate PruningPredicate                │
│       → SKIP or READ                             │
│                                                  │
│     if READ:                                     │
│       try_decode() → NeedsData → fetch IO        │
│                   → Data(batch) → return to TopK │
│   }                                              │
└──────────────────────────────────────────────────┘
```
Implementation approach
Arrow-rs changes (~55 lines):
- Add `peek_next_row_group()` and `skip_next_row_group()` to `ParquetPushDecoder`
- Expose a `parquet_metadata()` getter for statistics access

DataFusion changes (~80 lines):
- Add a `RowGroupPruner` that re-evaluates the dynamic filter between row groups
- Integrate into `PushDecoderStreamState::transition()` — check before requesting data
- Reuse the existing `PruningPredicate` infrastructure (same as file-level pruning)
- Add a `row_groups_pruned_dynamic` metric
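To make the generation-counter interaction concrete, here is an illustrative model of the proposed `RowGroupPruner`. The struct and field names follow the plan above but are otherwise invented; the real version would evaluate a full `PruningPredicate` against the row group's statistics, not a single `i64` threshold:

```rust
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Toy model of the TopK dynamic filter: a threshold plus a generation
/// counter that increments whenever TopK tightens the threshold.
struct DynamicFilter {
    generation: u64,
    threshold: Option<i64>,
}

/// Illustrative pruner, consulted before requesting IO for each row group.
struct RowGroupPruner {
    seen_generation: u64,
    threshold: Option<i64>,
    pruned_count: usize, // would feed the row_groups_pruned_dynamic metric
}

impl RowGroupPruner {
    fn new() -> Self {
        Self { seen_generation: 0, threshold: None, pruned_count: 0 }
    }

    /// Re-read the filter only when its generation advanced, then decide
    /// whether the next row group can be skipped (DESC ordering assumed).
    fn should_skip(&mut self, stats: RowGroupStats, filter: &DynamicFilter) -> bool {
        if filter.generation != self.seen_generation {
            self.seen_generation = filter.generation; // pick up new threshold
            self.threshold = filter.threshold;
        }
        let skip = matches!(self.threshold, Some(t) if stats.max < t);
        if skip {
            self.pruned_count += 1;
        }
        skip
    }
}

fn main() {
    let mut pruner = RowGroupPruner::new();
    let filter = DynamicFilter { generation: 1, threshold: Some(291) };
    // RG3 (201-300): max ≥ threshold, must read.
    assert!(!pruner.should_skip(RowGroupStats { min: 201, max: 300 }, &filter));
    // RG2 (101-200): max=200 < 291, skipped with no IO.
    assert!(pruner.should_skip(RowGroupStats { min: 101, max: 200 }, &filter));
    println!("pruned: {}", pruner.pruned_count); // prints "pruned: 1"
}
```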
Expected benefit
Scenario: ORDER BY col LIMIT 10, single large file
```
┌────────────┬─────────────┬───────────────────┬───────────────────┐
│            │ No optimize │ Phase 1 (reverse) │ Phase 2 (reorder) │
├────────────┼─────────────┼───────────────────┼───────────────────┤
│ RGs read   │ 100         │ 1-2               │ 2-5               │
│ RGs skipped│ 0           │ 98-99             │ 95-98             │
│ IO saved   │ 0%          │ ~98%              │ ~95%              │
└────────────┴─────────────┴───────────────────┴───────────────────┘
```
Key: first RG does late materialization (sets TopK threshold),
all subsequent RGs get early-stopped — just like file pruning.
Sub-issues / tasks
Related issues (sort pushdown optimization series):
cc @alamb @Dandandan @adriangb