Skip to content

Commit 538a201

Browse files
authored
perf: Optimize array set ops on sliced arrays (#20693)
## Which issue does this PR close? N/A ## Rationale for this change Several array set operations (`array_union`, `array_intersect`, `array_distinct`, `array_except`) operate on all values in the underlying values buffer of a `ListArray` when doing batched row conversion. For sliced ListArrays, `values()` returns the full underlying buffer, which means we end up doing row conversion for rows that aren't in the visible slice. This was not a correctness issue but it is inefficient. ## What changes are included in this PR? - Change array set ops to do row conversion on the visible slice, not the full values buffer - Add unit tests for array set ops on sliced ListArrays. These tests pass with or without this PR, but it seems wise to have more test coverage for sliced ListArrays - Add benchmarks for array set ops on sliced ListArrays. ## Are these changes tested? Yes. ## Are there any user-facing changes? No. ## AI usage Multiple AI tools were used to iterate on this PR. I have reviewed and understand the resulting code.
1 parent 9b7d092 commit 538a201

File tree

3 files changed

+343
-27
lines changed

3 files changed

+343
-27
lines changed

datafusion/functions-nested/benches/array_set_ops.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,19 @@ use std::sync::Arc;
3535
const NUM_ROWS: usize = 1000;
3636
const ARRAY_SIZES: &[usize] = &[10, 50, 100];
3737
const SEED: u64 = 42;
38+
/// Extra rows on each side when building sliced arrays, so the underlying
39+
/// values buffer is much larger than the visible portion.
40+
const SLICE_PADDING: usize = 5000;
3841

3942
fn criterion_benchmark(c: &mut Criterion) {
4043
bench_array_union(c);
4144
bench_array_intersect(c);
4245
bench_array_except(c);
4346
bench_array_distinct(c);
47+
bench_array_union_sliced(c);
48+
bench_array_intersect_sliced(c);
49+
bench_array_distinct_sliced(c);
50+
bench_array_except_sliced(c);
4451
}
4552

4653
fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
@@ -276,5 +283,107 @@ fn create_array_with_duplicates(
276283
)
277284
}
278285

286+
/// Slice a pair of arrays to the middle `NUM_ROWS` rows from a larger array.
287+
fn slice_pair(arrays: &(ArrayRef, ArrayRef)) -> (ArrayRef, ArrayRef) {
288+
let a1 = arrays.0.slice(SLICE_PADDING, NUM_ROWS);
289+
let a2 = arrays.1.slice(SLICE_PADDING, NUM_ROWS);
290+
(a1, a2)
291+
}
292+
293+
fn bench_array_union_sliced(c: &mut Criterion) {
294+
let mut group = c.benchmark_group("array_union_sliced");
295+
let udf = ArrayUnion::new();
296+
297+
for &array_size in ARRAY_SIZES {
298+
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
299+
NUM_ROWS + 2 * SLICE_PADDING,
300+
array_size,
301+
0.5,
302+
));
303+
group.bench_with_input(
304+
BenchmarkId::from_parameter(array_size),
305+
&array_size,
306+
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
307+
);
308+
}
309+
group.finish();
310+
}
311+
312+
fn bench_array_intersect_sliced(c: &mut Criterion) {
313+
let mut group = c.benchmark_group("array_intersect_sliced");
314+
let udf = ArrayIntersect::new();
315+
316+
for &array_size in ARRAY_SIZES {
317+
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
318+
NUM_ROWS + 2 * SLICE_PADDING,
319+
array_size,
320+
0.5,
321+
));
322+
group.bench_with_input(
323+
BenchmarkId::from_parameter(array_size),
324+
&array_size,
325+
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
326+
);
327+
}
328+
group.finish();
329+
}
330+
331+
fn bench_array_except_sliced(c: &mut Criterion) {
332+
let mut group = c.benchmark_group("array_except_sliced");
333+
let udf = ArrayExcept::new();
334+
335+
for &array_size in ARRAY_SIZES {
336+
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
337+
NUM_ROWS + 2 * SLICE_PADDING,
338+
array_size,
339+
0.5,
340+
));
341+
group.bench_with_input(
342+
BenchmarkId::from_parameter(array_size),
343+
&array_size,
344+
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
345+
);
346+
}
347+
group.finish();
348+
}
349+
350+
fn bench_array_distinct_sliced(c: &mut Criterion) {
351+
let mut group = c.benchmark_group("array_distinct_sliced");
352+
let udf = ArrayDistinct::new();
353+
354+
for &array_size in ARRAY_SIZES {
355+
let array =
356+
create_array_with_duplicates(NUM_ROWS + 2 * SLICE_PADDING, array_size, 0.5)
357+
.slice(SLICE_PADDING, NUM_ROWS);
358+
group.bench_with_input(
359+
BenchmarkId::from_parameter(array_size),
360+
&array_size,
361+
|b, _| {
362+
b.iter(|| {
363+
black_box(
364+
udf.invoke_with_args(ScalarFunctionArgs {
365+
args: vec![ColumnarValue::Array(array.clone())],
366+
arg_fields: vec![
367+
Field::new("arr", array.data_type().clone(), false)
368+
.into(),
369+
],
370+
number_rows: NUM_ROWS,
371+
return_field: Field::new(
372+
"result",
373+
array.data_type().clone(),
374+
false,
375+
)
376+
.into(),
377+
config_options: Arc::new(ConfigOptions::default()),
378+
})
379+
.unwrap(),
380+
)
381+
})
382+
},
383+
);
384+
}
385+
group.finish();
386+
}
387+
279388
criterion_group!(benches, criterion_benchmark);
280389
criterion_main!(benches);

datafusion/functions-nested/src/except.rs

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,16 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
175175
) -> Result<GenericListArray<OffsetSize>> {
176176
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
177177

178-
let l_values = l.values().to_owned();
179-
let r_values = r.values().to_owned();
180-
let l_values = converter.convert_columns(&[l_values])?;
181-
let r_values = converter.convert_columns(&[r_values])?;
178+
// Only convert the visible portion of the values array. For sliced
179+
// ListArrays, values() returns the full underlying array but only
180+
// elements between the first and last offset are referenced.
181+
let l_first = l.offsets()[0].as_usize();
182+
let l_len = l.offsets()[l.len()].as_usize() - l_first;
183+
let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
184+
185+
let r_first = r.offsets()[0].as_usize();
186+
let r_len = r.offsets()[r.len()].as_usize() - r_first;
187+
let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
182188

183189
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
184190
offsets.push(OffsetSize::usize_as(0));
@@ -201,14 +207,14 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
201207
continue;
202208
}
203209

204-
for element_index in r_start.as_usize()..r_end.as_usize() {
210+
for element_index in r_start.as_usize() - r_first..r_end.as_usize() - r_first {
205211
let right_row = r_values.row(element_index);
206212
dedup.insert(right_row);
207213
}
208-
for element_index in l_start.as_usize()..l_end.as_usize() {
214+
for element_index in l_start.as_usize() - l_first..l_end.as_usize() - l_first {
209215
let left_row = l_values.row(element_index);
210216
if dedup.insert(left_row) {
211-
indices.push(element_index);
217+
indices.push(element_index + l_first);
212218
}
213219
}
214220

@@ -237,3 +243,64 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
237243
nulls,
238244
))
239245
}
246+
247+
#[cfg(test)]
248+
mod tests {
249+
use super::ArrayExcept;
250+
use arrow::array::{Array, AsArray, Int32Array, ListArray};
251+
use arrow::datatypes::{Field, Int32Type};
252+
use datafusion_common::{Result, config::ConfigOptions};
253+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
254+
use std::sync::Arc;
255+
256+
#[test]
257+
fn test_array_except_sliced_lists() -> Result<()> {
258+
// l: [[1,2], [3,4], [5,6], [7,8]] → slice(1,2) → [[3,4], [5,6]]
259+
// r: [[3], [5], [6], [8]] → slice(1,2) → [[5], [6]]
260+
// except(l, r) should be [[3,4], [5]]
261+
let l_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
262+
Some(vec![Some(1), Some(2)]),
263+
Some(vec![Some(3), Some(4)]),
264+
Some(vec![Some(5), Some(6)]),
265+
Some(vec![Some(7), Some(8)]),
266+
]);
267+
let r_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
268+
Some(vec![Some(3)]),
269+
Some(vec![Some(5)]),
270+
Some(vec![Some(6)]),
271+
Some(vec![Some(8)]),
272+
]);
273+
let l_sliced = l_full.slice(1, 2);
274+
let r_sliced = r_full.slice(1, 2);
275+
276+
let list_field = Arc::new(Field::new("item", l_sliced.data_type().clone(), true));
277+
let return_field =
278+
Arc::new(Field::new("return", l_sliced.data_type().clone(), true));
279+
280+
let result = ArrayExcept::new().invoke_with_args(ScalarFunctionArgs {
281+
args: vec![
282+
ColumnarValue::Array(Arc::new(l_sliced)),
283+
ColumnarValue::Array(Arc::new(r_sliced)),
284+
],
285+
arg_fields: vec![Arc::clone(&list_field), Arc::clone(&list_field)],
286+
number_rows: 2,
287+
return_field,
288+
config_options: Arc::new(ConfigOptions::default()),
289+
})?;
290+
291+
let output = result.into_array(2)?;
292+
let output = output.as_list::<i32>();
293+
294+
// Row 0: [3,4] except [5] = [3,4]
295+
let row0 = output.value(0);
296+
let row0 = row0.as_any().downcast_ref::<Int32Array>().unwrap();
297+
assert_eq!(row0.values().as_ref(), &[3, 4]);
298+
299+
// Row 1: [5,6] except [6] = [5]
300+
let row1 = output.value(1);
301+
let row1 = row1.as_any().downcast_ref::<Int32Array>().unwrap();
302+
assert_eq!(row1.values().as_ref(), &[5]);
303+
304+
Ok(())
305+
}
306+
}

0 commit comments

Comments
 (0)