Skip to content

Commit fc514c2

Browse files
authored
perf: Optimize set operations to avoid RowConverter deserialization overhead (#20623)
## Which issue does this PR close? - Closes #20622. ## Rationale for this change Several array set operations (e.g., `array_distinct`, `array_union`, `array_intersect`, `array_except`) share a similar structure: * Convert the input(s) using `RowConverter`, ideally in bulk * Apply the set operation as appropriate, which involves adding or removing elements from the candidate set of result `Rows` * Convert the final set of `Rows` back into `ArrayRef` We can do better for the final step: instead of converting from `Rows` back into `ArrayRef`, we can just track which indices in the input(s) correspond to the values we want to return. We can then grab those values with a single `take`, which avoids the `Row` -> `ArrayRef` deserialization overhead. This is a 5-20% performance win, depending on the set operation and the characteristics of the input. The only wrinkle is that for `intersect` and `union`, because there are multiple inputs we need to concatenate the inputs together so that we have a single index space. It turns out that this optimization is a win, even incurring the `concat` overhead. ## What changes are included in this PR? * Add a benchmark for `array_except` * Implement this optimization for `array_distinct`, `array_union`, `array_intersect`, `array_except` ## Are these changes tested? Yes, and benchmarked. ## Are there any user-facing changes? No.
1 parent daa8f52 commit fc514c2

3 files changed

Lines changed: 106 additions & 37 deletions

File tree

datafusion/functions-nested/benches/array_set_ops.rs

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ use criterion::{
2323
};
2424
use datafusion_common::config::ConfigOptions;
2525
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
26+
use datafusion_functions_nested::except::ArrayExcept;
2627
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
2728
use rand::SeedableRng;
2829
use rand::prelude::SliceRandom;
@@ -38,6 +39,7 @@ const SEED: u64 = 42;
3839
fn criterion_benchmark(c: &mut Criterion) {
3940
bench_array_union(c);
4041
bench_array_intersect(c);
42+
bench_array_except(c);
4143
bench_array_distinct(c);
4244
}
4345

@@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) {
98100
group.finish();
99101
}
100102

103+
fn bench_array_except(c: &mut Criterion) {
104+
let mut group = c.benchmark_group("array_except");
105+
let udf = ArrayExcept::new();
106+
107+
for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
108+
for &array_size in ARRAY_SIZES {
109+
let (array1, array2) =
110+
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
111+
group.bench_with_input(
112+
BenchmarkId::new(*overlap_label, array_size),
113+
&array_size,
114+
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
115+
);
116+
}
117+
}
118+
119+
group.finish();
120+
}
121+
101122
fn bench_array_distinct(c: &mut Criterion) {
102123
let mut group = c.benchmark_group("array_distinct");
103124
let udf = ArrayDistinct::new();

datafusion/functions-nested/src/except.rs

Lines changed: 28 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,12 @@
1919
2020
use crate::utils::{check_datatypes, make_scalar_function};
2121
use arrow::array::new_null_array;
22-
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
22+
use arrow::array::{
23+
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
24+
cast::AsArray,
25+
};
2326
use arrow::buffer::{NullBuffer, OffsetBuffer};
27+
use arrow::compute::take;
2428
use arrow::datatypes::{DataType, FieldRef};
2529
use arrow::row::{RowConverter, SortField};
2630
use datafusion_common::utils::{ListCoercion, take_function_args};
@@ -179,7 +183,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
179183
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
180184
offsets.push(OffsetSize::usize_as(0));
181185

182-
let mut rows = Vec::with_capacity(l_values.num_rows());
186+
let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
183187
let mut dedup = HashSet::new();
184188

185189
let nulls = NullBuffer::union(l.nulls(), r.nulls());
@@ -193,7 +197,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
193197
.as_ref()
194198
.is_some_and(|nulls| nulls.is_null(list_index))
195199
{
196-
offsets.push(OffsetSize::usize_as(rows.len()));
200+
offsets.push(OffsetSize::usize_as(indices.len()));
197201
continue;
198202
}
199203

@@ -204,22 +208,32 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
204208
for element_index in l_start.as_usize()..l_end.as_usize() {
205209
let left_row = l_values.row(element_index);
206210
if dedup.insert(left_row) {
207-
rows.push(left_row);
211+
indices.push(element_index);
208212
}
209213
}
210214

211-
offsets.push(OffsetSize::usize_as(rows.len()));
215+
offsets.push(OffsetSize::usize_as(indices.len()));
212216
dedup.clear();
213217
}
214218

215-
if let Some(values) = converter.convert_rows(rows)?.first() {
216-
Ok(GenericListArray::<OffsetSize>::new(
217-
field.to_owned(),
218-
OffsetBuffer::new(offsets.into()),
219-
values.to_owned(),
220-
nulls,
221-
))
219+
// Gather distinct left-side values by index.
220+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
221+
let values = if indices.is_empty() {
222+
arrow::array::new_empty_array(&l.value_type())
223+
} else if OffsetSize::IS_LARGE {
224+
let indices =
225+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
226+
take(l.values().as_ref(), &indices, None)?
222227
} else {
223-
internal_err!("array_except failed to convert rows")
224-
}
228+
let indices =
229+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
230+
take(l.values().as_ref(), &indices, None)?
231+
};
232+
233+
Ok(GenericListArray::<OffsetSize>::new(
234+
field.to_owned(),
235+
OffsetBuffer::new(offsets.into()),
236+
values,
237+
nulls,
238+
))
225239
}

datafusion/functions-nested/src/set_ops.rs

Lines changed: 57 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,11 @@
1919
2020
use crate::utils::make_scalar_function;
2121
use arrow::array::{
22-
Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array,
22+
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
23+
new_empty_array, new_null_array,
2324
};
2425
use arrow::buffer::{NullBuffer, OffsetBuffer};
26+
use arrow::compute::{concat, take};
2527
use arrow::datatypes::DataType::{LargeList, List, Null};
2628
use arrow::datatypes::{DataType, Field, FieldRef};
2729
use arrow::row::{RowConverter, SortField};
@@ -373,12 +375,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
373375
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
374376
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
375377

378+
// Combine value arrays so indices from both sides share a single index space.
379+
let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?;
380+
let r_offset = l.values().len();
381+
376382
match set_op {
377383
SetOp::Union => generic_set_loop::<OffsetSize, true>(
378-
l, r, &rows_l, &rows_r, field, &converter,
384+
l,
385+
r,
386+
&rows_l,
387+
&rows_r,
388+
field,
389+
&combined_values,
390+
r_offset,
379391
),
380392
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
381-
l, r, &rows_l, &rows_r, field, &converter,
393+
l,
394+
r,
395+
&rows_l,
396+
&rows_r,
397+
field,
398+
&combined_values,
399+
r_offset,
382400
),
383401
}
384402
}
@@ -391,7 +409,8 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
391409
rows_l: &arrow::row::Rows,
392410
rows_r: &arrow::row::Rows,
393411
field: Arc<Field>,
394-
converter: &RowConverter,
412+
combined_values: &ArrayRef,
413+
r_offset: usize,
395414
) -> Result<ArrayRef> {
396415
let l_offsets = l.value_offsets();
397416
let r_offsets = r.value_offsets();
@@ -406,7 +425,7 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
406425
rows_l.num_rows().min(rows_r.num_rows())
407426
};
408427

409-
let mut final_rows = Vec::with_capacity(initial_capacity);
428+
let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);
410429

411430
// Reuse hash sets across iterations
412431
let mut seen = HashSet::new();
@@ -430,25 +449,27 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
430449
for idx in l_start..l_end {
431450
let row = rows_l.row(idx);
432451
if seen.insert(row) {
433-
final_rows.push(row);
452+
indices.push(idx);
434453
}
435454
}
436455
for idx in r_start..r_end {
437456
let row = rows_r.row(idx);
438457
if seen.insert(row) {
439-
final_rows.push(row);
458+
indices.push(idx + r_offset);
440459
}
441460
}
442461
} else {
443462
let l_len = l_end - l_start;
444463
let r_len = r_end - r_start;
445464

446-
// Select shorter side for lookup, longer side for probing
447-
let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len {
448-
(rows_l, l_start..l_end, rows_r, r_start..r_end)
449-
} else {
450-
(rows_r, r_start..r_end, rows_l, l_start..l_end)
451-
};
465+
// Select shorter side for lookup, longer side for probing.
466+
// Track the probe side's offset into the combined values array.
467+
let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) =
468+
if l_len < r_len {
469+
(rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
470+
} else {
471+
(rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
472+
};
452473
lookup_set.clear();
453474
lookup_set.reserve(lookup_range.len());
454475

@@ -461,18 +482,25 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
461482
for idx in probe_range {
462483
let row = probe_rows.row(idx);
463484
if lookup_set.contains(&row) && seen.insert(row) {
464-
final_rows.push(row);
485+
indices.push(idx + probe_offset);
465486
}
466487
}
467488
}
468489
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
469490
}
470491

471-
let final_values = if final_rows.is_empty() {
492+
// Gather distinct values by index from the combined values array.
493+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
494+
let final_values = if indices.is_empty() {
472495
new_empty_array(&l.value_type())
496+
} else if OffsetSize::IS_LARGE {
497+
let indices =
498+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
499+
take(combined_values.as_ref(), &indices, None)?
473500
} else {
474-
let arrays = converter.convert_rows(final_rows)?;
475-
Arc::clone(&arrays[0])
501+
let indices =
502+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
503+
take(combined_values.as_ref(), &indices, None)?
476504
};
477505

478506
let arr = GenericListArray::<OffsetSize>::try_new(
@@ -539,7 +567,7 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
539567
// Convert all values to row format in a single batch for performance
540568
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
541569
let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
542-
let mut final_rows = Vec::with_capacity(rows.num_rows());
570+
let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
543571
let mut seen = HashSet::new();
544572
for i in 0..array.len() {
545573
let last_offset = *offsets.last().unwrap();
@@ -559,18 +587,24 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
559587
for idx in start..end {
560588
let row = rows.row(idx);
561589
if seen.insert(row) {
562-
final_rows.push(row);
590+
indices.push(idx);
563591
}
564592
}
565593
offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
566594
}
567595

568-
// Convert all collected distinct rows back
569-
let final_values = if final_rows.is_empty() {
596+
// Gather distinct values in a single pass, using the computed `indices`.
597+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
598+
let final_values = if indices.is_empty() {
570599
new_empty_array(&dt)
600+
} else if OffsetSize::IS_LARGE {
601+
let indices =
602+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
603+
take(array.values().as_ref(), &indices, None)?
571604
} else {
572-
let arrays = converter.convert_rows(final_rows)?;
573-
Arc::clone(&arrays[0])
605+
let indices =
606+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
607+
take(array.values().as_ref(), &indices, None)?
574608
};
575609

576610
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(

0 commit comments

Comments (0)