Skip to content

Commit d25b053

Browse files
committed
Add multi-column group by benchmark
1 parent dc80bd7 commit d25b053

2 files changed

Lines changed: 224 additions & 0 deletions

File tree

datafusion/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ harness = false
294294
name = "preserve_file_partitioning"
295295
required-features = ["parquet"]
296296

297+
[[bench]]
298+
harness = false
299+
name = "multi_group_by"
300+
required-features = ["parquet"]
301+
297302
[[bench]]
298303
harness = false
299304
name = "reset_plan_states"
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for multi-column GROUP BY performance.
19+
//!
20+
//! Tests the performance of grouping across different cardinality
21+
//! scenarios and column counts. Uses Parquet files so that column
22+
//! statistics (min/max) are available to the optimizer for heuristic
23+
//! decisions about GroupValues implementation selection.
24+
//!
25+
//! The benchmark pre-plans the query and only measures execution time
26+
//! (excludes planning and I/O setup overhead).
27+
28+
use arrow::array::{ArrayRef, Int32Array, RecordBatch};
29+
use arrow::datatypes::{DataType, Field, Schema};
30+
use criterion::{Criterion, criterion_group, criterion_main};
31+
use datafusion::prelude::{SessionConfig, SessionContext};
32+
use parking_lot::Mutex;
33+
use parquet::arrow::ArrowWriter;
34+
use parquet::file::properties::WriterProperties;
35+
use rand::rngs::StdRng;
36+
use rand::{Rng, SeedableRng};
37+
use std::hint::black_box;
38+
use std::sync::Arc;
39+
use tempfile::NamedTempFile;
40+
use tokio::runtime::Runtime;
41+
42+
const NUM_ROWS: usize = 1_000_000;
43+
const BATCH_SIZE: usize = 8192;
44+
45+
fn build_group_by_sql(num_cols: usize) -> String {
46+
let cols: Vec<String> = (0..num_cols).map(|i| format!("col_{i}")).collect();
47+
let col_list = cols.join(", ");
48+
format!("SELECT {col_list} FROM t GROUP BY {col_list}")
49+
}
50+
51+
fn generate_parquet_file(num_cols: usize, cardinality: usize) -> NamedTempFile {
52+
let mut rng = StdRng::seed_from_u64(42);
53+
let fields: Vec<Field> = (0..num_cols)
54+
.map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
55+
.collect();
56+
let schema = Arc::new(Schema::new(fields));
57+
58+
let mut temp_file = tempfile::Builder::new()
59+
.prefix("multi_group_by")
60+
.suffix(".parquet")
61+
.tempfile()
62+
.unwrap();
63+
64+
let props = WriterProperties::builder()
65+
.set_max_row_group_row_count(Some(NUM_ROWS))
66+
.build();
67+
68+
let mut writer =
69+
ArrowWriter::try_new(&mut temp_file, Arc::clone(&schema), Some(props)).unwrap();
70+
71+
let num_batches = NUM_ROWS / BATCH_SIZE;
72+
for _ in 0..num_batches {
73+
let columns: Vec<ArrayRef> = (0..num_cols)
74+
.map(|_| {
75+
let values: Vec<i32> = (0..BATCH_SIZE)
76+
.map(|_| rng.random_range(0..cardinality as i32))
77+
.collect();
78+
Arc::new(Int32Array::from(values)) as ArrayRef
79+
})
80+
.collect();
81+
let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
82+
writer.write(&batch).unwrap();
83+
}
84+
85+
writer.close().unwrap();
86+
temp_file
87+
}
88+
89+
struct BenchContext {
90+
ctx: Arc<Mutex<SessionContext>>,
91+
_temp_file: NamedTempFile,
92+
}
93+
94+
#[expect(clippy::needless_pass_by_value)]
95+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
96+
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
97+
black_box(rt.block_on(df.collect()).unwrap());
98+
}
99+
100+
fn prepare_context(rt: &Runtime, num_cols: usize, cardinality: usize) -> BenchContext {
101+
let temp_file = generate_parquet_file(num_cols, cardinality);
102+
let path = temp_file.path().to_str().unwrap().to_string();
103+
104+
let config = SessionConfig::new().with_target_partitions(1);
105+
let ctx = SessionContext::new_with_config(config);
106+
rt.block_on(async {
107+
ctx.register_parquet("t", &path, Default::default())
108+
.await
109+
.unwrap();
110+
// Warm the OS page cache
111+
let df = ctx.sql(&build_group_by_sql(num_cols)).await.unwrap();
112+
let _ = df.collect().await.unwrap();
113+
});
114+
115+
BenchContext {
116+
ctx: Arc::new(Mutex::new(ctx)),
117+
_temp_file: temp_file,
118+
}
119+
}
120+
121+
fn criterion_benchmark(c: &mut Criterion) {
122+
let rt = Runtime::new().unwrap();
123+
124+
// === Experiment 1: Fixed ~100-1000 groups, vary column count ===
125+
let b_ctx = prepare_context(&rt, 2, 10); // 10^2 = 100 groups
126+
c.bench_function("fixed_groups_cols_2_grp_100", |b| {
127+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2)))
128+
});
129+
130+
let b_ctx = prepare_context(&rt, 3, 5); // 5^3 = 125 groups
131+
c.bench_function("fixed_groups_cols_3_grp_125", |b| {
132+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3)))
133+
});
134+
135+
let b_ctx = prepare_context(&rt, 4, 3); // 3^4 = 81 groups
136+
c.bench_function("fixed_groups_cols_4_grp_81", |b| {
137+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
138+
});
139+
140+
let b_ctx = prepare_context(&rt, 6, 3); // 3^6 = 729 groups
141+
c.bench_function("fixed_groups_cols_6_grp_729", |b| {
142+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6)))
143+
});
144+
145+
let b_ctx = prepare_context(&rt, 8, 2); // 2^8 = 256 groups
146+
c.bench_function("fixed_groups_cols_8_grp_256", |b| {
147+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8)))
148+
});
149+
150+
let b_ctx = prepare_context(&rt, 10, 2); // 2^10 = 1024 groups
151+
c.bench_function("fixed_groups_cols_10_grp_1024", |b| {
152+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10)))
153+
});
154+
155+
// === Experiment 1b: High groups (~1M), vary column count ===
156+
let b_ctx = prepare_context(&rt, 2, 1000); // 1000^2 = 1M groups
157+
c.bench_function("high_groups_cols_2_grp_1M", |b| {
158+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2)))
159+
});
160+
161+
let b_ctx = prepare_context(&rt, 3, 100); // 100^3 = 1M groups
162+
c.bench_function("high_groups_cols_3_grp_1M", |b| {
163+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3)))
164+
});
165+
166+
let b_ctx = prepare_context(&rt, 4, 32); // 32^4 = ~1M groups
167+
c.bench_function("high_groups_cols_4_grp_1M", |b| {
168+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
169+
});
170+
171+
let b_ctx = prepare_context(&rt, 6, 10); // 10^6 = 1M groups
172+
c.bench_function("high_groups_cols_6_grp_1M", |b| {
173+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6)))
174+
});
175+
176+
let b_ctx = prepare_context(&rt, 8, 6); // 6^8 = ~1.7M groups
177+
c.bench_function("high_groups_cols_8_grp_1M", |b| {
178+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8)))
179+
});
180+
181+
let b_ctx = prepare_context(&rt, 10, 4); // 4^10 = ~1M groups
182+
c.bench_function("high_groups_cols_10_grp_1M", |b| {
183+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10)))
184+
});
185+
186+
// === Experiment 2: Fixed 4 columns, vary group count ===
187+
let b_ctx = prepare_context(&rt, 4, 2); // 2^4 = 16 groups
188+
c.bench_function("fixed_4cols_grp_16", |b| {
189+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
190+
});
191+
192+
let b_ctx = prepare_context(&rt, 4, 5); // 5^4 = 625 groups
193+
c.bench_function("fixed_4cols_grp_625", |b| {
194+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
195+
});
196+
197+
let b_ctx = prepare_context(&rt, 4, 10); // 10^4 = 10K groups
198+
c.bench_function("fixed_4cols_grp_10000", |b| {
199+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
200+
});
201+
202+
let b_ctx = prepare_context(&rt, 4, 30); // 30^4 = 810K groups
203+
c.bench_function("fixed_4cols_grp_810000", |b| {
204+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
205+
});
206+
207+
let b_ctx = prepare_context(&rt, 4, 100); // 100^4 = 100M groups
208+
c.bench_function("fixed_4cols_grp_100M", |b| {
209+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
210+
});
211+
212+
let b_ctx = prepare_context(&rt, 4, 500); // 500^4 = 62.5B groups
213+
c.bench_function("fixed_4cols_grp_62B", |b| {
214+
b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4)))
215+
});
216+
}
217+
218+
criterion_group!(benches, criterion_benchmark);
219+
criterion_main!(benches);

0 commit comments

Comments
 (0)