Skip to content

Commit e3355de

Browse files
committed
Add benchmarks for dictionary path of new_group_values
1 parent 37cd3de commit e3355de

2 files changed

Lines changed: 179 additions & 0 deletions

File tree

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,7 @@ required-features = ["test_utils"]
106106
harness = false
107107
name = "aggregate_vectorized"
108108
required-features = ["test_utils"]
109+
110+
[[bench]]
111+
harness = false
112+
name = "dictionary_group_values"
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for `GroupValues` over a single `Dictionary<Int32, Utf8>`
19+
//! column. Each iteration is timed end-to-end: it constructs the
20+
//! `Box<dyn GroupValues>` returned by `new_group_values`, runs `intern`
21+
//! once (or N times), and then `emit(EmitTo::All)`.
22+
23+
use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
24+
use arrow::buffer::{Buffer, NullBuffer};
25+
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
26+
use criterion::{
27+
BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main,
28+
};
29+
use datafusion_expr::EmitTo;
30+
use datafusion_physical_plan::aggregates::group_values::new_group_values;
31+
use datafusion_physical_plan::aggregates::order::GroupOrdering;
32+
use rand::rngs::StdRng;
33+
use rand::seq::SliceRandom;
34+
use rand::{Rng, SeedableRng};
35+
use std::hint::black_box;
36+
use std::sync::Arc;
37+
38+
const SIZES: [usize; 2] = [8 * 1024, 64 * 1024];
39+
const CARDS_RELATIVE: [usize; 4] = [20, 75, 300, 8 * 1024];
40+
const N_BATCHES: usize = 4;
41+
// Fixed for reproducibility.
42+
const SEED: u64 = 0xD1C7;
43+
44+
fn dict_schema() -> SchemaRef {
45+
let dict_ty =
46+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
47+
Arc::new(Schema::new(vec![Field::new("g", dict_ty, true)]))
48+
}
49+
50+
/// Build a `Dictionary<Int32, Utf8>` column.
51+
fn make_dict(size: usize, cardinality: usize, null_density: f32, seed: u64) -> ArrayRef {
52+
let strings: Vec<String> = (0..cardinality).map(|i| format!("v_{i:08}")).collect();
53+
let values = Arc::new(StringArray::from(
54+
strings.iter().map(String::as_str).collect::<Vec<_>>(),
55+
));
56+
57+
let mut rng = StdRng::seed_from_u64(seed);
58+
let keys: Vec<i32> = if cardinality == size {
59+
let mut perm: Vec<i32> = (0..size as i32).collect();
60+
perm.shuffle(&mut rng);
61+
perm
62+
} else {
63+
(0..size)
64+
.map(|_| rng.random_range(0..cardinality) as i32)
65+
.collect()
66+
};
67+
let keys_buf = Buffer::from_slice_ref(&keys);
68+
69+
let nulls: Option<NullBuffer> = (null_density > 0.0).then(|| {
70+
(0..size)
71+
.map(|_| !rng.random_bool(null_density as f64))
72+
.collect()
73+
});
74+
75+
let key_array = PrimitiveArray::<Int32Type>::new(keys_buf.into(), nulls);
76+
Arc::new(DictionaryArray::<Int32Type>::try_new(key_array, values).unwrap())
77+
}
78+
79+
fn bench_id(
80+
label: &str,
81+
size: usize,
82+
cardinality: usize,
83+
null_density: f32,
84+
) -> BenchmarkId {
85+
BenchmarkId::new(
86+
label,
87+
format!("size_{size}_card_{cardinality}_null_{null_density:.2}"),
88+
)
89+
}
90+
91+
fn bench_intern_emit(c: &mut Criterion) {
92+
let mut group = c.benchmark_group("dict_intern_emit");
93+
let schema = dict_schema();
94+
let null_density = 0.0;
95+
96+
for &size in &SIZES {
97+
let mut cards = CARDS_RELATIVE.to_vec();
98+
cards.push(size); // all-unique stress case
99+
for cardinality in cards {
100+
let array = make_dict(size, cardinality, null_density, SEED);
101+
group.throughput(Throughput::Elements(size as u64));
102+
group.bench_function(
103+
bench_id("intern_emit", size, cardinality, null_density),
104+
|b| {
105+
b.iter_batched_ref(
106+
|| {
107+
(
108+
new_group_values(schema.clone(), &GroupOrdering::None)
109+
.unwrap(),
110+
Vec::<usize>::with_capacity(size),
111+
)
112+
},
113+
|(gv, groups)| {
114+
gv.intern(std::slice::from_ref(&array), groups).unwrap();
115+
black_box(&*groups);
116+
black_box(gv.emit(EmitTo::All).unwrap());
117+
},
118+
BatchSize::SmallInput,
119+
);
120+
},
121+
);
122+
}
123+
}
124+
group.finish();
125+
}
126+
127+
fn bench_repeated_intern_emit(c: &mut Criterion) {
128+
let mut group = c.benchmark_group("dict_repeated_intern_emit");
129+
let schema = dict_schema();
130+
let null_density = 0.10;
131+
132+
for &size in &SIZES {
133+
let mut cards = CARDS_RELATIVE.to_vec();
134+
cards.push(size);
135+
for cardinality in cards {
136+
let batches: Vec<ArrayRef> = (0..N_BATCHES)
137+
.map(|i| {
138+
make_dict(
139+
size,
140+
cardinality,
141+
null_density,
142+
SEED.wrapping_add(i as u64),
143+
)
144+
})
145+
.collect();
146+
group.throughput(Throughput::Elements((size * N_BATCHES) as u64));
147+
group.bench_function(
148+
bench_id("repeated_intern_emit", size, cardinality, null_density),
149+
|b| {
150+
b.iter_batched_ref(
151+
|| {
152+
(
153+
new_group_values(schema.clone(), &GroupOrdering::None)
154+
.unwrap(),
155+
Vec::<usize>::with_capacity(size),
156+
)
157+
},
158+
|(gv, groups)| {
159+
for arr in &batches {
160+
gv.intern(std::slice::from_ref(arr), groups).unwrap();
161+
black_box(&*groups);
162+
}
163+
black_box(gv.emit(EmitTo::All).unwrap());
164+
},
165+
BatchSize::SmallInput,
166+
);
167+
},
168+
);
169+
}
170+
}
171+
group.finish();
172+
}
173+
174+
criterion_group!(benches, bench_intern_emit, bench_repeated_intern_emit);
175+
criterion_main!(benches);

0 commit comments

Comments
 (0)