Skip to content

Commit b41eddf

Browse files
zhuqi-lucasclaude
andcommitted
feat: add sort pushdown benchmark and SLT tests
Add benchmark and integration tests for sort pushdown optimization as a precursor to the core optimization PR (#21182). Benchmark: new `sort-pushdown` subcommand with 4 queries testing sort elimination (ASC full scan, ASC LIMIT, wide full, wide LIMIT). SLT tests (5 new groups): - Test A: Non-overlapping files + WITH ORDER → Sort eliminated - Test B: Overlapping files → SortExec retained - Test C: LIMIT queries (ASC sort elimination + DESC reverse scan) - Test D: target_partitions=2 → SPM + per-partition sort elimination - Test E: Inferred ordering from Parquet metadata (no WITH ORDER) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a910b03 commit b41eddf

File tree

5 files changed

+798
-1
lines changed

5 files changed

+798
-1
lines changed

benchmarks/bench.sh

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parqu
106106
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
107107
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
108108
109+
# Sort Pushdown Benchmarks
110+
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111+
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112+
109113
# Sorted Data Benchmarks (ORDER BY Optimization)
110114
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
111115
@@ -309,6 +313,10 @@ main() {
309313
# same data as for tpch
310314
data_tpch "1" "parquet"
311315
;;
316+
sort_pushdown|sort_pushdown_sorted)
317+
# same data as for tpch
318+
data_tpch "1" "parquet"
319+
;;
312320
sort_tpch)
313321
# same data as for tpch
314322
data_tpch "1" "parquet"
@@ -509,6 +517,12 @@ main() {
509517
external_aggr)
510518
run_external_aggr
511519
;;
520+
sort_pushdown)
521+
run_sort_pushdown
522+
;;
523+
sort_pushdown_sorted)
524+
run_sort_pushdown_sorted
525+
;;
512526
sort_tpch)
513527
run_sort_tpch "1"
514528
;;
@@ -1070,6 +1084,22 @@ run_external_aggr() {
10701084
debug_run $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
10711085
}
10721086

1087+
# Runs the sort pushdown benchmark (without WITH ORDER)
1088+
run_sort_pushdown() {
1089+
TPCH_DIR="${DATA_DIR}/tpch_sf1"
1090+
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown.json"
1091+
echo "Running sort pushdown benchmark (no WITH ORDER)..."
1092+
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
1093+
}
1094+
1095+
# Runs the sort pushdown benchmark with WITH ORDER (enables sort elimination)
1096+
run_sort_pushdown_sorted() {
1097+
TPCH_DIR="${DATA_DIR}/tpch_sf1"
1098+
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_sorted.json"
1099+
echo "Running sort pushdown benchmark (with WITH ORDER)..."
1100+
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
1101+
}
1102+
10731103
# Runs the sort integration benchmark
10741104
run_sort_tpch() {
10751105
SCALE_FACTOR=$1

benchmarks/src/bin/dfbench.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

3636
use datafusion_benchmarks::{
37-
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
37+
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,
38+
tpch,
3839
};
3940

4041
#[derive(Debug, Parser)]
@@ -53,6 +54,7 @@ enum Options {
5354
Imdb(imdb::RunOpt),
5455
Nlj(nlj::RunOpt),
5556
Smj(smj::RunOpt),
57+
SortPushdown(sort_pushdown::RunOpt),
5658
SortTpch(sort_tpch::RunOpt),
5759
Tpch(tpch::RunOpt),
5860
Tpcds(tpcds::RunOpt),
@@ -72,6 +74,7 @@ pub async fn main() -> Result<()> {
7274
Options::Imdb(opt) => Box::pin(opt.run()).await,
7375
Options::Nlj(opt) => opt.run().await,
7476
Options::Smj(opt) => opt.run().await,
77+
Options::SortPushdown(opt) => opt.run().await,
7578
Options::SortTpch(opt) => opt.run().await,
7679
Options::Tpch(opt) => Box::pin(opt.run()).await,
7780
Options::Tpcds(opt) => Box::pin(opt.run()).await,

benchmarks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod hj;
2323
pub mod imdb;
2424
pub mod nlj;
2525
pub mod smj;
26+
pub mod sort_pushdown;
2627
pub mod sort_tpch;
2728
pub mod tpcds;
2829
pub mod tpch;

benchmarks/src/sort_pushdown.rs

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmark for sort pushdown optimization.
19+
//!
20+
//! Tests performance of sort elimination when files are non-overlapping and
21+
//! internally sorted (declared via `--sorted` / `WITH ORDER`).
22+
//!
23+
//! # Usage
24+
//!
25+
//! ```text
26+
//! # Prepare sorted TPCH lineitem data (SF=1)
27+
//! ./bench.sh data sort_pushdown
28+
//!
29+
//! # Baseline (no WITH ORDER, full SortExec)
30+
//! ./bench.sh run sort_pushdown
31+
//!
32+
//! # With sort elimination (WITH ORDER, SortExec removed)
33+
//! ./bench.sh run sort_pushdown_sorted
34+
//! ```
35+
//!
36+
//! # Reference Results
37+
//!
38+
//! Measured on 300k rows, 8 non-overlapping sorted parquet files, single partition,
39+
//! debug build (results vary by hardware; relative speedup is the key metric):
40+
//!
41+
//! ```text
42+
//! Query | Description | baseline (ms) | sort eliminated (ms) | speedup
43+
//! ------|----------------------|---------------|---------------------|--------
44+
//! Q1 | ASC full scan | 159 | 91 | 43%
45+
//! Q2 | ASC LIMIT 100 | 36 | 12 | 67%
46+
//! Q3 | ASC full (wide, *) | 487 | 333 | 31%
47+
//! Q4 | ASC LIMIT 100 (wide) | 119 | 30 | 74%
48+
//! ```
49+
//!
50+
//! Key observations:
51+
//! - **LIMIT queries benefit most** (67-74%): sort elimination + limit pushdown
52+
//! means only the first few rows are read before stopping.
53+
//! - **Full scans** (31-43%): saving comes from eliminating the O(n log n) sort
54+
//! step entirely.
55+
//! - **Wide projections** amplify the benefit: larger rows make sorting more
56+
//! expensive, so eliminating it saves more.
57+
58+
use clap::Args;
59+
use futures::StreamExt;
60+
use std::path::PathBuf;
61+
use std::sync::Arc;
62+
63+
use datafusion::datasource::TableProvider;
64+
use datafusion::datasource::file_format::parquet::ParquetFormat;
65+
use datafusion::datasource::listing::{
66+
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
67+
};
68+
use datafusion::error::Result;
69+
use datafusion::execution::SessionStateBuilder;
70+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
71+
use datafusion::physical_plan::{displayable, execute_stream};
72+
use datafusion::prelude::*;
73+
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
74+
use datafusion_common::instant::Instant;
75+
76+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};
77+
78+
#[derive(Debug, Args)]
79+
pub struct RunOpt {
80+
/// Common options
81+
#[command(flatten)]
82+
common: CommonOpt,
83+
84+
/// Sort pushdown query number. If not specified, runs all queries
85+
#[arg(short, long)]
86+
pub query: Option<usize>,
87+
88+
/// Path to data files (lineitem). Only parquet format is supported.
89+
/// Data should be pre-sorted by l_orderkey ASC for meaningful results.
90+
#[arg(required = true, short = 'p', long = "path")]
91+
path: PathBuf,
92+
93+
/// Path to JSON benchmark result to be compared using `compare.py`
94+
#[arg(short = 'o', long = "output")]
95+
output_path: Option<PathBuf>,
96+
97+
/// Mark the first column (l_orderkey) as sorted via WITH ORDER.
98+
/// When set, enables sort elimination for matching queries.
99+
#[arg(short = 't', long = "sorted")]
100+
sorted: bool,
101+
}
102+
103+
pub const SORT_PUSHDOWN_QUERY_START_ID: usize = 1;
104+
pub const SORT_PUSHDOWN_QUERY_END_ID: usize = 4;
105+
106+
impl RunOpt {
107+
const TABLES: [&'static str; 1] = ["lineitem"];
108+
109+
/// Queries benchmarking sort elimination when files are non-overlapping
110+
/// and internally sorted (WITH ORDER declared via `--sorted`).
111+
///
112+
/// With `--sorted`: ParquetSource returns Exact, files are verified
113+
/// non-overlapping by statistics → SortExec eliminated, no SPM needed
114+
/// for single partition.
115+
///
116+
/// Without `--sorted`: baseline with full SortExec.
117+
const QUERIES: [&'static str; 4] = [
118+
// Q1: Sort elimination — full scan
119+
// With --sorted: SortExec removed, sequential scan in file order
120+
// Without --sorted: full SortExec required
121+
r#"
122+
SELECT l_orderkey, l_partkey, l_suppkey
123+
FROM lineitem
124+
ORDER BY l_orderkey
125+
"#,
126+
// Q2: Sort elimination + limit pushdown
127+
// With --sorted: SortExec removed + limit pushed to DataSourceExec
128+
// → reads only first ~100 rows then stops
129+
// Without --sorted: TopK sort over all data
130+
r#"
131+
SELECT l_orderkey, l_partkey, l_suppkey
132+
FROM lineitem
133+
ORDER BY l_orderkey
134+
LIMIT 100
135+
"#,
136+
// Q3: Sort elimination — wide projection (all columns)
137+
// Tests sort elimination benefit with larger row payload
138+
r#"
139+
SELECT *
140+
FROM lineitem
141+
ORDER BY l_orderkey
142+
"#,
143+
// Q4: Sort elimination + limit — wide projection
144+
r#"
145+
SELECT *
146+
FROM lineitem
147+
ORDER BY l_orderkey
148+
LIMIT 100
149+
"#,
150+
];
151+
152+
pub async fn run(&self) -> Result<()> {
153+
let mut benchmark_run = BenchmarkRun::new();
154+
155+
let query_range = match self.query {
156+
Some(query_id) => query_id..=query_id,
157+
None => SORT_PUSHDOWN_QUERY_START_ID..=SORT_PUSHDOWN_QUERY_END_ID,
158+
};
159+
160+
for query_id in query_range {
161+
benchmark_run.start_new_case(&format!("{query_id}"));
162+
163+
let query_results = self.benchmark_query(query_id).await;
164+
match query_results {
165+
Ok(query_results) => {
166+
for iter in query_results {
167+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
168+
}
169+
}
170+
Err(e) => {
171+
benchmark_run.mark_failed();
172+
eprintln!("Query {query_id} failed: {e}");
173+
}
174+
}
175+
}
176+
177+
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
178+
benchmark_run.maybe_print_failures();
179+
Ok(())
180+
}
181+
182+
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
183+
let config = self.common.config()?;
184+
let rt = self.common.build_runtime()?;
185+
let state = SessionStateBuilder::new()
186+
.with_config(config)
187+
.with_runtime_env(rt)
188+
.with_default_features()
189+
.build();
190+
let ctx = SessionContext::from(state);
191+
192+
self.register_tables(&ctx).await?;
193+
194+
let mut millis = vec![];
195+
let mut query_results = vec![];
196+
for i in 0..self.iterations() {
197+
let start = Instant::now();
198+
199+
let query_idx = query_id - 1;
200+
let sql = Self::QUERIES[query_idx].to_string();
201+
let row_count = self.execute_query(&ctx, sql.as_str()).await?;
202+
203+
let elapsed = start.elapsed();
204+
let ms = elapsed.as_secs_f64() * 1000.0;
205+
millis.push(ms);
206+
207+
println!(
208+
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
209+
);
210+
query_results.push(QueryResult { elapsed, row_count });
211+
}
212+
213+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
214+
println!("Query {query_id} avg time: {avg:.2} ms");
215+
216+
print_memory_stats();
217+
218+
Ok(query_results)
219+
}
220+
221+
async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
222+
for table in Self::TABLES {
223+
let table_provider = self.get_table(ctx, table).await?;
224+
ctx.register_table(table, table_provider)?;
225+
}
226+
Ok(())
227+
}
228+
229+
async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
230+
let debug = self.common.debug;
231+
let plan = ctx.sql(sql).await?;
232+
let (state, plan) = plan.into_parts();
233+
234+
if debug {
235+
println!("=== Logical plan ===\n{plan}\n");
236+
}
237+
238+
let plan = state.optimize(&plan)?;
239+
if debug {
240+
println!("=== Optimized logical plan ===\n{plan}\n");
241+
}
242+
let physical_plan = state.create_physical_plan(&plan).await?;
243+
if debug {
244+
println!(
245+
"=== Physical plan ===\n{}\n",
246+
displayable(physical_plan.as_ref()).indent(true)
247+
);
248+
}
249+
250+
let mut row_count = 0;
251+
let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
252+
while let Some(batch) = stream.next().await {
253+
row_count += batch?.num_rows();
254+
}
255+
256+
if debug {
257+
println!(
258+
"=== Physical plan with metrics ===\n{}\n",
259+
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
260+
.indent(true)
261+
);
262+
}
263+
264+
Ok(row_count)
265+
}
266+
267+
async fn get_table(
268+
&self,
269+
ctx: &SessionContext,
270+
table: &str,
271+
) -> Result<Arc<dyn TableProvider>> {
272+
let path = self.path.to_str().unwrap();
273+
let state = ctx.state();
274+
let path = format!("{path}/{table}");
275+
let format = Arc::new(
276+
ParquetFormat::default()
277+
.with_options(ctx.state().table_options().parquet.clone()),
278+
);
279+
let extension = DEFAULT_PARQUET_EXTENSION;
280+
281+
let options = ListingOptions::new(format)
282+
.with_file_extension(extension)
283+
.with_collect_stat(true); // Always collect statistics for sort pushdown
284+
285+
let table_path = ListingTableUrl::parse(path)?;
286+
let schema = options.infer_schema(&state, &table_path).await?;
287+
let options = if self.sorted {
288+
// Declare the first column (l_orderkey) as sorted
289+
let key_column_name = schema.fields()[0].name();
290+
options
291+
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
292+
} else {
293+
options
294+
};
295+
296+
let config = ListingTableConfig::new(table_path)
297+
.with_listing_options(options)
298+
.with_schema(schema);
299+
300+
Ok(Arc::new(ListingTable::try_new(config)?))
301+
}
302+
303+
fn iterations(&self) -> usize {
304+
self.common.iterations
305+
}
306+
}

0 commit comments

Comments
 (0)