Skip to content

Commit 468b690

Browse files
authored
perf: optimize array_distinct with batched row conversion (#20364)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> This PR optimizes the `array_distinct` function by batching value conversions and utilizing a `HashSet` for deduplication. It is a follow-up to #20243. ## What changes are included in this PR? This PR optimizes `array_distinct` by: 1. Converting all values to rows in a single batch rather than individually. 2. Using a HashSet to deduplicate values for each list. ### Benchmark ``` group main optimized ----- ---- --------- array_distinct/high_duplicate/10 2.66 855.1±28.18µs ? ?/sec 1.00 321.9±8.70µs ? ?/sec array_distinct/high_duplicate/100 2.21 6.4±0.13ms ? ?/sec 1.00 2.9±0.09ms ? ?/sec array_distinct/high_duplicate/50 2.14 3.2±0.05ms ? ?/sec 1.00 1478.3±41.90µs ? ?/sec array_distinct/low_duplicate/10 2.73 1017.3±44.67µs ? ?/sec 1.00 372.5±17.33µs ? ?/sec array_distinct/low_duplicate/100 1.32 4.4±0.13ms ? ?/sec 1.00 3.3±0.15ms ? ?/sec array_distinct/low_duplicate/50 1.55 2.6±0.06ms ? ?/sec 1.00 1689.0±94.15µs ? ?/sec ``` <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. 
Serve as another way to document the expected behavior of the code. If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, unit tests exist and pass. ## Are there any user-facing changes? Yes, there is a slight change in the output order: results now preserve each element's first-occurrence order within the list instead of being returned sorted. This new behavior is consistent with `array_union` and `array_intersect`, where the output order is more intuitive as it preserves the original order of elements in the array. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: lyne7-sc <lilinfeng0310@gmail.com>
1 parent ea51d90 commit 468b690

File tree

3 files changed

+139
-34
lines changed

3 files changed

+139
-34
lines changed

datafusion/functions-nested/benches/array_set_ops.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use criterion::{
2323
};
2424
use datafusion_common::config::ConfigOptions;
2525
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
26-
use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion};
26+
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
2727
use rand::SeedableRng;
2828
use rand::prelude::SliceRandom;
2929
use rand::rngs::StdRng;
@@ -38,6 +38,7 @@ const SEED: u64 = 42;
3838
fn criterion_benchmark(c: &mut Criterion) {
3939
bench_array_union(c);
4040
bench_array_intersect(c);
41+
bench_array_distinct(c);
4142
}
4243

4344
fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
@@ -97,6 +98,48 @@ fn bench_array_intersect(c: &mut Criterion) {
9798
group.finish();
9899
}
99100

101+
fn bench_array_distinct(c: &mut Criterion) {
102+
let mut group = c.benchmark_group("array_distinct");
103+
let udf = ArrayDistinct::new();
104+
105+
for (duplicate_label, duplicate_ratio) in
106+
&[("high_duplicate", 0.8), ("low_duplicate", 0.2)]
107+
{
108+
for &array_size in ARRAY_SIZES {
109+
let array =
110+
create_array_with_duplicates(NUM_ROWS, array_size, *duplicate_ratio);
111+
group.bench_with_input(
112+
BenchmarkId::new(*duplicate_label, array_size),
113+
&array_size,
114+
|b, _| {
115+
b.iter(|| {
116+
black_box(
117+
udf.invoke_with_args(ScalarFunctionArgs {
118+
args: vec![ColumnarValue::Array(array.clone())],
119+
arg_fields: vec![
120+
Field::new("arr", array.data_type().clone(), false)
121+
.into(),
122+
],
123+
number_rows: NUM_ROWS,
124+
return_field: Field::new(
125+
"result",
126+
array.data_type().clone(),
127+
false,
128+
)
129+
.into(),
130+
config_options: Arc::new(ConfigOptions::default()),
131+
})
132+
.unwrap(),
133+
)
134+
})
135+
},
136+
);
137+
}
138+
}
139+
140+
group.finish();
141+
}
142+
100143
fn create_arrays_with_overlap(
101144
num_rows: usize,
102145
array_size: usize,
@@ -164,5 +207,53 @@ fn create_arrays_with_overlap(
164207
(array1, array2)
165208
}
166209

210+
fn create_array_with_duplicates(
211+
num_rows: usize,
212+
array_size: usize,
213+
duplicate_ratio: f64,
214+
) -> ArrayRef {
215+
assert!((0.0..=1.0).contains(&duplicate_ratio));
216+
let unique_count = ((array_size as f64) * (1.0 - duplicate_ratio)).round() as usize;
217+
let duplicate_count = array_size - unique_count;
218+
219+
let mut rng = StdRng::seed_from_u64(SEED);
220+
let mut values = Vec::with_capacity(num_rows * array_size);
221+
222+
for row in 0..num_rows {
223+
let base = (row as i64) * (array_size as i64) * 2;
224+
225+
// Add unique values first
226+
for i in 0..unique_count {
227+
values.push(base + i as i64);
228+
}
229+
230+
// Fill the rest with duplicates randomly picked from the unique values
231+
let mut unique_indices: Vec<i64> =
232+
(0..unique_count).map(|i| base + i as i64).collect();
233+
unique_indices.shuffle(&mut rng);
234+
235+
for i in 0..duplicate_count {
236+
values.push(unique_indices[i % unique_count]);
237+
}
238+
}
239+
240+
let values = Int64Array::from(values);
241+
let field = Arc::new(Field::new("item", DataType::Int64, true));
242+
243+
let offsets = (0..=num_rows)
244+
.map(|i| (i * array_size) as i32)
245+
.collect::<Vec<i32>>();
246+
247+
Arc::new(
248+
ListArray::try_new(
249+
field,
250+
OffsetBuffer::new(offsets.into()),
251+
Arc::new(values),
252+
None,
253+
)
254+
.unwrap(),
255+
)
256+
}
257+
167258
criterion_group!(benches, criterion_benchmark);
168259
criterion_main!(benches);

datafusion/functions-nested/src/set_ops.rs

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use arrow::array::{
2222
Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array,
2323
};
2424
use arrow::buffer::{NullBuffer, OffsetBuffer};
25-
use arrow::compute;
2625
use arrow::datatypes::DataType::{LargeList, List, Null};
2726
use arrow::datatypes::{DataType, Field, FieldRef};
2827
use arrow::row::{RowConverter, SortField};
@@ -35,7 +34,6 @@ use datafusion_expr::{
3534
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
3635
};
3736
use datafusion_macros::user_doc;
38-
use itertools::Itertools;
3937
use std::any::Any;
4038
use std::collections::HashSet;
4139
use std::fmt::{Display, Formatter};
@@ -264,7 +262,7 @@ impl ScalarUDFImpl for ArrayIntersect {
264262
)
265263
)]
266264
#[derive(Debug, PartialEq, Eq, Hash)]
267-
pub(super) struct ArrayDistinct {
265+
pub struct ArrayDistinct {
268266
signature: Signature,
269267
aliases: Vec<String>,
270268
}
@@ -278,6 +276,12 @@ impl ArrayDistinct {
278276
}
279277
}
280278

279+
impl Default for ArrayDistinct {
280+
fn default() -> Self {
281+
Self::new()
282+
}
283+
}
284+
281285
impl ScalarUDFImpl for ArrayDistinct {
282286
fn as_any(&self) -> &dyn Any {
283287
self
@@ -527,42 +531,52 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
527531
if array.is_empty() {
528532
return Ok(Arc::new(array.clone()) as ArrayRef);
529533
}
534+
let value_offsets = array.value_offsets();
530535
let dt = array.value_type();
531-
let mut offsets = Vec::with_capacity(array.len());
536+
let mut offsets = Vec::with_capacity(array.len() + 1);
532537
offsets.push(OffsetSize::usize_as(0));
533-
let mut new_arrays = Vec::with_capacity(array.len());
534-
let converter = RowConverter::new(vec![SortField::new(dt)])?;
535-
// distinct for each list in ListArray
536-
for arr in array.iter() {
537-
let last_offset: OffsetSize = offsets.last().copied().unwrap();
538-
let Some(arr) = arr else {
539-
// Add same offset for null
538+
539+
// Convert all values to row format in a single batch for performance
540+
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
541+
let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
542+
let mut final_rows = Vec::with_capacity(rows.num_rows());
543+
let mut seen = HashSet::new();
544+
for i in 0..array.len() {
545+
let last_offset = *offsets.last().unwrap();
546+
547+
// Null list entries produce no output; just carry forward the offset.
548+
if array.is_null(i) {
540549
offsets.push(last_offset);
541550
continue;
542-
};
543-
let values = converter.convert_columns(&[arr])?;
544-
// sort elements in list and remove duplicates
545-
let rows = values.iter().sorted().dedup().collect::<Vec<_>>();
546-
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
547-
let arrays = converter.convert_rows(rows)?;
548-
let array = match arrays.first() {
549-
Some(array) => Arc::clone(array),
550-
None => {
551-
return internal_err!("array_distinct: failed to get array from rows");
551+
}
552+
553+
let start = value_offsets[i].as_usize();
554+
let end = value_offsets[i + 1].as_usize();
555+
seen.clear();
556+
seen.reserve(end - start);
557+
558+
// Walk the sub-array and keep only the first occurrence of each value.
559+
for idx in start..end {
560+
let row = rows.row(idx);
561+
if seen.insert(row) {
562+
final_rows.push(row);
552563
}
553-
};
554-
new_arrays.push(array);
555-
}
556-
if new_arrays.is_empty() {
557-
return Ok(Arc::new(array.clone()) as ArrayRef);
564+
}
565+
offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
558566
}
559-
let offsets = OffsetBuffer::new(offsets.into());
560-
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
561-
let values = compute::concat(&new_arrays_ref)?;
567+
568+
// Convert all collected distinct rows back
569+
let final_values = if final_rows.is_empty() {
570+
new_empty_array(&dt)
571+
} else {
572+
let arrays = converter.convert_rows(final_rows)?;
573+
Arc::clone(&arrays[0])
574+
};
575+
562576
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
563577
Arc::clone(field),
564-
offsets,
565-
values,
578+
OffsetBuffer::new(offsets.into()),
579+
final_values,
566580
// Keep the list nulls
567581
array.nulls().cloned(),
568582
)?))

datafusion/sqllogictest/test_files/array.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6832,7 +6832,7 @@ from array_distinct_table_2D;
68326832
----
68336833
[[1, 2], [3, 4], [5, 6]]
68346834
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
6835-
[NULL, [5, 6]]
6835+
[[5, 6], NULL]
68366836

68376837
query ?
68386838
select array_distinct(column1)
@@ -6864,7 +6864,7 @@ from array_distinct_table_2D_fixed;
68646864
----
68656865
[[1, 2], [3, 4], [5, 6]]
68666866
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
6867-
[NULL, [5, 6]]
6867+
[[5, 6], NULL]
68686868

68696869
query ???
68706870
select array_intersect(column1, column2),

0 commit comments

Comments (0)