-
Notifications
You must be signed in to change notification settings - Fork 329
feat: Expand murmur3 hash support to complex types
#3077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
953154e
72c5342
b76e3eb
45c679b
86355d8
a31b2c0
078b204
a50e09c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -206,6 +206,46 @@ macro_rules! hash_array_decimal { | |
| }; | ||
| } | ||
|
|
||
| /// Hashes a list column in place, folding each element of a non-null row into that row's entry in `$hashes`. | ||
| /// Null rows are skipped (their hash is left as-is); panics if `$column` cannot be downcast to `$array_type`. | ||
| /// Spark hashes arrays by recursively hashing each element, where each | ||
| /// element's hash is computed using the previous element's hash as the seed. | ||
| /// This creates a chain: hash(elem_n, hash(elem_n-1, ... hash(elem_0, seed)...)) | ||
| #[macro_export] | ||
| macro_rules! hash_list_array { | ||
| ($array_type:ident, $offset_type:ty, $column: ident, $hashes: ident, $recursive_hash_method: ident) => { | ||
| let list_array = $column | ||
| .as_any() | ||
| .downcast_ref::<$array_type>() | ||
| .unwrap_or_else(|| { | ||
| panic!( | ||
| "Failed to downcast column to {}. Actual data type: {:?}.", | ||
| stringify!($array_type), | ||
| $column.data_type() | ||
| ) | ||
| }); | ||
|
|
||
| let values = list_array.values(); | ||
| let offsets = list_array.offsets(); | ||
|
|
||
| // Walk the rows; the elements of row i are values[offsets[i]..offsets[i + 1]] | ||
| for (row_idx, hash) in $hashes.iter_mut().enumerate() { | ||
| if !list_array.is_null(row_idx) { | ||
| let start = offsets[row_idx] as usize; | ||
| let end = offsets[row_idx + 1] as usize; | ||
| let len = end - start; | ||
| // Chain the hashes: the running hash seeds each one-element slice; errors propagate to the caller via `?` | ||
| for elem_idx in 0..len { | ||
| let elem_array = values.slice(start + elem_idx, 1); | ||
| let mut single_hash = [*hash]; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| $recursive_hash_method(&[elem_array], &mut single_hash)?; | ||
| *hash = single_hash[0]; | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /// Creates hash values for every row, based on the values in the | ||
| /// columns. | ||
| /// | ||
|
|
@@ -214,9 +254,10 @@ macro_rules! hash_array_decimal { | |
| /// | ||
| /// `hash_method` is the hash function to use. | ||
| /// `create_dictionary_hash_method` is the function used to create hashes for dictionary array inputs. | ||
| /// `recursive_hash_method` is the function to call for recursive hashing of complex types. | ||
| #[macro_export] | ||
| macro_rules! create_hashes_internal { | ||
| ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident) => { | ||
| ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident, $recursive_hash_method: ident) => { | ||
| use arrow::datatypes::{DataType, TimeUnit}; | ||
| use arrow::array::{types::*, *}; | ||
|
|
||
|
|
@@ -425,6 +466,69 @@ macro_rules! create_hashes_internal { | |
| ))) | ||
| } | ||
| }, | ||
| DataType::List(_) => { | ||
| $crate::hash_list_array!(ListArray, i32, col, $hashes_buffer, $recursive_hash_method); | ||
| } | ||
| DataType::LargeList(_) => { | ||
| $crate::hash_list_array!(LargeListArray, i64, col, $hashes_buffer, $recursive_hash_method); | ||
| } | ||
| DataType::FixedSizeList(_, size) => { | ||
| let list_array = col.as_any().downcast_ref::<FixedSizeListArray>().unwrap(); | ||
| let values = list_array.values(); | ||
| let list_size = *size as usize; | ||
|
|
||
| // For each row, hash the elements in its fixed-size list | ||
| for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { | ||
| if !list_array.is_null(row_idx) { | ||
| let start = row_idx * list_size; | ||
| // Hash each element in sequence, chaining the hash values | ||
| for elem_idx in 0..list_size { | ||
| let elem_array = values.slice(start + elem_idx, 1); | ||
| let mut single_hash = [*hash]; | ||
| $recursive_hash_method(&[elem_array], &mut single_hash)?; | ||
| *hash = single_hash[0]; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| DataType::Struct(_) => { | ||
| let struct_array = col.as_any().downcast_ref::<StructArray>().unwrap(); | ||
| // Hash each field of the struct - Spark hashes all fields recursively | ||
| let columns: Vec<ArrayRef> = struct_array.columns().to_vec(); | ||
| if !columns.is_empty() { | ||
| $recursive_hash_method(&columns, $hashes_buffer)?; | ||
| } | ||
| } | ||
| DataType::Map(_, _) => { | ||
| let map_array = col.as_any().downcast_ref::<MapArray>().unwrap(); | ||
| // For maps, Spark hashes by iterating through (key, value) pairs | ||
| // For each entry, hash the key then the value | ||
| let keys = map_array.keys(); | ||
| let values = map_array.values(); | ||
| let offsets = map_array.offsets(); | ||
|
|
||
| // For each row, hash the key-value pairs in sequence | ||
| for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { | ||
| if !map_array.is_null(row_idx) { | ||
|
mbutrovich marked this conversation as resolved.
Outdated
|
||
| let start = offsets[row_idx] as usize; | ||
| let end = offsets[row_idx + 1] as usize; | ||
| // Hash each key-value pair in sequence | ||
| for entry_idx in start..end { | ||
| // Hash the key | ||
| let key_array = keys.slice(entry_idx, 1); | ||
| let mut single_hash = [*hash]; | ||
| $recursive_hash_method(&[key_array], &mut single_hash)?; | ||
| *hash = single_hash[0]; | ||
|
|
||
| // Hash the value | ||
| let value_array = values.slice(entry_idx, 1); | ||
| single_hash = [*hash]; | ||
| $recursive_hash_method(&[value_array], &mut single_hash)?; | ||
| *hash = single_hash[0]; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| _ => { | ||
| // This is internal because we should have caught this before. | ||
| return Err(DataFusionError::Internal(format!( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.