
[Epic] Dynamic row group pruning using TopK threshold during parquet scan #21399

@zhuqi-lucas

Description

Is your feature request related to a problem or challenge?

DataFusion already supports file-level dynamic pruning — when TopK sets a threshold, EarlyStoppingStream can skip entire subsequent files. This is highly effective for multi-file scans.

However, for single large files (e.g., tens of GB with hundreds of row groups), this file-level pruning cannot help. Row group selection within a file is done upfront before any data is read, when the dynamic filter is still empty. All row groups are selected regardless, and TopK must iterate through every one of them.

Core insight: extend file-level pruning down to row-group level.

A single large file with 100 row groups is essentially the same as 100 small files — if we can prune files dynamically, we should be able to prune row groups the same way.

  Current: file-level pruning           Proposed: row-group-level pruning
  ================================      ==================================

  File 1 ──► read                       File (large, 100 RGs)
  File 2 ──► read, TopK threshold set     RG1 ──► read (late materialization)
  File 3 ──► PRUNED (skip entire file)    RG2 ──► threshold set, PRUNE! (no IO)
  File 4 ──► PRUNED                       RG3 ──► PRUNE!
                                          ...
  Works great for many small files        RG100 ─► PRUNE!
  Useless for one large file
                                          Same pruning logic, finer granularity
                                          → single large file gets same benefit

Describe the solution you'd like

Re-evaluate the TopK dynamic filter between row groups during the parquet scan. When a row group's min/max statistics show it cannot contain qualifying rows, skip it entirely — no I/O, no decode, no decompress. Exactly like file-level EarlyStoppingStream, but at row-group granularity.
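The core check can be sketched in a few lines. This is a minimal illustration, not DataFusion code: `RowGroupStats` and `can_skip_desc` are hypothetical names standing in for the parquet row group statistics and the dynamic-filter evaluation.

```rust
/// Min/max statistics for one row group, as exposed by parquet metadata
/// (illustrative stand-in, not the real arrow-rs type).
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// For `ORDER BY col DESC LIMIT k`, TopK tracks the smallest value still in
/// its heap once the heap is full; call that the threshold. A row group whose
/// max is below the threshold cannot contain any qualifying row.
fn can_skip_desc(stats: RowGroupStats, threshold: Option<i64>) -> bool {
    match threshold {
        // No threshold yet (TopK heap not full): the row group must be read.
        None => false,
        Some(t) => stats.max < t,
    }
}

fn main() {
    // Mirrors the diagrams below: threshold = 291 after reading the first RG.
    let rg2 = RowGroupStats { min: 101, max: 200 };
    let rg3 = RowGroupStats { min: 201, max: 300 };

    assert!(!can_skip_desc(rg3, None)); // dynamic filter still empty → read
    assert!(can_skip_desc(rg2, Some(291))); // max = 200 < 291 → skip, no IO
}
```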

Architecture: Current vs Proposed

                        CURRENT STATE
                        =============

  TopK (threshold updates after each batch)
    │
    ▼
  DataSourceExec ──► Parquet Reader
                       │
                       ├─ RG selection: UPFRONT (dynamic filter = empty)
                       │   → All RGs selected regardless
                       │
                       ├─ Read RG3 ──► batch ──► TopK (threshold = 291)
                       ├─ Read RG2 ──► batch ──► TopK (filtered at row level)
                       └─ Read RG1 ──► batch ──► TopK (filtered at row level)
                           ▲
                           │
                     All RGs read: IO + decode + decompress
                     even though RG2, RG1 could be skipped


                        PROPOSED
                        ========

  TopK (threshold updates after each batch)
    │
    ▼
  DataSourceExec ──► Parquet Reader
                       │
                       ├─ Read RG3 ──► batch ──► TopK (threshold = 291)
                       │
                       ├─ Before RG2: re-check dynamic filter
                       │   rg2.max=200 < 291 → SKIP! (no IO)
                       │
                       ├─ Before RG1: re-check dynamic filter
                       │   rg1.max=100 < 291 → SKIP! (no IO)
                       │
                       └─ Done. Only 1 of 3 RGs read!
                          (same as if they were 3 separate files)

Phase 1: Reverse scan (ordered row groups)

Building on the TopK + reverse row group optimization (#19064), when reverse_row_groups=true, row groups are already in sorted order. After TopK reads the first row group and sets a tight threshold, all subsequent row groups are guaranteed to have smaller values — the first pruned row group means all remaining can be skipped (early termination, just like file-level pruning).

Example: File sorted ASC, reverse_row_groups=true
         Query: ORDER BY id DESC LIMIT 10

  File Layout (original):     Read Order (reversed):
  ┌──────────────────┐
  │ RG1: id 1-100    │        RG3 ◄── read first (late materialization)
  │ RG2: id 101-200  │        RG2 ◄── max=200 < 291 → PRUNE + EARLY STOP
  │ RG3: id 201-300  │        RG1 ◄── skipped (all remaining pruned)
  └──────────────────┘

  Result: 1 of 3 RGs read (67% IO saved)
  For 100 RGs: ~99% IO saved
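The early-termination property above follows directly from the sort order: once one row group's max falls below the threshold, every later row group's max is smaller still. A toy sketch (assuming row groups already in reverse/descending order, with a fixed threshold for simplicity):

```rust
/// Given row group maxima in descending scan order and a TopK threshold,
/// return the indices of row groups that must be read. The first pruned row
/// group ends the scan: sorted order guarantees all later maxima are smaller.
fn plan_reads(rg_maxes_desc: &[i64], threshold: i64) -> Vec<usize> {
    let mut to_read = Vec::new();
    for (i, &max) in rg_maxes_desc.iter().enumerate() {
        if max < threshold {
            break; // early stop: everything after this is also below threshold
        }
        to_read.push(i);
    }
    to_read
}

fn main() {
    // RG3 (max=300), RG2 (max=200), RG1 (max=100); threshold 291 from TopK.
    assert_eq!(plan_reads(&[300, 200, 100], 291), vec![0]); // 1 of 3 RGs read
}
```

In the real proposal the threshold tightens as batches flow into TopK rather than being fixed upfront, but the termination argument is the same.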

Why dynamic RG pruning over reversing batch rows?

An alternative approach is to reverse rows within each decoded batch (via arrow::compute::take with reversed indices), making the output globally sorted DESC. This lets the optimizer replace TopK with a simple GlobalLimit and push LIMIT into the source. However, dynamic RG pruning is strictly better:

| | Reverse batch rows | Dynamic RG pruning |
|---|---|---|
| Extra memory | Full batch materialized before reversal | Zero — skipped RGs never read |
| Cross-RG ordering | Must also ensure correct ordering across row group boundaries (last rows of RG_n vs first rows of RG_n+1) | Not needed — TopK handles global ordering |
| Sort operator | Must declare Exact (requires non-overlapping files) | TopK stays — works regardless of file layout |
| Correctness risk | Must guarantee global ordering across files and across row groups within each file | TopK guarantees correctness |
| Applicability | Only sorted, non-overlapping files with non-overlapping RGs | Any file, including unordered RGs (Phase 2) |

Dynamic RG pruning achieves the same I/O reduction (skip all but 1-2 RGs) without extra memory or correctness constraints.

Phase 2: Statistics-based row group reordering (#21317)

Extend to files where row groups are NOT in sorted order. Reorder row groups by statistics first, then apply the same pruning logic.

Example: File with unordered RGs
         Query: ORDER BY price ASC LIMIT 10

  Original Order:              Reordered by min(price):
  ┌──────────────────┐         ┌──────────────────┐
  │ RG1: price 50-99 │         │ RG3: price 1-30  │ ◄── read, threshold=30
  │ RG2: price 200+  │         │ RG1: price 50-99 │ ◄── min=50 > 30 → PRUNE
  │ RG3: price 1-30  │         │ RG2: price 200+  │ ◄── min=200 > 30 → PRUNE
  └──────────────────┘         └──────────────────┘

  Result: 1 of 3 RGs read
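The reorder step in this example is just a sort of row group indices by their min statistic (for ASC; by max for DESC). A hypothetical sketch, with stats values taken from the diagram:

```rust
/// Return row group indices ordered by min statistic, so an ORDER BY ... ASC
/// LIMIT query reads the most promising row group first. Illustrative only;
/// the real reordering is proposed in #21317.
fn asc_scan_order(mins: &[i64]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..mins.len()).collect();
    order.sort_by_key(|&i| mins[i]);
    order
}

fn main() {
    // RG1 min=50, RG2 min=200, RG3 min=1 → read RG3 first.
    let mins = [50, 200, 1];
    let order = asc_scan_order(&mins);
    assert_eq!(order, vec![2, 0, 1]);

    // After reading RG3 the TopK threshold is 30; every remaining row group
    // whose min exceeds it is pruned with no IO.
    let threshold = 30;
    let pruned: Vec<usize> = order[1..]
        .iter()
        .copied()
        .filter(|&i| mins[i] > threshold)
        .collect();
    assert_eq!(pruned, vec![0, 1]); // RG1 and RG2 skipped
}
```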

Data flow

  ┌─────────┐    threshold    ┌──────────────────────┐
  │  TopK   │ ──────────────► │ DynamicFilterExpr    │
  │ Operator│                 │ (generation counter) │
  └────┬────┘                 └──────────┬───────────┘
       │                                 │
       │ pulls batches                   │ generation changed?
       │                                 ▼
  ┌────┴─────────────────────────────────────────────┐
  │  PushDecoderStreamState::transition()            │
  │                                                  │
  │  loop {                                          │
  │    // NEW: check before each RG                  │
  │    peek_next_row_group()                         │
  │      → get RG statistics                         │
  │      → evaluate PruningPredicate                 │
  │      → SKIP or READ                              │
  │                                                  │
  │    if READ:                                      │
  │      try_decode() → NeedsData → fetch IO         │
  │      → Data(batch) → return to TopK              │
  │  }                                               │
  └──────────────────────────────────────────────────┘
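The generation counter in the flow above lets the decode loop rebuild its pruning predicate only when TopK has actually tightened the threshold, rather than on every row group. A minimal sketch of that handshake, with all type names (`DynamicFilter`, `Pruner`) as illustrative stand-ins:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in for DynamicFilterExpr: TopK bumps the generation on each update.
struct DynamicFilter {
    generation: AtomicU64,
}

impl DynamicFilter {
    fn update(&self) {
        self.generation.fetch_add(1, Ordering::Release);
    }
}

/// Stand-in for the proposed RowGroupPruner state inside the decode loop.
struct Pruner {
    seen_generation: u64,
    rebuilds: u64,
}

impl Pruner {
    /// Called before each row group; rebuilds the predicate only on change.
    fn maybe_rebuild(&mut self, filter: &DynamicFilter) {
        let current = filter.generation.load(Ordering::Acquire);
        if current != self.seen_generation {
            self.seen_generation = current;
            self.rebuilds += 1; // re-derive the PruningPredicate here
        }
    }
}

fn main() {
    let filter = DynamicFilter { generation: AtomicU64::new(0) };
    let mut pruner = Pruner { seen_generation: 0, rebuilds: 0 };

    pruner.maybe_rebuild(&filter); // no update yet → nothing to do
    filter.update(); // TopK tightened the threshold
    pruner.maybe_rebuild(&filter); // rebuild once
    pruner.maybe_rebuild(&filter); // unchanged → skip rebuild
    assert_eq!(pruner.rebuilds, 1);
}
```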

Implementation approach

Arrow-rs changes (~55 lines):

  • Add peek_next_row_group() and skip_next_row_group() to ParquetPushDecoder
  • Expose parquet_metadata() getter for statistics access

DataFusion changes (~80 lines):

  • Add RowGroupPruner that re-evaluates dynamic filter between row groups
  • Integrate into PushDecoderStreamState::transition() — check before requesting data
  • Reuse existing PruningPredicate infrastructure (same as file-level pruning)
  • Add row_groups_pruned_dynamic metric
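How the proposed `peek_next_row_group()` / `skip_next_row_group()` hooks might compose in the decode loop, sketched against a mock decoder (the real hooks would live on `ParquetPushDecoder`; everything here is a stand-in, and the threshold is fixed for brevity where the proposal would re-check the dynamic filter each iteration):

```rust
#[derive(Clone, Copy)]
struct RgStats {
    min: i64,
    max: i64,
}

/// Mock of the push decoder: tracks which row groups were actually read.
struct MockDecoder {
    stats: Vec<RgStats>,
    next: usize,
    read: Vec<usize>,
}

impl MockDecoder {
    /// Stand-in for the proposed peek_next_row_group(): statistics only, no IO.
    fn peek_next_row_group(&self) -> Option<RgStats> {
        self.stats.get(self.next).copied()
    }
    /// Stand-in for the proposed skip_next_row_group(): advance without IO.
    fn skip_next_row_group(&mut self) {
        self.next += 1;
    }
    fn read_next_row_group(&mut self) {
        self.read.push(self.next);
        self.next += 1;
    }
}

/// Check each row group against the threshold before requesting its data;
/// returns the row_groups_pruned_dynamic count.
fn scan(decoder: &mut MockDecoder, threshold: i64) -> u64 {
    let mut row_groups_pruned_dynamic = 0;
    while let Some(stats) = decoder.peek_next_row_group() {
        if stats.max < threshold {
            decoder.skip_next_row_group(); // no IO, no decode, no decompress
            row_groups_pruned_dynamic += 1;
        } else {
            decoder.read_next_row_group();
        }
    }
    row_groups_pruned_dynamic
}

fn main() {
    let mut decoder = MockDecoder {
        stats: vec![
            RgStats { min: 201, max: 300 }, // RG3, reverse scan order
            RgStats { min: 101, max: 200 }, // RG2
            RgStats { min: 1, max: 100 },   // RG1
        ],
        next: 0,
        read: vec![],
    };
    assert_eq!(scan(&mut decoder, 291), 2); // two RGs pruned dynamically
    assert_eq!(decoder.read, vec![0]); // only the first RG was read
}
```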

Expected benefit

  Scenario: ORDER BY col LIMIT 10, single large file

  ┌────────────┬──────────────┬────────────────────┬───────────────────┐
  │            │ No optimize  │ Phase 1 (reverse)  │ Phase 2 (reorder) │
  ├────────────┼──────────────┼────────────────────┼───────────────────┤
  │ RGs read   │     100      │       1-2          │       2-5         │
  │ RGs skipped│       0      │      98-99         │      95-98        │
  │ IO saved   │       0%     │      ~98%          │      ~95%         │
  └────────────┴──────────────┴────────────────────┴───────────────────┘

  Key: first RG does late materialization (sets TopK threshold),
       all subsequent RGs get early-stopped — just like file pruning.

Sub-issues / tasks

Related issues (sort pushdown optimization series):

cc @alamb @Dandandan @adriangb
