Skip to content

Commit 3c56e5d

Browse files
authored
perf: Use batched row conversion for array_has_any, array_has_all (#20588)
## Which issue does this PR close? - Closes #20587 . ## Rationale for this change `array_has_any` and `array_has_all` called `RowConverter::convert_columns` twice for every input row. `convert_columns` has a lot of per-call overhead: allocating a new `Rows` buffer, doing various schema checks, and so on. It is more efficient to invoke the `RowConverter` on multiple rows at once. In this PR, we break the input batch into chunks of 512 rows. We do row conversion for the entire chunk in bulk, and then implement the `has_any` / `has_all` predicate comparison by indexing into the converted rows. `array_has_any` / `array_has_all` had a special-case for strings, but it had an analogous problem: it iterated over rows, materialized each row's inner list, and then called `string_array_to_vec` twice per row. That does a lot of per-row work; it is significantly faster to call `string_array_to_vec` on many rows at once, and then index into the results to implement the per-row comparisons. In both cases, we don't convert the entire batch in a single call because benchmarks suggested that it was inefficient to do whole-batch row conversion for large arrays (500 elements per row) on some machines. It seems plausible that this slowdown happens because a large batch full of large arrays results in a large working set, which falls outside the CPU cache. Splitting the batch into smaller chunks avoids this problem. ## What changes are included in this PR? * Implement optimization * Improve test coverage for sliced arrays; not strictly related to this PR but more coverage for this codepath made me feel more comfortable ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent 422b545 commit 3c56e5d

File tree

1 file changed

+215
-36
lines changed

1 file changed

+215
-36
lines changed

datafusion/functions-nested/src/array_has.rs

Lines changed: 215 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::utils::make_scalar_function;
4242

4343
use hashbrown::HashSet;
4444
use std::any::Any;
45+
use std::ops::Range;
4546
use std::sync::Arc;
4647

4748
// Create static instances of ScalarUDFs for each function
@@ -422,30 +423,66 @@ fn array_has_all_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
422423
array_has_all_and_any_inner(args, ComparisonType::All)
423424
}
424425

426+
/// Number of rows to process at a time when doing batched row conversion. This
427+
/// amortizes the row conversion overhead over more rows, but making this too
428+
/// large can cause cache pressure for large arrays. See
429+
/// <https://github.com/apache/datafusion/pull/20588> for context.
430+
const ROW_CONVERSION_CHUNK_SIZE: usize = 512;
431+
425432
// General row comparison for array_has_all and array_has_any
426433
fn general_array_has_for_all_and_any<'a>(
427434
haystack: ArrayWrapper<'a>,
428435
needle: ArrayWrapper<'a>,
429436
comparison_type: ComparisonType,
430437
) -> Result<ArrayRef> {
431-
let mut boolean_builder = BooleanArray::builder(haystack.len());
438+
let num_rows = haystack.len();
432439
let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;
433440

434-
for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
435-
if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
436-
let arr_values = converter.convert_columns(&[arr])?;
437-
let sub_arr_values = converter.convert_columns(&[sub_arr])?;
438-
boolean_builder.append_value(general_array_has_all_and_any_kernel(
439-
&arr_values,
440-
&sub_arr_values,
441+
let h_offsets: Vec<usize> = haystack.offsets().collect();
442+
let n_offsets: Vec<usize> = needle.offsets().collect();
443+
444+
let h_nulls = haystack.nulls();
445+
let n_nulls = needle.nulls();
446+
let mut builder = BooleanArray::builder(num_rows);
447+
448+
for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
449+
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
450+
451+
// For efficiency with sliced arrays, only process the visible elements,
452+
// not the entire underlying buffer.
453+
let h_elem_start = h_offsets[chunk_start];
454+
let h_elem_end = h_offsets[chunk_end];
455+
let n_elem_start = n_offsets[chunk_start];
456+
let n_elem_end = n_offsets[chunk_end];
457+
458+
let h_vals = haystack
459+
.values()
460+
.slice(h_elem_start, h_elem_end - h_elem_start);
461+
let n_vals = needle
462+
.values()
463+
.slice(n_elem_start, n_elem_end - n_elem_start);
464+
465+
let chunk_h_rows = converter.convert_columns(&[h_vals])?;
466+
let chunk_n_rows = converter.convert_columns(&[n_vals])?;
467+
468+
for i in chunk_start..chunk_end {
469+
if h_nulls.is_some_and(|n| n.is_null(i))
470+
|| n_nulls.is_some_and(|n| n.is_null(i))
471+
{
472+
builder.append_null();
473+
continue;
474+
}
475+
builder.append_value(general_array_has_all_and_any_kernel(
476+
&chunk_h_rows,
477+
(h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start),
478+
&chunk_n_rows,
479+
(n_offsets[i] - n_elem_start)..(n_offsets[i + 1] - n_elem_start),
441480
comparison_type,
442481
));
443-
} else {
444-
boolean_builder.append_null();
445482
}
446483
}
447484

448-
Ok(Arc::new(boolean_builder.finish()))
485+
Ok(Arc::new(builder.finish()))
449486
}
450487

451488
// String comparison for array_has_all and array_has_any
@@ -454,25 +491,53 @@ fn array_has_all_and_any_string_internal<'a>(
454491
needle: ArrayWrapper<'a>,
455492
comparison_type: ComparisonType,
456493
) -> Result<ArrayRef> {
457-
let mut boolean_builder = BooleanArray::builder(haystack.len());
458-
for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
459-
match (arr, sub_arr) {
460-
(Some(arr), Some(sub_arr)) => {
461-
let haystack_array = string_array_to_vec(&arr);
462-
let needle_array = string_array_to_vec(&sub_arr);
463-
boolean_builder.append_value(array_has_string_kernel(
464-
&haystack_array,
465-
&needle_array,
466-
comparison_type,
467-
));
468-
}
469-
(_, _) => {
470-
boolean_builder.append_null();
494+
let num_rows = haystack.len();
495+
496+
let h_offsets: Vec<usize> = haystack.offsets().collect();
497+
let n_offsets: Vec<usize> = needle.offsets().collect();
498+
499+
let h_nulls = haystack.nulls();
500+
let n_nulls = needle.nulls();
501+
let mut builder = BooleanArray::builder(num_rows);
502+
503+
for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
504+
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
505+
506+
let h_elem_start = h_offsets[chunk_start];
507+
let h_elem_end = h_offsets[chunk_end];
508+
let n_elem_start = n_offsets[chunk_start];
509+
let n_elem_end = n_offsets[chunk_end];
510+
511+
let h_vals = haystack
512+
.values()
513+
.slice(h_elem_start, h_elem_end - h_elem_start);
514+
let n_vals = needle
515+
.values()
516+
.slice(n_elem_start, n_elem_end - n_elem_start);
517+
518+
let chunk_h_strings = string_array_to_vec(h_vals.as_ref());
519+
let chunk_n_strings = string_array_to_vec(n_vals.as_ref());
520+
521+
for i in chunk_start..chunk_end {
522+
if h_nulls.is_some_and(|n| n.is_null(i))
523+
|| n_nulls.is_some_and(|n| n.is_null(i))
524+
{
525+
builder.append_null();
526+
continue;
471527
}
528+
let h_start = h_offsets[i] - h_elem_start;
529+
let h_end = h_offsets[i + 1] - h_elem_start;
530+
let n_start = n_offsets[i] - n_elem_start;
531+
let n_end = n_offsets[i + 1] - n_elem_start;
532+
builder.append_value(array_has_string_kernel(
533+
&chunk_h_strings[h_start..h_end],
534+
&chunk_n_strings[n_start..n_end],
535+
comparison_type,
536+
));
472537
}
473538
}
474539

475-
Ok(Arc::new(boolean_builder.finish()))
540+
Ok(Arc::new(builder.finish()))
476541
}
477542

478543
fn array_has_all_and_any_dispatch<'a>(
@@ -905,19 +970,22 @@ fn array_has_string_kernel(
905970

906971
fn general_array_has_all_and_any_kernel(
907972
haystack_rows: &Rows,
973+
h_range: Range<usize>,
908974
needle_rows: &Rows,
975+
mut n_range: Range<usize>,
909976
comparison_type: ComparisonType,
910977
) -> bool {
978+
let h_start = h_range.start;
979+
let h_end = h_range.end;
980+
911981
match comparison_type {
912-
ComparisonType::All => needle_rows.iter().all(|needle_row| {
913-
haystack_rows
914-
.iter()
915-
.any(|haystack_row| haystack_row == needle_row)
982+
ComparisonType::All => n_range.all(|ni| {
983+
let needle_row = needle_rows.row(ni);
984+
(h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
916985
}),
917-
ComparisonType::Any => needle_rows.iter().any(|needle_row| {
918-
haystack_rows
919-
.iter()
920-
.any(|haystack_row| haystack_row == needle_row)
986+
ComparisonType::Any => n_range.any(|ni| {
987+
let needle_row = needle_rows.row(ni);
988+
(h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
921989
}),
922990
}
923991
}
@@ -928,7 +996,10 @@ mod tests {
928996

929997
use arrow::datatypes::Int32Type;
930998
use arrow::{
931-
array::{Array, ArrayRef, AsArray, Int32Array, ListArray, create_array},
999+
array::{
1000+
Array, ArrayRef, AsArray, FixedSizeListArray, Int32Array, ListArray,
1001+
create_array,
1002+
},
9321003
buffer::OffsetBuffer,
9331004
datatypes::{DataType, Field},
9341005
};
@@ -943,7 +1014,7 @@ mod tests {
9431014

9441015
use crate::expr_fn::make_array;
9451016

946-
use super::ArrayHas;
1017+
use super::{ArrayHas, ArrayHasAll, ArrayHasAny};
9471018

9481019
#[test]
9491020
fn test_simplify_array_has_to_in_list() {
@@ -1153,4 +1224,112 @@ mod tests {
11531224

11541225
Ok(())
11551226
}
1227+
1228+
/// Invoke a two-argument list UDF with the given arrays and assert the
1229+
/// boolean output matches `expected`.
1230+
fn invoke_and_assert(
1231+
udf: &dyn ScalarUDFImpl,
1232+
haystack: &ArrayRef,
1233+
needle: ArrayRef,
1234+
expected: &[Option<bool>],
1235+
) {
1236+
let num_rows = haystack.len();
1237+
let list_type = haystack.data_type();
1238+
let result = udf
1239+
.invoke_with_args(ScalarFunctionArgs {
1240+
args: vec![
1241+
ColumnarValue::Array(Arc::clone(haystack)),
1242+
ColumnarValue::Array(needle),
1243+
],
1244+
arg_fields: vec![
1245+
Arc::new(Field::new("haystack", list_type.clone(), false)),
1246+
Arc::new(Field::new("needle", list_type.clone(), false)),
1247+
],
1248+
number_rows: num_rows,
1249+
return_field: Arc::new(Field::new("return", DataType::Boolean, true)),
1250+
config_options: Arc::new(ConfigOptions::default()),
1251+
})
1252+
.unwrap();
1253+
let output = result.into_array(num_rows).unwrap();
1254+
assert_eq!(output.as_boolean().iter().collect::<Vec<_>>(), expected);
1255+
}
1256+
1257+
#[test]
1258+
fn test_sliced_list_offsets() {
1259+
// Full rows:
1260+
// row 0: [1, 2] (not visible after slicing)
1261+
// row 1: [11, 12] (visible row 0)
1262+
// row 2: [21, 22] (visible row 1)
1263+
// row 3: [31, 32] (not visible after slicing)
1264+
let field: Arc<Field> = Arc::new(Field::new("item", DataType::Int32, false));
1265+
let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1266+
let full_offsets = OffsetBuffer::new(vec![0, 2, 4, 6, 8].into());
1267+
let full = ListArray::new(Arc::clone(&field), full_offsets, full_values, None);
1268+
let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1269+
1270+
// array_has_all: needle row 0 = [11], row 1 = [21]
1271+
let needle_all: ArrayRef = Arc::new(ListArray::new(
1272+
Arc::clone(&field),
1273+
OffsetBuffer::new(vec![0, 1, 2].into()),
1274+
Arc::new(Int32Array::from(vec![11, 21])),
1275+
None,
1276+
));
1277+
invoke_and_assert(
1278+
&ArrayHasAll::new(),
1279+
&sliced_haystack,
1280+
needle_all,
1281+
&[Some(true), Some(true)],
1282+
);
1283+
1284+
// array_has_any: needle row 0 = [99, 11], row 1 = [99, 21]
1285+
let needle_any: ArrayRef = Arc::new(ListArray::new(
1286+
field,
1287+
OffsetBuffer::new(vec![0, 2, 4].into()),
1288+
Arc::new(Int32Array::from(vec![99, 11, 99, 21])),
1289+
None,
1290+
));
1291+
invoke_and_assert(
1292+
&ArrayHasAny::new(),
1293+
&sliced_haystack,
1294+
needle_any,
1295+
&[Some(true), Some(true)],
1296+
);
1297+
}
1298+
1299+
#[test]
1300+
fn test_sliced_fixed_size_list_offsets() {
1301+
// Same logical data as test_sliced_list_offsets, but using FixedSizeListArray.
1302+
let field = Arc::new(Field::new("item", DataType::Int32, false));
1303+
let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1304+
let full = FixedSizeListArray::new(Arc::clone(&field), 2, full_values, None);
1305+
let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1306+
1307+
// array_has_all: needle row 0 = [11, 12], row 1 = [21, 22]
1308+
let needle_all: ArrayRef = Arc::new(FixedSizeListArray::new(
1309+
Arc::clone(&field),
1310+
2,
1311+
Arc::new(Int32Array::from(vec![11, 12, 21, 22])),
1312+
None,
1313+
));
1314+
invoke_and_assert(
1315+
&ArrayHasAll::new(),
1316+
&sliced_haystack,
1317+
needle_all,
1318+
&[Some(true), Some(true)],
1319+
);
1320+
1321+
// array_has_any: needle row 0 = [99, 12], row 1 = [99, 22]
1322+
let needle_any: ArrayRef = Arc::new(FixedSizeListArray::new(
1323+
field,
1324+
2,
1325+
Arc::new(Int32Array::from(vec![99, 12, 99, 22])),
1326+
None,
1327+
));
1328+
invoke_and_assert(
1329+
&ArrayHasAny::new(),
1330+
&sliced_haystack,
1331+
needle_any,
1332+
&[Some(true), Some(true)],
1333+
);
1334+
}
11561335
}

0 commit comments

Comments
 (0)