Skip to content

Commit c73b6e1

Browse files
authored
bench: add benchmarks for first_value, last_value (#21409)
## Which issue does this PR close? ## Rationale for this change Spin-off of #21383, so that a benchmark for `first_value` and `last_value` is available before the PR that changes their logic. ## What changes are included in this PR? - Add a benchmark for the `GroupsAccumulator`. Benchmarking aggregates with grouping is fairly complicated because many operations are stateful, so I introduced an end-to-end evaluate benchmark (to actually exercise taking the state) and a convert_to_state benchmark (as in the other benches) - Add a benchmark for the simple `Accumulator` ## Are these changes tested? - Manual bench run ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 727022d commit c73b6e1

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

datafusion/functions-aggregate/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@ name = "min_max_bytes"
7979
[[bench]]
8080
name = "approx_distinct"
8181
harness = false
82+
83+
[[bench]]
84+
name = "first_last"
85+
harness = false
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, BooleanArray};
22+
use arrow::compute::SortOptions;
23+
use arrow::datatypes::{DataType, Field, Int64Type, Schema};
24+
use arrow::util::bench_util::{create_boolean_array, create_primitive_array};
25+
26+
use datafusion_expr::{
27+
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs,
28+
};
29+
use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
30+
use datafusion_physical_expr::PhysicalSortExpr;
31+
use datafusion_physical_expr::expressions::col;
32+
33+
use criterion::{Criterion, criterion_group, criterion_main};
34+
35+
fn prepare_groups_accumulator(is_first: bool) -> Box<dyn GroupsAccumulator> {
36+
let schema = Arc::new(Schema::new(vec![
37+
Field::new("value", DataType::Int64, true),
38+
Field::new("ord", DataType::Int64, true),
39+
]));
40+
41+
let order_expr = col("ord", &schema).unwrap();
42+
let sort_expr = PhysicalSortExpr {
43+
expr: order_expr,
44+
options: SortOptions::default(),
45+
};
46+
47+
let value_field: Arc<Field> = Field::new("value", DataType::Int64, true).into();
48+
let accumulator_args = AccumulatorArgs {
49+
return_field: Arc::clone(&value_field),
50+
schema: &schema,
51+
expr_fields: &[value_field],
52+
ignore_nulls: false,
53+
order_bys: std::slice::from_ref(&sort_expr),
54+
is_reversed: false,
55+
name: if is_first {
56+
"FIRST_VALUE(value ORDER BY ord)"
57+
} else {
58+
"LAST_VALUE(value ORDER BY ord)"
59+
},
60+
is_distinct: false,
61+
exprs: &[col("value", &schema).unwrap()],
62+
};
63+
64+
if is_first {
65+
FirstValue::new()
66+
.create_groups_accumulator(accumulator_args)
67+
.unwrap()
68+
} else {
69+
LastValue::new()
70+
.create_groups_accumulator(accumulator_args)
71+
.unwrap()
72+
}
73+
}
74+
75+
fn prepare_accumulator(is_first: bool) -> Box<dyn Accumulator> {
76+
let schema = Arc::new(Schema::new(vec![
77+
Field::new("value", DataType::Int64, true),
78+
Field::new("ord", DataType::Int64, true),
79+
]));
80+
81+
let order_expr = col("ord", &schema).unwrap();
82+
let sort_expr = PhysicalSortExpr {
83+
expr: order_expr,
84+
options: SortOptions::default(),
85+
};
86+
87+
let value_field: Arc<Field> = Field::new("value", DataType::Int64, true).into();
88+
let accumulator_args = AccumulatorArgs {
89+
return_field: Arc::clone(&value_field),
90+
schema: &schema,
91+
expr_fields: &[value_field],
92+
ignore_nulls: false,
93+
order_bys: std::slice::from_ref(&sort_expr),
94+
is_reversed: false,
95+
name: if is_first {
96+
"FIRST_VALUE(value ORDER BY ord)"
97+
} else {
98+
"LAST_VALUE(value ORDER BY ord)"
99+
},
100+
is_distinct: false,
101+
exprs: &[col("value", &schema).unwrap()],
102+
};
103+
104+
if is_first {
105+
FirstValue::new().accumulator(accumulator_args).unwrap()
106+
} else {
107+
LastValue::new().accumulator(accumulator_args).unwrap()
108+
}
109+
}
110+
111+
#[expect(clippy::needless_pass_by_value)]
112+
fn convert_to_state_bench(
113+
c: &mut Criterion,
114+
is_first: bool,
115+
name: &str,
116+
values: ArrayRef,
117+
opt_filter: Option<&BooleanArray>,
118+
) {
119+
c.bench_function(name, |b| {
120+
b.iter(|| {
121+
let accumulator = prepare_groups_accumulator(is_first);
122+
black_box(
123+
accumulator
124+
.convert_to_state(std::slice::from_ref(&values), opt_filter)
125+
.unwrap(),
126+
)
127+
})
128+
});
129+
}
130+
131+
#[expect(clippy::needless_pass_by_value)]
132+
fn evaluate_accumulator_bench(
133+
c: &mut Criterion,
134+
is_first: bool,
135+
name: &str,
136+
values: ArrayRef,
137+
ord: ArrayRef,
138+
) {
139+
c.bench_function(name, |b| {
140+
b.iter_batched(
141+
|| {
142+
// setup, not timed
143+
let mut accumulator = prepare_accumulator(is_first);
144+
accumulator
145+
.update_batch(&[Arc::clone(&values), Arc::clone(&ord)])
146+
.unwrap();
147+
accumulator
148+
},
149+
|mut accumulator| black_box(accumulator.evaluate().unwrap()),
150+
criterion::BatchSize::SmallInput,
151+
)
152+
});
153+
}
154+
155+
#[expect(clippy::needless_pass_by_value)]
156+
#[expect(clippy::too_many_arguments)]
157+
fn evaluate_bench(
158+
c: &mut Criterion,
159+
is_first: bool,
160+
emit_to: EmitTo,
161+
name: &str,
162+
values: ArrayRef,
163+
ord: ArrayRef,
164+
opt_filter: Option<&BooleanArray>,
165+
num_groups: usize,
166+
) {
167+
let n = values.len();
168+
let group_indices: Vec<usize> = (0..n).map(|i| i % num_groups).collect();
169+
170+
c.bench_function(name, |b| {
171+
b.iter_batched(
172+
|| {
173+
// setup, not timed
174+
let mut accumulator = prepare_groups_accumulator(is_first);
175+
accumulator
176+
.update_batch(
177+
&[Arc::clone(&values), Arc::clone(&ord)],
178+
&group_indices,
179+
opt_filter,
180+
num_groups,
181+
)
182+
.unwrap();
183+
accumulator
184+
},
185+
|mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()),
186+
criterion::BatchSize::SmallInput,
187+
)
188+
});
189+
}
190+
191+
fn first_last_benchmark(c: &mut Criterion) {
192+
const N: usize = 65536;
193+
const NUM_GROUPS: usize = 1024;
194+
195+
assert_eq!(N % NUM_GROUPS, 0);
196+
197+
for is_first in [true, false] {
198+
for pct in [0, 90] {
199+
let fn_name = if is_first {
200+
"first_value"
201+
} else {
202+
"last_value"
203+
};
204+
205+
let null_density = (pct as f32) / 100.0;
206+
let values = Arc::new(create_primitive_array::<Int64Type>(N, null_density))
207+
as ArrayRef;
208+
let ord = Arc::new(create_primitive_array::<Int64Type>(N, null_density))
209+
as ArrayRef;
210+
211+
evaluate_accumulator_bench(
212+
c,
213+
is_first,
214+
&format!("{fn_name} evaluate_accumulator_bench nulls={pct}%"),
215+
values.clone(),
216+
ord.clone(),
217+
);
218+
219+
for with_filter in [false, true] {
220+
let filter = create_boolean_array(N, 0.0, 0.5);
221+
let opt_filter = if with_filter { Some(&filter) } else { None };
222+
223+
convert_to_state_bench(
224+
c,
225+
is_first,
226+
&format!(
227+
"{fn_name} convert_to_state nulls={pct}%, filter={with_filter}"
228+
),
229+
values.clone(),
230+
opt_filter,
231+
);
232+
evaluate_bench(
233+
c,
234+
is_first,
235+
EmitTo::First(2),
236+
&format!(
237+
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)"
238+
),
239+
values.clone(),
240+
ord.clone(),
241+
opt_filter,
242+
NUM_GROUPS,
243+
);
244+
evaluate_bench(
245+
c,
246+
is_first,
247+
EmitTo::All,
248+
&format!(
249+
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all"
250+
),
251+
values.clone(),
252+
ord.clone(),
253+
opt_filter,
254+
NUM_GROUPS,
255+
);
256+
}
257+
}
258+
}
259+
}
260+
261+
// Collect `first_last_benchmark` into the Criterion group `benches` and hand
// that group to `criterion_main!`, which presumably supplies the binary's
// entry point (the bench target is declared with `harness = false`).
criterion_group!(benches, first_last_benchmark);
262+
criterion_main!(benches);

0 commit comments

Comments
 (0)