Skip to content

Commit 9f4b78a

Browse files
nathanb9, Nathan Bezualem, and claude
authored
Benchmark multi-column GROUP BY performance (#22322)
## Which issue does this PR close?

- Measuring: #17850

## Rationale for this change

1. Measure multi-column GROUP BY performance with a fair apples-to-apples comparison.
2. The default vectorized per-column approach (`GroupValuesColumn`) underperforms the row-based approach (`GroupValuesRows`) when the distinct group count is small relative to the number of input rows. The benchmark confirms this: row-based is 16-19% faster below ~200K groups, while vectorized wins by 15-33% above ~500K groups.

## What changes are included in this PR?

Adds a benchmark in `datafusion/physical-plan/benches/multi_group_by.rs` that directly calls `GroupValues::intern()` with identical Int32 data for both implementations — no SQL/planning/IO overhead, same schema, same hashing. Makes `mod row` public so the benchmark can instantiate `GroupValuesRows` directly.

Test cases:

- **Issue #17850 reproduction** (3 cols, 64 groups, 1M-50M rows) — confirms row-based wins ~16-19%
- **Low cardinality sweep** (8-4096 groups, 3-4 cols) — row-based wins 15-38%
- **Batch size sensitivity** (1K-32K) — minimal effect on ratio
- **Column scaling with low groups** (2-10 cols) — row-based advantage grows with columns
- **High cardinality scaling** (1M groups, 2-10 cols) — vectorized wins 22-43%
- **Group count sweep** (16 to 1M groups, 4 cols) — crossover at ~200K-500K groups

## Are these changes tested?

- `cargo fmt --all`
- `cargo clippy -p datafusion-physical-plan --bench multi_group_by -- -D warnings`
- `cargo bench -p datafusion-physical-plan --bench multi_group_by`

## Are there any user-facing changes?

No. This adds a benchmark only.

---------

Co-authored-by: Nathan Bezualem <nbez@amazon.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c20f245 commit 9f4b78a

4 files changed

Lines changed: 363 additions & 3 deletions

File tree

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,7 @@ required-features = ["test_utils"]
121121
[[bench]]
122122
harness = false
123123
name = "dictionary_group_values"
124+
125+
[[bench]]
126+
harness = false
127+
name = "multi_group_by"
Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for multi-column GROUP BY performance comparing vectorized
19+
//! (`GroupValuesColumn`) vs row-based (`GroupValuesRows`) implementations.
20+
//!
21+
//! Motivated by <https://github.com/apache/datafusion/issues/17850> which
22+
//! showed vectorized can regress for low-cardinality, high-row-count scenarios.
23+
//!
24+
//! Uses the direct `GroupValues::intern()` API with identical Int32 data for
25+
//! both implementations — a fair apples-to-apples comparison with the same
26+
//! hashing and data layout.
27+
28+
use arrow::array::{ArrayRef, Int32Array};
29+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
30+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
31+
use datafusion_physical_plan::aggregates::group_values::GroupValues;
32+
use datafusion_physical_plan::aggregates::group_values::GroupValuesRows;
33+
use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn;
34+
use std::hint::black_box;
35+
use std::sync::Arc;
36+
37+
const DEFAULT_BATCH_SIZE: usize = 8192;
38+
39+
fn make_schema(num_cols: usize) -> SchemaRef {
40+
let fields: Vec<Field> = (0..num_cols)
41+
.map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
42+
.collect();
43+
Arc::new(Schema::new(fields))
44+
}
45+
46+
fn generate_batches(
47+
num_cols: usize,
48+
num_distinct_groups: usize,
49+
num_rows: usize,
50+
batch_size: usize,
51+
) -> Vec<Vec<ArrayRef>> {
52+
let per_col_card = (num_distinct_groups as f64)
53+
.powf(1.0 / num_cols as f64)
54+
.ceil() as usize;
55+
56+
let num_full_batches = num_rows / batch_size;
57+
let remainder = num_rows % batch_size;
58+
let num_batches = num_full_batches + if remainder > 0 { 1 } else { 0 };
59+
60+
(0..num_batches)
61+
.map(|batch_idx| {
62+
let batch_start = batch_idx * batch_size;
63+
let current_batch_size = if batch_idx == num_batches - 1 && remainder > 0 {
64+
remainder
65+
} else {
66+
batch_size
67+
};
68+
(0..num_cols)
69+
.map(|col_idx| {
70+
let values: Vec<i32> = (0..current_batch_size)
71+
.map(|row| {
72+
let global_row = batch_start + row;
73+
let group_id = global_row % num_distinct_groups;
74+
let divisor = per_col_card.pow(col_idx as u32);
75+
((group_id / divisor) % per_col_card) as i32
76+
})
77+
.collect();
78+
Arc::new(Int32Array::from(values)) as ArrayRef
79+
})
80+
.collect()
81+
})
82+
.collect()
83+
}
84+
85+
fn create_group_values(schema: &SchemaRef, vectorized: bool) -> Box<dyn GroupValues> {
86+
if vectorized {
87+
Box::new(GroupValuesColumn::<false>::try_new(Arc::clone(schema)).unwrap())
88+
} else {
89+
Box::new(GroupValuesRows::try_new(Arc::clone(schema)).unwrap())
90+
}
91+
}
92+
93+
/// The measured routine: interns every batch into `gv`, one after another.
///
/// `groups` is cleared before each `intern` call so the same buffer is reused
/// for every batch. The buffer is passed through `black_box` once at the end
/// so the optimizer cannot discard the work.
fn bench_intern(
    gv: &mut Box<dyn GroupValues>,
    batches: &[Vec<ArrayRef>],
    groups: &mut Vec<usize>,
) {
    batches.iter().for_each(|columns| {
        groups.clear();
        gv.intern(columns, groups).unwrap();
    });
    black_box(&*groups);
}
104+
105+
/// Experiment 1: Issue #17850 regression scenario.
106+
/// 3 columns, 64 groups (4^3), scaling row count.
107+
fn bench_issue_17850_regression(c: &mut Criterion) {
108+
let mut group = c.benchmark_group("issue_17850_regression");
109+
group.sample_size(10);
110+
111+
let num_cols = 3;
112+
let num_groups = 64;
113+
let schema = make_schema(num_cols);
114+
115+
for num_rows in [1_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000] {
116+
let batches =
117+
generate_batches(num_cols, num_groups, num_rows, DEFAULT_BATCH_SIZE);
118+
119+
for vectorized in [true, false] {
120+
let label = if vectorized {
121+
"vectorized"
122+
} else {
123+
"row_based"
124+
};
125+
group.bench_with_input(
126+
BenchmarkId::new(label, format!("{num_rows}_rows")),
127+
&batches,
128+
|b, batches| {
129+
b.iter_batched_ref(
130+
|| {
131+
(
132+
create_group_values(&schema, vectorized),
133+
Vec::<usize>::with_capacity(DEFAULT_BATCH_SIZE),
134+
)
135+
},
136+
|(gv, groups)| bench_intern(gv, batches, groups),
137+
criterion::BatchSize::LargeInput,
138+
);
139+
},
140+
);
141+
}
142+
}
143+
group.finish();
144+
}
145+
146+
/// Experiment 2: Low cardinality sweep.
147+
fn bench_low_cardinality(c: &mut Criterion) {
148+
let mut group = c.benchmark_group("low_cardinality");
149+
group.sample_size(15);
150+
151+
for (num_cols, per_col_card) in
152+
[(3usize, 2usize), (3, 4), (3, 8), (4, 2), (4, 4), (4, 8)]
153+
{
154+
let num_groups = per_col_card.pow(num_cols as u32);
155+
let schema = make_schema(num_cols);
156+
let batches =
157+
generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE);
158+
159+
for vectorized in [true, false] {
160+
let label = if vectorized {
161+
"vectorized"
162+
} else {
163+
"row_based"
164+
};
165+
group.bench_with_input(
166+
BenchmarkId::new(
167+
label,
168+
format!("cols_{num_cols}_card_{per_col_card}_grp_{num_groups}"),
169+
),
170+
&batches,
171+
|b, batches| {
172+
b.iter_batched_ref(
173+
|| {
174+
(
175+
create_group_values(&schema, vectorized),
176+
Vec::<usize>::with_capacity(DEFAULT_BATCH_SIZE),
177+
)
178+
},
179+
|(gv, groups)| bench_intern(gv, batches, groups),
180+
criterion::BatchSize::LargeInput,
181+
);
182+
},
183+
);
184+
}
185+
}
186+
group.finish();
187+
}
188+
189+
/// Experiment 3: Batch size sensitivity.
190+
fn bench_batch_size_sensitivity(c: &mut Criterion) {
191+
let mut group = c.benchmark_group("batch_size_sensitivity");
192+
group.sample_size(10);
193+
194+
let num_cols = 3;
195+
let num_groups = 64;
196+
let schema = make_schema(num_cols);
197+
198+
for batch_size in [1024, 4096, 8192, 16384, 32768] {
199+
let batches = generate_batches(num_cols, num_groups, 1_000_000, batch_size);
200+
201+
for vectorized in [true, false] {
202+
let label = if vectorized {
203+
"vectorized"
204+
} else {
205+
"row_based"
206+
};
207+
group.bench_with_input(
208+
BenchmarkId::new(label, format!("batch_{batch_size}")),
209+
&batches,
210+
|b, batches| {
211+
b.iter_batched_ref(
212+
|| {
213+
(
214+
create_group_values(&schema, vectorized),
215+
Vec::<usize>::with_capacity(batch_size),
216+
)
217+
},
218+
|(gv, groups)| bench_intern(gv, batches, groups),
219+
criterion::BatchSize::LargeInput,
220+
);
221+
},
222+
);
223+
}
224+
}
225+
group.finish();
226+
}
227+
228+
/// Experiment 4: Column count scaling with low groups.
229+
fn bench_column_scaling(c: &mut Criterion) {
230+
let mut group = c.benchmark_group("column_scaling");
231+
group.sample_size(15);
232+
233+
let cases: &[(usize, usize)] =
234+
&[(2, 100), (3, 125), (4, 81), (6, 729), (8, 256), (10, 1024)];
235+
236+
for &(num_cols, num_groups) in cases {
237+
let schema = make_schema(num_cols);
238+
let batches =
239+
generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE);
240+
241+
for vectorized in [true, false] {
242+
let label = if vectorized {
243+
"vectorized"
244+
} else {
245+
"row_based"
246+
};
247+
group.bench_with_input(
248+
BenchmarkId::new(label, format!("cols_{num_cols}_grp_{num_groups}")),
249+
&batches,
250+
|b, batches| {
251+
b.iter_batched_ref(
252+
|| {
253+
(
254+
create_group_values(&schema, vectorized),
255+
Vec::<usize>::with_capacity(DEFAULT_BATCH_SIZE),
256+
)
257+
},
258+
|(gv, groups)| bench_intern(gv, batches, groups),
259+
criterion::BatchSize::LargeInput,
260+
);
261+
},
262+
);
263+
}
264+
}
265+
group.finish();
266+
}
267+
268+
/// Experiment 5: High cardinality column scaling (~1M groups).
269+
fn bench_high_cardinality_scaling(c: &mut Criterion) {
270+
let mut group = c.benchmark_group("high_cardinality_scaling");
271+
group.sample_size(10);
272+
273+
for num_cols in [2, 3, 4, 6, 8, 10] {
274+
let num_groups = 1_000_000;
275+
let schema = make_schema(num_cols);
276+
let batches =
277+
generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE);
278+
279+
for vectorized in [true, false] {
280+
let label = if vectorized {
281+
"vectorized"
282+
} else {
283+
"row_based"
284+
};
285+
group.bench_with_input(
286+
BenchmarkId::new(label, format!("cols_{num_cols}_grp_1M")),
287+
&batches,
288+
|b, batches| {
289+
b.iter_batched_ref(
290+
|| {
291+
(
292+
create_group_values(&schema, vectorized),
293+
Vec::<usize>::with_capacity(DEFAULT_BATCH_SIZE),
294+
)
295+
},
296+
|(gv, groups)| bench_intern(gv, batches, groups),
297+
criterion::BatchSize::LargeInput,
298+
);
299+
},
300+
);
301+
}
302+
}
303+
group.finish();
304+
}
305+
306+
/// Experiment 6: Group count sweep with fixed 4 columns.
307+
fn bench_group_count_sweep(c: &mut Criterion) {
308+
let mut group = c.benchmark_group("group_count_sweep");
309+
group.sample_size(15);
310+
311+
let num_cols = 4;
312+
let schema = make_schema(num_cols);
313+
314+
for num_groups in [
315+
16, 64, 256, 1000, 5000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
316+
] {
317+
let batches =
318+
generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE);
319+
320+
for vectorized in [true, false] {
321+
let label = if vectorized {
322+
"vectorized"
323+
} else {
324+
"row_based"
325+
};
326+
group.bench_with_input(
327+
BenchmarkId::new(label, format!("grp_{num_groups}")),
328+
&batches,
329+
|b, batches| {
330+
b.iter_batched_ref(
331+
|| {
332+
(
333+
create_group_values(&schema, vectorized),
334+
Vec::<usize>::with_capacity(DEFAULT_BATCH_SIZE),
335+
)
336+
},
337+
|(gv, groups)| bench_intern(gv, batches, groups),
338+
criterion::BatchSize::LargeInput,
339+
);
340+
},
341+
);
342+
}
343+
}
344+
group.finish();
345+
}
346+
347+
criterion_group!(
348+
benches,
349+
bench_issue_17850_regression,
350+
bench_low_cardinality,
351+
bench_batch_size_sensitivity,
352+
bench_column_scaling,
353+
bench_high_cardinality_scaling,
354+
bench_group_count_sweep,
355+
);
356+
criterion_main!(benches);

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ use datafusion_expr::EmitTo;
3131
pub mod multi_group_by;
3232

3333
mod row;
34+
pub use row::GroupValuesRows;
3435
mod single_group_by;
3536
use datafusion_physical_expr::binary_map::OutputType;
3637
use multi_group_by::GroupValuesColumn;
37-
use row::GroupValuesRows;
3838

3939
pub(crate) use single_group_by::primitive::HashValue;
4040

@@ -130,7 +130,7 @@ pub trait GroupValues: Send {
130130
///
131131
/// `GroupColumn`: crate::aggregates::group_values::multi_group_by::GroupColumn
132132
/// `GroupValuesColumn`: crate::aggregates::group_values::multi_group_by::GroupValuesColumn
133-
/// `GroupValuesRows`: crate::aggregates::group_values::row::GroupValuesRows
133+
/// `GroupValuesRows`: crate::aggregates::group_values::GroupValuesRows
134134
pub fn new_group_values(
135135
schema: SchemaRef,
136136
group_ordering: &GroupOrdering,

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
225225
/// more general purpose [`GroupValuesRows`]. See the ticket for details:
226226
/// <https://github.com/apache/datafusion/pull/12269>
227227
///
228-
/// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
228+
/// [`GroupValuesRows`]: crate::aggregates::group_values::GroupValuesRows
229229
group_values: Vec<Box<dyn GroupColumn>>,
230230

231231
/// reused buffer to store hashes

0 commit comments

Comments
 (0)