Skip to content

Commit fbb5240

Browse files
authored
perf: Optimize NULL handling in arrays_zip (#21475)
## Which issue does this PR close? - Closes #21474. ## Rationale for this change `arrays_zip` was building a `Vec<bool>` and then converting it to a `NullBuffer`. It is simpler and (a bit) faster to just use `NullBuffer` / `NullBufferBuilder` directly. Benchmarks (ARM64): ``` - arrays_zip_no_nulls_8192: 1096.5µs → 1010.7µs, -7.8% - arrays_zip_10pct_nulls_8192: 1131.8µs → 1100.1µs, -2.8% ``` The improvement is not massive but it's non-zero, and the resulting code is cleaner and more idiomatic. ## What changes are included in this PR? * Implement optimization * Add benchmark ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent ad7c57a commit fbb5240

File tree

3 files changed

+141
-22
lines changed

3 files changed

+141
-22
lines changed

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ name = "array_min_max"
7979
harness = false
8080
name = "array_expression"
8181

82+
[[bench]]
83+
harness = false
84+
name = "arrays_zip"
85+
8286
[[bench]]
8387
harness = false
8488
name = "array_has"
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, Int64Array, ListArray};
22+
use arrow::buffer::{NullBuffer, OffsetBuffer};
23+
use arrow::datatypes::{DataType, Field};
24+
use criterion::{Criterion, criterion_group, criterion_main};
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
27+
use datafusion_functions_nested::arrays_zip::ArraysZip;
28+
use rand::rngs::StdRng;
29+
use rand::{Rng, SeedableRng};
30+
31+
const NUM_ROWS: usize = 8192;
32+
const LIST_SIZE: usize = 10;
33+
const SEED: u64 = 42;
34+
35+
/// Build a ListArray of Int64 with `num_rows` rows, each containing
36+
/// `list_size` elements. If `null_density > 0`, that fraction of
37+
/// rows will be null at the list level.
38+
fn make_list_array(
39+
rng: &mut StdRng,
40+
num_rows: usize,
41+
list_size: usize,
42+
null_density: f64,
43+
) -> ArrayRef {
44+
let total = num_rows * list_size;
45+
let values: Vec<i64> = (0..total).map(|_| rng.random_range(0..1000i64)).collect();
46+
let values_array = Arc::new(Int64Array::from(values)) as ArrayRef;
47+
48+
let offsets: Vec<i32> = (0..=num_rows).map(|i| (i * list_size) as i32).collect();
49+
50+
let nulls = if null_density > 0.0 {
51+
let valid: Vec<bool> = (0..num_rows)
52+
.map(|_| rng.random::<f64>() >= null_density)
53+
.collect();
54+
Some(NullBuffer::from(valid))
55+
} else {
56+
None
57+
};
58+
59+
Arc::new(
60+
ListArray::try_new(
61+
Arc::new(Field::new_list_field(DataType::Int64, true)),
62+
OffsetBuffer::new(offsets.into()),
63+
values_array,
64+
nulls,
65+
)
66+
.unwrap(),
67+
)
68+
}
69+
70+
fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) {
71+
let mut rng = StdRng::seed_from_u64(SEED);
72+
let arr1 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density);
73+
let arr2 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density);
74+
let arr3 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density);
75+
76+
let udf = ArraysZip::new();
77+
let args_vec = vec![
78+
ColumnarValue::Array(Arc::clone(&arr1)),
79+
ColumnarValue::Array(Arc::clone(&arr2)),
80+
ColumnarValue::Array(Arc::clone(&arr3)),
81+
];
82+
let return_type = udf
83+
.return_type(&[
84+
arr1.data_type().clone(),
85+
arr2.data_type().clone(),
86+
arr3.data_type().clone(),
87+
])
88+
.unwrap();
89+
let return_field = Arc::new(Field::new("f", return_type, true));
90+
let arg_fields: Vec<_> = (0..3)
91+
.map(|_| Arc::new(Field::new("a", arr1.data_type().clone(), true)))
92+
.collect();
93+
let config_options = Arc::new(ConfigOptions::default());
94+
95+
c.bench_function(name, |b| {
96+
b.iter(|| {
97+
black_box(
98+
udf.invoke_with_args(ScalarFunctionArgs {
99+
args: args_vec.clone(),
100+
arg_fields: arg_fields.clone(),
101+
number_rows: NUM_ROWS,
102+
return_field: Arc::clone(&return_field),
103+
config_options: Arc::clone(&config_options),
104+
})
105+
.expect("arrays_zip should work"),
106+
)
107+
})
108+
});
109+
}
110+
111+
fn criterion_benchmark(c: &mut Criterion) {
112+
bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0);
113+
bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1);
114+
}
115+
116+
criterion_group!(benches, criterion_benchmark);
117+
criterion_main!(benches);

datafusion/functions-nested/src/arrays_zip.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
2020
use crate::utils::make_scalar_function;
2121
use arrow::array::{
22-
Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array,
22+
Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder,
23+
StructArray, new_null_array,
2324
};
24-
use arrow::buffer::{NullBuffer, OffsetBuffer};
25+
use arrow::buffer::OffsetBuffer;
2526
use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
2627
use arrow::datatypes::{DataType, Field, Fields};
2728
use datafusion_common::cast::{
@@ -42,8 +43,14 @@ struct ListColumnView {
4243
values: ArrayRef,
4344
/// Pre-computed per-row start offsets (length = num_rows + 1).
4445
offsets: Vec<usize>,
45-
/// Pre-computed null bitmap: true means the row is null.
46-
is_null: Vec<bool>,
46+
/// Null bitmap from the input array (None means no nulls).
47+
nulls: Option<arrow::buffer::NullBuffer>,
48+
}
49+
50+
impl ListColumnView {
51+
fn is_null(&self, idx: usize) -> bool {
52+
self.nulls.as_ref().is_some_and(|n| n.is_null(idx))
53+
}
4754
}
4855

4956
make_udf_expr_and_func!(
@@ -170,37 +177,34 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
170177
let raw_offsets = arr.value_offsets();
171178
let offsets: Vec<usize> =
172179
raw_offsets.iter().map(|&o| o as usize).collect();
173-
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
174180
element_types.push(field.data_type().clone());
175181
views.push(Some(ListColumnView {
176182
values: Arc::clone(arr.values()),
177183
offsets,
178-
is_null,
184+
nulls: arr.nulls().cloned(),
179185
}));
180186
}
181187
LargeList(field) => {
182188
let arr = as_large_list_array(arg)?;
183189
let raw_offsets = arr.value_offsets();
184190
let offsets: Vec<usize> =
185191
raw_offsets.iter().map(|&o| o as usize).collect();
186-
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
187192
element_types.push(field.data_type().clone());
188193
views.push(Some(ListColumnView {
189194
values: Arc::clone(arr.values()),
190195
offsets,
191-
is_null,
196+
nulls: arr.nulls().cloned(),
192197
}));
193198
}
194199
FixedSizeList(field, size) => {
195200
let arr = as_fixed_size_list_array(arg)?;
196201
let size = *size as usize;
197202
let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
198-
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
199203
element_types.push(field.data_type().clone());
200204
views.push(Some(ListColumnView {
201205
values: Arc::clone(arr.values()),
202206
offsets,
203-
is_null,
207+
nulls: arr.nulls().cloned(),
204208
}));
205209
}
206210
Null => {
@@ -239,7 +243,7 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
239243

240244
let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
241245
offsets.push(0);
242-
let mut null_mask: Vec<bool> = Vec::with_capacity(num_rows);
246+
let mut null_builder = NullBufferBuilder::new(num_rows);
243247
let mut total_values: usize = 0;
244248

245249
// Process each row: compute per-array lengths, then copy values
@@ -249,24 +253,24 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
249253
let mut all_null = true;
250254

251255
for view in views.iter().flatten() {
252-
if !view.is_null[row_idx] {
256+
if !view.is_null(row_idx) {
253257
all_null = false;
254258
let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
255259
max_len = max_len.max(len);
256260
}
257261
}
258262

259263
if all_null {
260-
null_mask.push(true);
264+
null_builder.append_null();
261265
offsets.push(*offsets.last().unwrap());
262266
continue;
263267
}
264-
null_mask.push(false);
268+
null_builder.append_non_null();
265269

266270
// Extend each column builder for this row.
267271
for (col_idx, view) in views.iter().enumerate() {
268272
match view {
269-
Some(v) if !v.is_null[row_idx] => {
273+
Some(v) if !v.is_null(row_idx) => {
270274
let start = v.offsets[row_idx];
271275
let end = v.offsets[row_idx + 1];
272276
let len = end - start;
@@ -309,13 +313,7 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
309313

310314
let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
311315

312-
let null_buffer = if null_mask.iter().any(|&v| v) {
313-
Some(NullBuffer::from(
314-
null_mask.iter().map(|v| !v).collect::<Vec<bool>>(),
315-
))
316-
} else {
317-
None
318-
};
316+
let null_buffer = null_builder.finish();
319317

320318
let result = ListArray::try_new(
321319
Arc::new(Field::new_list_field(

0 commit comments

Comments
 (0)