Skip to content

Commit 47e2709

Browse files
authored
Merge branch 'main' into alamb/forward_port_52.3.0
2 parents 24d6b71 + 8e02b8e commit 47e2709

6 files changed

Lines changed: 112 additions & 43 deletions

File tree

.github/workflows/audit.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,10 +42,10 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@d6e286fa45544157a02d45a43742857ebbc25d12 # v2.68.16
45+
uses: taiki-e/install-action@a37010ded18ff788be4440302bd6830b1ae50d8b # v2.68.25
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check
4949
# Note: you can ignore specific RUSTSEC issues using the `--ignore` flag, for example:
5050
# run: cargo audit --ignore RUSTSEC-2026-0001
51-
run: cargo audit
51+
run: cargo audit --ignore RUSTSEC-2024-0436

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,11 +45,11 @@ jobs:
4545
persist-credentials: false
4646

4747
- name: Initialize CodeQL
48-
uses: github/codeql-action/init@c793b717bc78562f491db7b0e93a3a178b099162 # v4
48+
uses: github/codeql-action/init@0d579ffd059c29b07949a3cce3983f0780820c98 # v4
4949
with:
5050
languages: actions
5151

5252
- name: Perform CodeQL Analysis
53-
uses: github/codeql-action/analyze@c793b717bc78562f491db7b0e93a3a178b099162 # v4
53+
uses: github/codeql-action/analyze@0d579ffd059c29b07949a3cce3983f0780820c98 # v4
5454
with:
5555
category: "/language:actions"

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -431,7 +431,7 @@ jobs:
431431
sudo apt-get update -qq
432432
sudo apt-get install -y -qq clang
433433
- name: Setup wasm-pack
434-
uses: taiki-e/install-action@d6e286fa45544157a02d45a43742857ebbc25d12 # v2.68.16
434+
uses: taiki-e/install-action@a37010ded18ff788be4440302bd6830b1ae50d8b # v2.68.25
435435
with:
436436
tool: wasm-pack
437437
- name: Run tests with headless mode
@@ -771,7 +771,7 @@ jobs:
771771
- name: Setup Rust toolchain
772772
uses: ./.github/actions/setup-builder
773773
- name: Install cargo-msrv
774-
uses: taiki-e/install-action@d6e286fa45544157a02d45a43742857ebbc25d12 # v2.68.16
774+
uses: taiki-e/install-action@a37010ded18ff788be4440302bd6830b1ae50d8b # v2.68.25
775775
with:
776776
tool: cargo-msrv
777777

datafusion/functions-nested/benches/array_set_ops.rs

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ use criterion::{
2323
};
2424
use datafusion_common::config::ConfigOptions;
2525
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
26+
use datafusion_functions_nested::except::ArrayExcept;
2627
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
2728
use rand::SeedableRng;
2829
use rand::prelude::SliceRandom;
@@ -38,6 +39,7 @@ const SEED: u64 = 42;
3839
fn criterion_benchmark(c: &mut Criterion) {
3940
bench_array_union(c);
4041
bench_array_intersect(c);
42+
bench_array_except(c);
4143
bench_array_distinct(c);
4244
}
4345

@@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) {
98100
group.finish();
99101
}
100102

103+
fn bench_array_except(c: &mut Criterion) {
104+
let mut group = c.benchmark_group("array_except");
105+
let udf = ArrayExcept::new();
106+
107+
for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
108+
for &array_size in ARRAY_SIZES {
109+
let (array1, array2) =
110+
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
111+
group.bench_with_input(
112+
BenchmarkId::new(*overlap_label, array_size),
113+
&array_size,
114+
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
115+
);
116+
}
117+
}
118+
119+
group.finish();
120+
}
121+
101122
fn bench_array_distinct(c: &mut Criterion) {
102123
let mut group = c.benchmark_group("array_distinct");
103124
let udf = ArrayDistinct::new();

datafusion/functions-nested/src/except.rs

Lines changed: 28 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,12 @@
1919
2020
use crate::utils::{check_datatypes, make_scalar_function};
2121
use arrow::array::new_null_array;
22-
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
22+
use arrow::array::{
23+
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
24+
cast::AsArray,
25+
};
2326
use arrow::buffer::{NullBuffer, OffsetBuffer};
27+
use arrow::compute::take;
2428
use arrow::datatypes::{DataType, FieldRef};
2529
use arrow::row::{RowConverter, SortField};
2630
use datafusion_common::utils::{ListCoercion, take_function_args};
@@ -179,7 +183,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
179183
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
180184
offsets.push(OffsetSize::usize_as(0));
181185

182-
let mut rows = Vec::with_capacity(l_values.num_rows());
186+
let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
183187
let mut dedup = HashSet::new();
184188

185189
let nulls = NullBuffer::union(l.nulls(), r.nulls());
@@ -193,7 +197,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
193197
.as_ref()
194198
.is_some_and(|nulls| nulls.is_null(list_index))
195199
{
196-
offsets.push(OffsetSize::usize_as(rows.len()));
200+
offsets.push(OffsetSize::usize_as(indices.len()));
197201
continue;
198202
}
199203

@@ -204,22 +208,32 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
204208
for element_index in l_start.as_usize()..l_end.as_usize() {
205209
let left_row = l_values.row(element_index);
206210
if dedup.insert(left_row) {
207-
rows.push(left_row);
211+
indices.push(element_index);
208212
}
209213
}
210214

211-
offsets.push(OffsetSize::usize_as(rows.len()));
215+
offsets.push(OffsetSize::usize_as(indices.len()));
212216
dedup.clear();
213217
}
214218

215-
if let Some(values) = converter.convert_rows(rows)?.first() {
216-
Ok(GenericListArray::<OffsetSize>::new(
217-
field.to_owned(),
218-
OffsetBuffer::new(offsets.into()),
219-
values.to_owned(),
220-
nulls,
221-
))
219+
// Gather distinct left-side values by index.
220+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
221+
let values = if indices.is_empty() {
222+
arrow::array::new_empty_array(&l.value_type())
223+
} else if OffsetSize::IS_LARGE {
224+
let indices =
225+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
226+
take(l.values().as_ref(), &indices, None)?
222227
} else {
223-
internal_err!("array_except failed to convert rows")
224-
}
228+
let indices =
229+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
230+
take(l.values().as_ref(), &indices, None)?
231+
};
232+
233+
Ok(GenericListArray::<OffsetSize>::new(
234+
field.to_owned(),
235+
OffsetBuffer::new(offsets.into()),
236+
values,
237+
nulls,
238+
))
225239
}

datafusion/functions-nested/src/set_ops.rs

Lines changed: 57 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,11 @@
1919
2020
use crate::utils::make_scalar_function;
2121
use arrow::array::{
22-
Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array,
22+
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
23+
new_empty_array, new_null_array,
2324
};
2425
use arrow::buffer::{NullBuffer, OffsetBuffer};
26+
use arrow::compute::{concat, take};
2527
use arrow::datatypes::DataType::{LargeList, List, Null};
2628
use arrow::datatypes::{DataType, Field, FieldRef};
2729
use arrow::row::{RowConverter, SortField};
@@ -373,12 +375,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
373375
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
374376
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
375377

378+
// Combine value arrays so indices from both sides share a single index space.
379+
let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?;
380+
let r_offset = l.values().len();
381+
376382
match set_op {
377383
SetOp::Union => generic_set_loop::<OffsetSize, true>(
378-
l, r, &rows_l, &rows_r, field, &converter,
384+
l,
385+
r,
386+
&rows_l,
387+
&rows_r,
388+
field,
389+
&combined_values,
390+
r_offset,
379391
),
380392
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
381-
l, r, &rows_l, &rows_r, field, &converter,
393+
l,
394+
r,
395+
&rows_l,
396+
&rows_r,
397+
field,
398+
&combined_values,
399+
r_offset,
382400
),
383401
}
384402
}
@@ -391,7 +409,8 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
391409
rows_l: &arrow::row::Rows,
392410
rows_r: &arrow::row::Rows,
393411
field: Arc<Field>,
394-
converter: &RowConverter,
412+
combined_values: &ArrayRef,
413+
r_offset: usize,
395414
) -> Result<ArrayRef> {
396415
let l_offsets = l.value_offsets();
397416
let r_offsets = r.value_offsets();
@@ -406,7 +425,7 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
406425
rows_l.num_rows().min(rows_r.num_rows())
407426
};
408427

409-
let mut final_rows = Vec::with_capacity(initial_capacity);
428+
let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);
410429

411430
// Reuse hash sets across iterations
412431
let mut seen = HashSet::new();
@@ -430,25 +449,27 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
430449
for idx in l_start..l_end {
431450
let row = rows_l.row(idx);
432451
if seen.insert(row) {
433-
final_rows.push(row);
452+
indices.push(idx);
434453
}
435454
}
436455
for idx in r_start..r_end {
437456
let row = rows_r.row(idx);
438457
if seen.insert(row) {
439-
final_rows.push(row);
458+
indices.push(idx + r_offset);
440459
}
441460
}
442461
} else {
443462
let l_len = l_end - l_start;
444463
let r_len = r_end - r_start;
445464

446-
// Select shorter side for lookup, longer side for probing
447-
let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len {
448-
(rows_l, l_start..l_end, rows_r, r_start..r_end)
449-
} else {
450-
(rows_r, r_start..r_end, rows_l, l_start..l_end)
451-
};
465+
// Select shorter side for lookup, longer side for probing.
466+
// Track the probe side's offset into the combined values array.
467+
let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) =
468+
if l_len < r_len {
469+
(rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
470+
} else {
471+
(rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
472+
};
452473
lookup_set.clear();
453474
lookup_set.reserve(lookup_range.len());
454475

@@ -461,18 +482,25 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
461482
for idx in probe_range {
462483
let row = probe_rows.row(idx);
463484
if lookup_set.contains(&row) && seen.insert(row) {
464-
final_rows.push(row);
485+
indices.push(idx + probe_offset);
465486
}
466487
}
467488
}
468489
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
469490
}
470491

471-
let final_values = if final_rows.is_empty() {
492+
// Gather distinct values by index from the combined values array.
493+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
494+
let final_values = if indices.is_empty() {
472495
new_empty_array(&l.value_type())
496+
} else if OffsetSize::IS_LARGE {
497+
let indices =
498+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
499+
take(combined_values.as_ref(), &indices, None)?
473500
} else {
474-
let arrays = converter.convert_rows(final_rows)?;
475-
Arc::clone(&arrays[0])
501+
let indices =
502+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
503+
take(combined_values.as_ref(), &indices, None)?
476504
};
477505

478506
let arr = GenericListArray::<OffsetSize>::try_new(
@@ -539,7 +567,7 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
539567
// Convert all values to row format in a single batch for performance
540568
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
541569
let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
542-
let mut final_rows = Vec::with_capacity(rows.num_rows());
570+
let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
543571
let mut seen = HashSet::new();
544572
for i in 0..array.len() {
545573
let last_offset = *offsets.last().unwrap();
@@ -559,18 +587,24 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
559587
for idx in start..end {
560588
let row = rows.row(idx);
561589
if seen.insert(row) {
562-
final_rows.push(row);
590+
indices.push(idx);
563591
}
564592
}
565593
offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
566594
}
567595

568-
// Convert all collected distinct rows back
569-
let final_values = if final_rows.is_empty() {
596+
// Gather distinct values in a single pass, using the computed `indices`.
597+
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
598+
let final_values = if indices.is_empty() {
570599
new_empty_array(&dt)
600+
} else if OffsetSize::IS_LARGE {
601+
let indices =
602+
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
603+
take(array.values().as_ref(), &indices, None)?
571604
} else {
572-
let arrays = converter.convert_rows(final_rows)?;
573-
Arc::clone(&arrays[0])
605+
let indices =
606+
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
607+
take(array.values().as_ref(), &indices, None)?
574608
};
575609

576610
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(

0 commit comments

Comments
 (0)