Skip to content

Commit 152e91d

Browse files
committed
Add bench for first_last
1 parent 07e7fb5 commit 152e91d

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed

datafusion/functions-aggregate/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@ name = "min_max_bytes"
7979
[[bench]]
8080
name = "approx_distinct"
8181
harness = false
82+
83+
[[bench]]
84+
name = "first_last"
85+
harness = false
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, BooleanArray};
22+
use arrow::compute::SortOptions;
23+
use arrow::datatypes::{DataType, Field, Int64Type, Schema};
24+
use arrow::util::bench_util::{create_boolean_array, create_primitive_array};
25+
26+
use datafusion_expr::{
27+
AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs,
28+
};
29+
use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
30+
use datafusion_physical_expr::PhysicalSortExpr;
31+
use datafusion_physical_expr::expressions::col;
32+
33+
use criterion::{Criterion, criterion_group, criterion_main};
34+
35+
fn prepare_groups_accumulator(is_first: bool) -> Box<dyn GroupsAccumulator> {
36+
let schema = Arc::new(Schema::new(vec![
37+
Field::new("value", DataType::Int64, true),
38+
Field::new("ord", DataType::Int64, true),
39+
]));
40+
41+
let order_expr = col("ord", &schema).unwrap();
42+
let sort_expr = PhysicalSortExpr {
43+
expr: order_expr,
44+
options: SortOptions::default(),
45+
};
46+
47+
let value_field: Arc<Field> = Field::new("value", DataType::Int64, true).into();
48+
let accumulator_args = AccumulatorArgs {
49+
return_field: Arc::clone(&value_field),
50+
schema: &schema,
51+
expr_fields: &[value_field],
52+
ignore_nulls: false,
53+
order_bys: std::slice::from_ref(&sort_expr),
54+
is_reversed: false,
55+
name: if is_first {
56+
"FIRST_VALUE(value ORDER BY ord)"
57+
} else {
58+
"LAST_VALUE(value ORDER BY ord)"
59+
},
60+
is_distinct: false,
61+
exprs: &[col("value", &schema).unwrap()],
62+
};
63+
64+
if is_first {
65+
FirstValue::new()
66+
.create_groups_accumulator(accumulator_args)
67+
.unwrap()
68+
} else {
69+
LastValue::new()
70+
.create_groups_accumulator(accumulator_args)
71+
.unwrap()
72+
}
73+
}
74+
75+
#[expect(clippy::needless_pass_by_value)]
76+
fn convert_to_state_bench(
77+
c: &mut Criterion,
78+
is_first: bool,
79+
name: &str,
80+
values: ArrayRef,
81+
opt_filter: Option<&BooleanArray>,
82+
) {
83+
c.bench_function(name, |b| {
84+
b.iter(|| {
85+
let accumulator = prepare_groups_accumulator(is_first);
86+
black_box(
87+
accumulator
88+
.convert_to_state(std::slice::from_ref(&values), opt_filter)
89+
.unwrap(),
90+
)
91+
})
92+
});
93+
}
94+
95+
#[expect(clippy::needless_pass_by_value)]
96+
#[expect(clippy::too_many_arguments)]
97+
fn evaluate_bench(
98+
c: &mut Criterion,
99+
is_first: bool,
100+
emit_to: EmitTo,
101+
name: &str,
102+
values: ArrayRef,
103+
ord: ArrayRef,
104+
opt_filter: Option<&BooleanArray>,
105+
num_groups: usize,
106+
) {
107+
let n = values.len();
108+
let group_indices: Vec<usize> = (0..n).map(|i| i % num_groups).collect();
109+
110+
c.bench_function(name, |b| {
111+
b.iter_batched(
112+
|| {
113+
// setup, not timed
114+
let mut accumulator = prepare_groups_accumulator(is_first);
115+
accumulator
116+
.update_batch(
117+
&[Arc::clone(&values), Arc::clone(&ord)],
118+
&group_indices,
119+
opt_filter,
120+
num_groups,
121+
)
122+
.unwrap();
123+
accumulator
124+
},
125+
|mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()),
126+
criterion::BatchSize::SmallInput,
127+
)
128+
});
129+
}
130+
131+
fn first_last_benchmark(c: &mut Criterion) {
132+
const N: usize = 65536;
133+
const NUM_GROUPS: usize = 1024;
134+
135+
assert_eq!(N % NUM_GROUPS, 0);
136+
137+
for is_first in [true, false] {
138+
for pct in [0, 90] {
139+
let fn_name = if is_first {
140+
"first_value"
141+
} else {
142+
"last_value"
143+
};
144+
145+
let null_density = (pct as f32) / 100.0;
146+
let values = Arc::new(create_primitive_array::<Int64Type>(N, null_density))
147+
as ArrayRef;
148+
let ord = Arc::new(create_primitive_array::<Int64Type>(N, null_density))
149+
as ArrayRef;
150+
151+
for with_filter in [false, true] {
152+
let filter = create_boolean_array(N, 0.0, 0.5);
153+
let opt_filter = if with_filter { Some(&filter) } else { None };
154+
155+
convert_to_state_bench(
156+
c,
157+
is_first,
158+
&format!(
159+
"{fn_name} convert_to_state nulls={pct}%, filter={with_filter}"
160+
),
161+
values.clone(),
162+
opt_filter,
163+
);
164+
evaluate_bench(
165+
c,
166+
is_first,
167+
EmitTo::First(2),
168+
&format!(
169+
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)"
170+
),
171+
values.clone(),
172+
ord.clone(),
173+
opt_filter,
174+
NUM_GROUPS,
175+
);
176+
evaluate_bench(
177+
c,
178+
is_first,
179+
EmitTo::All,
180+
&format!(
181+
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all"
182+
),
183+
values.clone(),
184+
ord.clone(),
185+
opt_filter,
186+
NUM_GROUPS,
187+
);
188+
}
189+
}
190+
}
191+
}
192+
193+
// Declares the Criterion group `benches`, containing `first_last_benchmark`.
criterion_group!(benches, first_last_benchmark);
194+
// Entry point for this bench target: the `[[bench]]` entry for `first_last`
// sets `harness = false`, so Criterion's macro supplies `main` and runs `benches`.
criterion_main!(benches);

0 commit comments

Comments
 (0)