Skip to content

Commit 6154aba

Browse files
committed
map_extract benchmark
1 parent 1a0af76 commit 6154aba

2 files changed

Lines changed: 285 additions & 0 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ name = "array_slice"
9494
harness = false
9595
name = "map"
9696

97+
[[bench]]
98+
harness = false
99+
name = "map_extract"
100+
97101
[[bench]]
98102
harness = false
99103
name = "array_remove"
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
ArrayRef, BinaryArray, BinaryViewArray, Int32Array, ListArray, StringArray,
20+
StringViewArray,
21+
};
22+
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
23+
use arrow::datatypes::Field;
24+
use criterion::{Criterion, criterion_group, criterion_main};
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
27+
use datafusion_functions_nested::map::map_udf;
28+
use datafusion_functions_nested::map_extract::map_extract_udf;
29+
use rand::Rng;
30+
use rand::prelude::ThreadRng;
31+
use std::collections::HashSet;
32+
use std::hash::Hash;
33+
use std::hint::black_box;
34+
use std::sync::Arc;
35+
36+
/// Number of rows in every benchmarked map array and in every query-key array.
const MAP_ROWS: usize = 1000;
37+
/// Number of distinct keys (and therefore entries) stored in each map row.
const MAP_KEYS_PER_ROW: usize = 1000;
38+
39+
fn gen_unique_values<T>(
40+
rng: &mut ThreadRng,
41+
mut make_value: impl FnMut(i32) -> T,
42+
) -> Vec<T>
43+
where
44+
T: Eq + Hash,
45+
{
46+
let mut values = HashSet::with_capacity(MAP_KEYS_PER_ROW);
47+
48+
while values.len() < MAP_KEYS_PER_ROW {
49+
values.insert(make_value(rng.random_range(0..10000)));
50+
}
51+
52+
values.into_iter().collect()
53+
}
54+
55+
/// Returns `values` concatenated with itself `repeats` times.
///
/// The output has `values.len() * repeats` elements; element `i` is a clone of
/// `values[i % values.len()]`. An empty slice or `repeats == 0` yields an
/// empty vector.
fn gen_repeat_values<T: Clone>(values: &[T], repeats: usize) -> Vec<T> {
    let mut out = Vec::with_capacity(values.len() * repeats);
    out.extend((0..repeats).flat_map(|_| values.iter().cloned()));
    out
}
64+
65+
fn gen_utf8_values(rng: &mut ThreadRng) -> Vec<String> {
66+
gen_unique_values(rng, |value| value.to_string())
67+
}
68+
69+
fn gen_binary_values(rng: &mut ThreadRng) -> Vec<Vec<u8>> {
70+
gen_unique_values(rng, |value| value.to_le_bytes().to_vec())
71+
}
72+
73+
fn gen_primitive_values(rng: &mut ThreadRng) -> Vec<i32> {
74+
gen_unique_values(rng, |value| value)
75+
}
76+
77+
fn list_array(values: ArrayRef, row_count: usize, values_per_row: usize) -> ArrayRef {
78+
let offsets = (0..=row_count)
79+
.map(|index| (index * values_per_row) as i32)
80+
.collect::<Vec<_>>();
81+
Arc::new(ListArray::new(
82+
Arc::new(Field::new_list_field(values.data_type().clone(), true)),
83+
OffsetBuffer::new(ScalarBuffer::from(offsets)),
84+
values,
85+
None,
86+
))
87+
}
88+
89+
fn build_map_array(keys: ArrayRef, values: ArrayRef) -> ArrayRef {
90+
let number_rows = keys.len();
91+
let keys_arg = ColumnarValue::Array(keys);
92+
let values_arg = ColumnarValue::Array(values);
93+
let return_type = map_udf()
94+
.return_type(&[keys_arg.data_type(), values_arg.data_type()])
95+
.expect("should get return type");
96+
let arg_fields = vec![
97+
Field::new("keys", keys_arg.data_type(), true).into(),
98+
Field::new("values", values_arg.data_type(), true).into(),
99+
];
100+
let return_field = Field::new("map", return_type, true).into();
101+
let config_options = Arc::new(ConfigOptions::default());
102+
103+
match map_udf()
104+
.invoke_with_args(ScalarFunctionArgs {
105+
args: vec![keys_arg, values_arg],
106+
arg_fields,
107+
number_rows,
108+
return_field,
109+
config_options,
110+
})
111+
.expect("map should work on valid values")
112+
{
113+
ColumnarValue::Array(array) => array,
114+
other => panic!("expected array result, got {other:?}"),
115+
}
116+
}
117+
118+
fn bench_map_extract_case(
119+
c: &mut Criterion,
120+
name: &str,
121+
map_array: ArrayRef,
122+
query_keys: ArrayRef,
123+
) {
124+
let number_rows = map_array.len();
125+
let map_arg = ColumnarValue::Array(map_array);
126+
let key_arg = ColumnarValue::Array(query_keys);
127+
let return_type = map_extract_udf()
128+
.return_type(&[map_arg.data_type(), key_arg.data_type()])
129+
.expect("should get return type");
130+
let arg_fields = vec![
131+
Field::new("map", map_arg.data_type(), true).into(),
132+
Field::new("key", key_arg.data_type(), true).into(),
133+
];
134+
let return_field = Field::new("result", return_type, true).into();
135+
let config_options = Arc::new(ConfigOptions::default());
136+
137+
c.bench_function(name, |b| {
138+
b.iter(|| {
139+
black_box(
140+
map_extract_udf()
141+
.invoke_with_args(ScalarFunctionArgs {
142+
args: vec![map_arg.clone(), key_arg.clone()],
143+
arg_fields: arg_fields.clone(),
144+
number_rows,
145+
return_field: Arc::clone(&return_field),
146+
config_options: Arc::clone(&config_options),
147+
})
148+
.expect("map_extract should work on valid values"),
149+
);
150+
});
151+
});
152+
}
153+
154+
fn criterion_benchmark(c: &mut Criterion) {
155+
let mut rng = rand::rng();
156+
let primitive_values = gen_primitive_values(&mut rng);
157+
let utf8_values = gen_utf8_values(&mut rng);
158+
let binary_values = gen_binary_values(&mut rng);
159+
let values = Arc::new(Int32Array::from(gen_repeat_values(
160+
&primitive_values,
161+
MAP_ROWS,
162+
))) as ArrayRef;
163+
let values = list_array(values, MAP_ROWS, MAP_KEYS_PER_ROW);
164+
165+
let map_extract_cases = [
166+
(
167+
"map_extract_1000_utf8_found_middle",
168+
build_map_array(
169+
list_array(
170+
Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS)))
171+
as ArrayRef,
172+
MAP_ROWS,
173+
MAP_KEYS_PER_ROW,
174+
),
175+
Arc::clone(&values),
176+
),
177+
Arc::new(StringArray::from(vec![
178+
utf8_values[MAP_KEYS_PER_ROW / 2]
179+
.clone();
180+
MAP_ROWS
181+
])) as ArrayRef,
182+
),
183+
(
184+
"map_extract_1000_utf8_found_last",
185+
build_map_array(
186+
list_array(
187+
Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS)))
188+
as ArrayRef,
189+
MAP_ROWS,
190+
MAP_KEYS_PER_ROW,
191+
),
192+
Arc::clone(&values),
193+
),
194+
Arc::new(StringArray::from(vec![
195+
utf8_values[MAP_KEYS_PER_ROW - 1]
196+
.clone();
197+
MAP_ROWS
198+
])) as ArrayRef,
199+
),
200+
(
201+
"map_extract_1000_binary_found_last",
202+
build_map_array(
203+
list_array(
204+
Arc::new(BinaryArray::from_iter_values(gen_repeat_values(
205+
&binary_values,
206+
MAP_ROWS,
207+
))) as ArrayRef,
208+
MAP_ROWS,
209+
MAP_KEYS_PER_ROW,
210+
),
211+
Arc::clone(&values),
212+
),
213+
Arc::new(BinaryArray::from_iter_values(vec![
214+
binary_values[MAP_KEYS_PER_ROW - 1].clone();
215+
MAP_ROWS
216+
])) as ArrayRef,
217+
),
218+
(
219+
"map_extract_1000_utf8_view_found_last",
220+
build_map_array(
221+
list_array(
222+
Arc::new(StringViewArray::from(gen_repeat_values(
223+
&utf8_values,
224+
MAP_ROWS,
225+
))) as ArrayRef,
226+
MAP_ROWS,
227+
MAP_KEYS_PER_ROW,
228+
),
229+
Arc::clone(&values),
230+
),
231+
Arc::new(StringViewArray::from(vec![
232+
utf8_values[MAP_KEYS_PER_ROW - 1]
233+
.clone();
234+
MAP_ROWS
235+
])) as ArrayRef,
236+
),
237+
(
238+
"map_extract_1000_binary_view_found_last",
239+
build_map_array(
240+
list_array(
241+
Arc::new(BinaryViewArray::from_iter_values(gen_repeat_values(
242+
&binary_values,
243+
MAP_ROWS,
244+
))) as ArrayRef,
245+
MAP_ROWS,
246+
MAP_KEYS_PER_ROW,
247+
),
248+
Arc::clone(&values),
249+
),
250+
Arc::new(BinaryViewArray::from_iter_values(vec![
251+
binary_values[MAP_KEYS_PER_ROW - 1].clone();
252+
MAP_ROWS
253+
])) as ArrayRef,
254+
),
255+
(
256+
"map_extract_1000_int32_found_last",
257+
build_map_array(
258+
list_array(
259+
Arc::new(Int32Array::from(gen_repeat_values(
260+
&primitive_values,
261+
MAP_ROWS,
262+
))) as ArrayRef,
263+
MAP_ROWS,
264+
MAP_KEYS_PER_ROW,
265+
),
266+
Arc::clone(&values),
267+
),
268+
Arc::new(Int32Array::from(vec![
269+
primitive_values[MAP_KEYS_PER_ROW - 1];
270+
MAP_ROWS
271+
])) as ArrayRef,
272+
),
273+
];
274+
275+
for (name, map_array, query_keys) in map_extract_cases {
276+
bench_map_extract_case(c, name, map_array, query_keys);
277+
}
278+
}
279+
280+
// Registers `criterion_benchmark` in a Criterion benchmark group named `benches`.
criterion_group!(benches, criterion_benchmark);
281+
// Generates the benchmark binary's entry point, which runs the `benches` group
// (the bench target is declared with `harness = false` in Cargo.toml).
criterion_main!(benches);

0 commit comments

Comments
 (0)