diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 31462d5e509ed..53b0be2f72093 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -79,6 +79,10 @@ name = "array_min_max" harness = false name = "array_expression" +[[bench]] +harness = false +name = "arrays_zip" + [[bench]] harness = false name = "array_has" diff --git a/datafusion/functions-nested/benches/arrays_zip.rs b/datafusion/functions-nested/benches/arrays_zip.rs new file mode 100644 index 0000000000000..bc82b2978cc42 --- /dev/null +++ b/datafusion/functions-nested/benches/arrays_zip.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::datatypes::{DataType, Field}; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::arrays_zip::ArraysZip; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const NUM_ROWS: usize = 8192; +const LIST_SIZE: usize = 10; +const SEED: u64 = 42; + +/// Build a ListArray of Int64 with `num_rows` rows, each containing +/// `list_size` elements. If `null_density > 0`, that fraction of +/// rows will be null at the list level. +fn make_list_array( + rng: &mut StdRng, + num_rows: usize, + list_size: usize, + null_density: f64, +) -> ArrayRef { + let total = num_rows * list_size; + let values: Vec = (0..total).map(|_| rng.random_range(0..1000i64)).collect(); + let values_array = Arc::new(Int64Array::from(values)) as ArrayRef; + + let offsets: Vec = (0..=num_rows).map(|i| (i * list_size) as i32).collect(); + + let nulls = if null_density > 0.0 { + let valid: Vec = (0..num_rows) + .map(|_| rng.random::() >= null_density) + .collect(); + Some(NullBuffer::from(valid)) + } else { + None + }; + + Arc::new( + ListArray::try_new( + Arc::new(Field::new_list_field(DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + values_array, + nulls, + ) + .unwrap(), + ) +} + +fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) { + let mut rng = StdRng::seed_from_u64(SEED); + let arr1 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density); + let arr2 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density); + let arr3 = make_list_array(&mut rng, NUM_ROWS, LIST_SIZE, null_density); + + let udf = ArraysZip::new(); + let args_vec = vec![ + ColumnarValue::Array(Arc::clone(&arr1)), + ColumnarValue::Array(Arc::clone(&arr2)), + ColumnarValue::Array(Arc::clone(&arr3)), + ]; + let return_type = udf + .return_type(&[ + arr1.data_type().clone(), + arr2.data_type().clone(), + arr3.data_type().clone(), + ]) + .unwrap(); + let return_field = Arc::new(Field::new("f", return_type, true)); + let arg_fields: Vec<_> = (0..3) + .map(|_| Arc::new(Field::new("a", arr1.data_type().clone(), true))) + .collect(); + let config_options = Arc::new(ConfigOptions::default()); + + c.bench_function(name, |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args_vec.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("arrays_zip should work"), + ) + }) + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0); + bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs index 61b69726f0fbd..5f1cb9dedf408 100644 --- a/datafusion/functions-nested/src/arrays_zip.rs +++ b/datafusion/functions-nested/src/arrays_zip.rs @@ -19,9 +19,10 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array, + Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder, + StructArray, new_null_array, }; -use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::cast::{ @@ -42,8 +43,14 @@ struct ListColumnView { values: ArrayRef, /// Pre-computed per-row start offsets (length = num_rows + 1). offsets: Vec, - /// Pre-computed null bitmap: true means the row is null. - is_null: Vec, + /// Null bitmap from the input array (None means no nulls). + nulls: Option, +} + +impl ListColumnView { + fn is_null(&self, idx: usize) -> bool { + self.nulls.as_ref().is_some_and(|n| n.is_null(idx)) + } } make_udf_expr_and_func!( @@ -170,12 +177,11 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let raw_offsets = arr.value_offsets(); let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); - let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); element_types.push(field.data_type().clone()); views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, - is_null, + nulls: arr.nulls().cloned(), })); } LargeList(field) => { @@ -183,24 +189,22 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let raw_offsets = arr.value_offsets(); let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); - let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); element_types.push(field.data_type().clone()); views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, - is_null, + nulls: arr.nulls().cloned(), })); } FixedSizeList(field, size) => { let arr = as_fixed_size_list_array(arg)?; let size = *size as usize; let offsets: Vec = (0..=num_rows).map(|row| row * size).collect(); - let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); element_types.push(field.data_type().clone()); views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, - is_null, + nulls: arr.nulls().cloned(), })); } Null => { @@ -239,7 +243,7 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let mut offsets: Vec = Vec::with_capacity(num_rows + 1); offsets.push(0); - let mut null_mask: Vec = Vec::with_capacity(num_rows); + let mut null_builder = NullBufferBuilder::new(num_rows); let mut total_values: usize = 0; // Process each row: compute per-array lengths, then copy values @@ -249,7 +253,7 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let mut all_null = true; for view in views.iter().flatten() { - if !view.is_null[row_idx] { + if !view.is_null(row_idx) { all_null = false; let len = view.offsets[row_idx + 1] - view.offsets[row_idx]; max_len = max_len.max(len); @@ -257,16 +261,16 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { } if all_null { - null_mask.push(true); + null_builder.append_null(); offsets.push(*offsets.last().unwrap()); continue; } - null_mask.push(false); + null_builder.append_non_null(); // Extend each column builder for this row. for (col_idx, view) in views.iter().enumerate() { match view { - Some(v) if !v.is_null[row_idx] => { + Some(v) if !v.is_null(row_idx) => { let start = v.offsets[row_idx]; let end = v.offsets[row_idx + 1]; let len = end - start; @@ -309,13 +313,7 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; - let null_buffer = if null_mask.iter().any(|&v| v) { - Some(NullBuffer::from( - null_mask.iter().map(|v| !v).collect::>(), - )) - } else { - None - }; + let null_buffer = null_builder.finish(); let result = ListArray::try_new( Arc::new(Field::new_list_field(