combine #21351 + #21580 for benchmarking (draft, do not merge)#21731
combine #21351 + #21580 for benchmarking (draft, do not merge)#21731Dandandan wants to merge 19 commits intoapache:mainfrom
Conversation
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown. - Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values - Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field - Reorder happens after pruning but before reverse (they compose) - Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups Closes apache#21317
For overlapping row group ranges, sorting by min for DESC can pick a worse first RG. Example: RG0(50-60) vs RG1(40-100) — min DESC picks RG0 first (max=60), but RG1 contains the largest values (max=100). Use min for ASC and max for DESC to correctly prioritize the row group most likely to contain the optimal values for TopK.
When sort_order_for_reorder is set, reorder_by_statistics already handles the sort direction (min for ASC, max for DESC). Applying reverse on top would undo the reorder. Use else-if so only one strategy is applied. Also adds sort_pushdown_inexact benchmark with pushdown_filters enabled to measure RG reorder benefit on wide-row TopK queries.
Address review feedback: extract row group optimization into a trait instead of post-pass flags. prepare_with_optimizer() integrates the optimization into the access plan preparation step. - AccessPlanOptimizer trait with ReverseRowGroups and ReorderByStatistics - prepare_with_optimizer() applies optimizer inside prepare flow - Original prepare() unchanged for backward compatibility
…lly exclusive Previously reorder_by_statistics and reverse_row_groups were mutually exclusive (else-if). This meant DESC queries on unsorted data could only get one optimization. Now they compose: reorder always sorts RGs by min ASC, then reverse flips for DESC. This ensures correct results for both sorted and unsorted inputs without regression. Also removes prepare_with_optimizer in favor of calling optimize() directly on each optimizer, and simplifies reorder_by_statistics to always use min ASC (direction handled by reverse).
…e#21351 (dynamic work scheduling) Conflict resolution in datafusion/datasource/src/source.rs: kept the as_any() method on DataSource (from apache#21351) instead of the Any supertrait (from apache#21580). Restored as_any() impls on FileScanConfig and MemorySourceConfig, added missing std::any::Any imports, and switched dyn DataSource::{is,downcast_ref} to use as_any().
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmarks baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (e195044) to main diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (e195044) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (e195044) to main diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing combine-21351-21580 (e195044) to afc0784 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
Purpose
Test branch combining:
The goal is to measure whether the two optimizations compound on TopK-style queries (e.g.
ORDER BY col LIMIT Non multi-file / multi-RG parquet), since they operate at different granularities:Conflict resolution
Only one real conflict:
datafusion/datasource/src/source.rs.#21580 had been rebased on top of upstream #21576 (which removes the explicit
as_anymethod onDataSourcein favor of anAnysupertrait bound). #21351's base predates #21576, so it still uses the explicitas_anymethod.Resolved in favor of #21351's style:
DataSource: Send + Sync + Debugwith anas_any(&self) -> &dyn Anymethod.as_anyimpls onFileScanConfigandMemorySourceConfig.use std::any::Anyimports infile_scan_config/mod.rsandmemory.rs.dyn DataSource::{is, downcast_ref}helpers to callself.as_any()(with aT: 'staticbound).Status
Draft — not for merge. This is an integration branch for benchmarking. The two PRs should be reviewed and merged independently upstream.
cargo check --workspacepassescargo testnot yet runFollow-ups