Skip to content

Commit 6bda7fc

Browse files
author
B Vadlamani
committed
init
1 parent 3d2e6b2 commit 6bda7fc

1 file changed

Lines changed: 99 additions & 29 deletions

File tree

  • datafusion/physical-plan/src/joins

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 99 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use crate::joins::{JoinOn, JoinOnRef};
4343
use arrow::array::{
4444
Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray,
4545
RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, UInt64Array,
46-
builder::UInt64Builder, downcast_array, new_null_array,
46+
builder::UInt64Builder, downcast_array, make_comparator, new_null_array,
4747
};
4848
use arrow::array::{
4949
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
@@ -54,7 +54,7 @@ use arrow::array::{
5454
};
5555
use arrow::buffer::{BooleanBuffer, NullBuffer};
5656
use arrow::compute::kernels::cmp::eq;
57-
use arrow::compute::{self, FilterBuilder, and, take};
57+
use arrow::compute::{self, take};
5858
use arrow::datatypes::{
5959
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
6060
};
@@ -1767,37 +1767,54 @@ pub(super) fn equal_rows_arr(
17671767
right_arrays: &[ArrayRef],
17681768
null_equality: NullEquality,
17691769
) -> Result<(UInt64Array, UInt32Array)> {
1770-
let mut iter = left_arrays.iter().zip(right_arrays.iter());
1771-
1772-
let Some((first_left, first_right)) = iter.next() else {
1773-
return Ok((Vec::<u64>::new().into(), Vec::<u32>::new().into()));
1774-
};
1775-
1776-
let arr_left = take(first_left.as_ref(), indices_left, None)?;
1777-
let arr_right = take(first_right.as_ref(), indices_right, None)?;
1778-
1779-
let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
1770+
let num_indices = indices_left.len();
1771+
if num_indices == 0 || left_arrays.is_empty() {
1772+
return Ok((
1773+
UInt64Array::from(Vec::<u64>::new()),
1774+
UInt32Array::from(Vec::<u32>::new()),
1775+
));
1776+
}
17801777

1781-
// Use map and try_fold to iterate over the remaining pairs of arrays.
1782-
// In each iteration, take is used on the pair of arrays and their equality is determined.
1783-
// The results are then folded (combined) using the and function to get a final equality result.
1784-
equal = iter
1785-
.map(|(left, right)| {
1786-
let arr_left = take(left.as_ref(), indices_left, None)?;
1787-
let arr_right = take(right.as_ref(), indices_right, None)?;
1788-
eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)
1789-
})
1790-
.try_fold(equal, |acc, equal2| and(&acc, &equal2?))?;
1778+
let mut comparators = Vec::with_capacity(left_arrays.len());
1779+
for (left, right) in left_arrays.iter().zip(right_arrays.iter()) {
1780+
comparators.push(make_comparator(
1781+
left.as_ref(),
1782+
right.as_ref(),
1783+
SortOptions::default(),
1784+
)?);
1785+
}
17911786

1792-
let filter_builder = FilterBuilder::new(&equal).optimize().build();
1787+
let mut left_builder = UInt64Builder::with_capacity(num_indices);
1788+
let mut right_builder = UInt32Builder::with_capacity(num_indices);
1789+
1790+
for i in 0..num_indices {
1791+
let left_idx = indices_left.value(i) as usize;
1792+
let right_idx = indices_right.value(i) as usize;
1793+
1794+
let mut is_equal = true;
1795+
for (col_idx, cmp) in comparators.iter().enumerate() {
1796+
match (null_equality, cmp(left_idx, right_idx)) {
1797+
(NullEquality::NullEqualsNull, Ordering::Equal) => continue,
1798+
(NullEquality::NullEqualsNothing, Ordering::Equal) => {
1799+
if left_arrays.get(col_idx).unwrap().is_null(left_idx) {
1800+
is_equal = false;
1801+
break;
1802+
}
1803+
}
1804+
_ => {
1805+
is_equal = false;
1806+
break;
1807+
}
1808+
}
1809+
}
17931810

1794-
let left_filtered = filter_builder.filter(indices_left)?;
1795-
let right_filtered = filter_builder.filter(indices_right)?;
1811+
if is_equal {
1812+
left_builder.append_value(indices_left.value(i));
1813+
right_builder.append_value(indices_right.value(i));
1814+
}
1815+
}
17961816

1797-
Ok((
1798-
downcast_array(left_filtered.as_ref()),
1799-
downcast_array(right_filtered.as_ref()),
1800-
))
1817+
Ok((left_builder.finish(), right_builder.finish()))
18011818
}
18021819

18031820
// version of eq_dyn supporting equality on null arrays
@@ -2949,4 +2966,57 @@ mod tests {
29492966
let result = max_distinct_count(&num_rows, &stats);
29502967
assert_eq!(result, Exact(0));
29512968
}
2969+
2970+
#[test]
2971+
fn test_equal_rows_arr() -> Result<()> {
2972+
let left_col = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef;
2973+
let right_col = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 6])) as ArrayRef;
2974+
2975+
let indices_left = UInt64Array::from(vec![0, 1, 2, 3, 4]);
2976+
let indices_right = UInt32Array::from(vec![0, 1, 2, 3, 4]);
2977+
2978+
// NullEqualsNothing, no NULLs in the data: rows 0-3 hold equal values, row 4 (5 vs 6) does not
2979+
let (res_left, res_right) = equal_rows_arr(
2980+
&indices_left,
2981+
&indices_right,
2982+
&[Arc::clone(&left_col)],
2983+
&[Arc::clone(&right_col)],
2984+
NullEquality::NullEqualsNothing,
2985+
)?;
2986+
2987+
assert_eq!(res_left, UInt64Array::from(vec![0, 1, 2, 3]));
2988+
assert_eq!(res_right, UInt32Array::from(vec![0, 1, 2, 3]));
2989+
2990+
// Columns with NULLs: row 0 is equal, row 1 is NULL on both sides, row 2 differs (3 vs 4)
2991+
let left_col = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
2992+
let right_col =
2993+
Arc::new(Int32Array::from(vec![Some(1), None, Some(4)])) as ArrayRef;
2994+
2995+
let indices_left = UInt64Array::from(vec![0, 1, 2]);
2996+
let indices_right = UInt32Array::from(vec![0, 1, 2]);
2997+
2998+
// NullEqualsNothing: NULL != NULL
2999+
let (res_left, res_right) = equal_rows_arr(
3000+
&indices_left,
3001+
&indices_right,
3002+
&[Arc::clone(&left_col)],
3003+
&[Arc::clone(&right_col)],
3004+
NullEquality::NullEqualsNothing,
3005+
)?;
3006+
assert_eq!(res_left, UInt64Array::from(vec![0]));
3007+
assert_eq!(res_right, UInt32Array::from(vec![0]));
3008+
3009+
// NullEqualsNull: NULL == NULL
3010+
let (res_left, res_right) = equal_rows_arr(
3011+
&indices_left,
3012+
&indices_right,
3013+
&[Arc::clone(&left_col)],
3014+
&[Arc::clone(&right_col)],
3015+
NullEquality::NullEqualsNull,
3016+
)?;
3017+
assert_eq!(res_left, UInt64Array::from(vec![0, 1]));
3018+
assert_eq!(res_right, UInt32Array::from(vec![0, 1]));
3019+
3020+
Ok(())
3021+
}
29523022
}

0 commit comments

Comments
 (0)