Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ jobs:
value: |
org.apache.comet.CometExpressionSuite
org.apache.comet.CometExpressionCoverageSuite
org.apache.comet.CometHashExpressionSuite
org.apache.comet.CometTemporalExpressionSuite
org.apache.comet.CometArrayExpressionSuite
org.apache.comet.CometCastSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ jobs:
value: |
org.apache.comet.CometExpressionSuite
org.apache.comet.CometExpressionCoverageSuite
org.apache.comet.CometHashExpressionSuite
org.apache.comet.CometTemporalExpressionSuite
org.apache.comet.CometArrayExpressionSuite
org.apache.comet.CometCastSuite
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/hash_funcs/murmur3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ pub fn create_murmur3_hashes<'a>(
arrays,
hashes_buffer,
spark_compatible_murmur3_hash,
create_hashes_dictionary
create_hashes_dictionary,
create_murmur3_hashes
);
Ok(hashes_buffer)
}
Expand Down
106 changes: 105 additions & 1 deletion native/spark-expr/src/hash_funcs/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,46 @@ macro_rules! hash_array_decimal {
};
}

/// Hash a list array by recursively hashing each element.
/// For each row, we hash all elements in the list.
/// Spark hashes arrays by recursively hashing each element, where each
/// element's hash is computed using the previous element's hash as the seed.
/// This creates a chain: hash(elem_n, hash(elem_n-1, ... hash(elem_0, seed)...))
#[macro_export]
macro_rules! hash_list_array {
($array_type:ident, $offset_type:ty, $column: ident, $hashes: ident, $recursive_hash_method: ident) => {
let list_array = $column
.as_any()
.downcast_ref::<$array_type>()
.unwrap_or_else(|| {
panic!(
"Failed to downcast column to {}. Actual data type: {:?}.",
stringify!($array_type),
$column.data_type()
)
});

let values = list_array.values();
let offsets = list_array.offsets();

// For each row, hash the elements in its list
for (row_idx, hash) in $hashes.iter_mut().enumerate() {
if !list_array.is_null(row_idx) {
Comment thread
mbutrovich marked this conversation as resolved.
Outdated
let start = offsets[row_idx] as usize;
let end = offsets[row_idx + 1] as usize;
let len = end - start;
// Hash each element in sequence, chaining the hash values
for elem_idx in 0..len {
let elem_array = values.slice(start + elem_idx, 1);
let mut single_hash = [*hash];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is single_hash an array?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single_hash is an array because the recursive hash method interface expects a slice of hashes and this allows us to reuse that rather than add another version of the code

$recursive_hash_method(&[elem_array], &mut single_hash)?;
*hash = single_hash[0];
}
}
}
};
}

/// Creates hash values for every row, based on the values in the
/// columns.
///
Expand All @@ -214,9 +254,10 @@ macro_rules! hash_array_decimal {
///
/// `hash_method` is the hash function to use.
/// `create_dictionary_hash_method` is the function to create hashes for dictionary arrays input.
/// `recursive_hash_method` is the function to call for recursive hashing of complex types.
#[macro_export]
macro_rules! create_hashes_internal {
($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident) => {
($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident, $recursive_hash_method: ident) => {
use arrow::datatypes::{DataType, TimeUnit};
use arrow::array::{types::*, *};

Expand Down Expand Up @@ -425,6 +466,69 @@ macro_rules! create_hashes_internal {
)))
}
},
DataType::List(_) => {
$crate::hash_list_array!(ListArray, i32, col, $hashes_buffer, $recursive_hash_method);
}
DataType::LargeList(_) => {
$crate::hash_list_array!(LargeListArray, i64, col, $hashes_buffer, $recursive_hash_method);
}
DataType::FixedSizeList(_, size) => {
let list_array = col.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
let values = list_array.values();
let list_size = *size as usize;

// For each row, hash the elements in its fixed-size list
for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() {
if !list_array.is_null(row_idx) {
let start = row_idx * list_size;
// Hash each element in sequence, chaining the hash values
for elem_idx in 0..list_size {
let elem_array = values.slice(start + elem_idx, 1);
let mut single_hash = [*hash];
$recursive_hash_method(&[elem_array], &mut single_hash)?;
*hash = single_hash[0];
}
}
}
}
DataType::Struct(_) => {
let struct_array = col.as_any().downcast_ref::<StructArray>().unwrap();
// Hash each field of the struct - Spark hashes all fields recursively
let columns: Vec<ArrayRef> = struct_array.columns().to_vec();
if !columns.is_empty() {
$recursive_hash_method(&columns, $hashes_buffer)?;
}
}
DataType::Map(_, _) => {
let map_array = col.as_any().downcast_ref::<MapArray>().unwrap();
// For maps, Spark hashes by iterating through (key, value) pairs
// For each entry, hash the key then the value
let keys = map_array.keys();
let values = map_array.values();
let offsets = map_array.offsets();

// For each row, hash the key-value pairs in sequence
for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() {
if !map_array.is_null(row_idx) {
Comment thread
mbutrovich marked this conversation as resolved.
Outdated
let start = offsets[row_idx] as usize;
let end = offsets[row_idx + 1] as usize;
// Hash each key-value pair in sequence
for entry_idx in start..end {
// Hash the key
let key_array = keys.slice(entry_idx, 1);
let mut single_hash = [*hash];
$recursive_hash_method(&[key_array], &mut single_hash)?;
*hash = single_hash[0];

// Hash the value
let value_array = values.slice(entry_idx, 1);
single_hash = [*hash];
$recursive_hash_method(&[value_array], &mut single_hash)?;
*hash = single_hash[0];
}
}
}
}
_ => {
// This is internal because we should have caught this before.
return Err(DataFusionError::Internal(format!(
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/hash_funcs/xxhash64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ fn create_xxhash64_hashes<'a>(
arrays,
hashes_buffer,
spark_compatible_xxhash64,
create_xxhash64_hashes_dictionary
create_xxhash64_hashes_dictionary,
create_xxhash64_hashes
);
Ok(hashes_buffer)
}
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/org/apache/comet/serde/hash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private object HashUtils {
// Java BigDecimal before hashing
withInfo(expr, s"Unsupported datatype: $dt (precision > 18)")
return false
case dt if !supportedDataType(dt) =>
case dt if !supportedDataType(dt, allowComplex = true) =>
withInfo(expr, s"Unsupported datatype $dt")
return false
case _ =>
Expand Down
Loading
Loading