From 58189a139a4563e921e4e39df27e52ab3995d112 Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 02:14:03 +0300 Subject: [PATCH 1/9] Builder appending refactoring --- native/shuffle/src/spark_unsafe/list.rs | 263 +++++++++++++++--------- 1 file changed, 169 insertions(+), 94 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 14f9feb843..73ebff4cde 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -20,15 +20,18 @@ use crate::spark_unsafe::{ row::{append_field, downcast_builder_ref, SparkUnsafeRow}, unsafe_object::{impl_primitive_accessors, SparkUnsafeObject}, }; -use arrow::array::{ - builder::{ - ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, - Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, +use arrow::datatypes::{DataType, TimeUnit}; +use arrow::{ + array::{ + builder::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, + ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, + }, + MapBuilder, }, - MapBuilder, + buffer::Buffer, }; -use arrow::datatypes::{DataType, TimeUnit}; use datafusion_comet_jni_bridge::errors::CometError; /// Generates bulk append methods for primitive types in SparkUnsafeArray. @@ -45,31 +48,68 @@ macro_rules! impl_append_to_builder { return; } + let ptr = self.element_offset as *const $element_type; + let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>()); + if NULLABLE { - let mut ptr = self.element_offset as *const $element_type; let null_words = self.null_bitset_ptr(); - debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); - debug_assert!(!ptr.is_null(), "element_offset pointer is null"); - for idx in 0..num_elements { - // SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements - let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) }; - - if is_null { - builder.append_null(); + let num_validity_bytes = num_elements.div_ceil(8); + + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + + // Reserve space for values + let current_len = builder.len(); + builder.append_nulls(num_elements); + + let (values_slice, validity) = builder.slices_mut(); + values_slice[current_len..current_len + num_elements].copy_from_slice(values); + + // SAFETY: after append_nulls, validity is guaranteed Some. + let validity = validity.expect("validity must exist after append_nulls"); + let current_byte = current_len / 8; + let bit_offset = current_len % 8; + + let spark_bytes = unsafe { + std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) + }; + + if bit_offset == 0 { + // Fast path: byte-aligned — invert whole bytes directly + for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] + .iter_mut() + .zip(spark_bytes) + { + *dst = !src; + } } else { - // SAFETY: ptr is within element data bounds - builder.append_value(unsafe { ptr.read_unaligned() }); + for i in 0..num_elements { + let spark_word_idx = i >> 6; + let spark_bit_idx = i & 0x3f; + let is_null = unsafe { + (null_words.add(spark_word_idx).read_unaligned() + & (1i64 << spark_bit_idx)) + != 0 + }; + if !is_null { + let abs_idx = current_len + i; + validity[abs_idx / 8] |= 1 << (abs_idx % 8); + } + } + } + } else { + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); + } + ptr = unsafe { ptr.add(1) }; } - // SAFETY: ptr stays within bounds, iterating num_elements times - ptr = unsafe { ptr.add(1) }; } } else { - // SAFETY: element_offset points to contiguous data of length num_elements - debug_assert!(self.element_offset != 0, "element_offset is null"); - let ptr = self.element_offset as *const $element_type; - // Use bulk copy when data is properly aligned, fall back to - // per-element unaligned reads otherwise - if (ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>()) { + if aligned { let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; builder.append_slice(slice); } else { @@ -194,37 +234,31 @@ impl SparkUnsafeArray { if num_elements == 0 { return; } - - let mut ptr = self.element_offset as *const u8; + // bools have alignment == 1 + // we dont have to worry about the fallback debug_assert!( - !ptr.is_null(), + self.element_offset != 0, "append_booleans: element_offset pointer is null" ); if NULLABLE { let null_words = self.null_bitset_ptr(); - debug_assert!( - !null_words.is_null(), - "append_booleans: null_bitset_ptr is null" - ); - for idx in 0..num_elements { - // SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements - let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) }; - - if is_null { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const bool, num_elements) + }; + for (idx, &value) in slice.iter().enumerate() { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { builder.append_null(); } else { - // SAFETY: ptr is within element data bounds - builder.append_value(unsafe { *ptr != 0 }); + builder.append_value(value); } - // SAFETY: ptr stays within bounds, iterating num_elements times - ptr = unsafe { ptr.add(1) }; } } else { - for _ in 0..num_elements { - // SAFETY: ptr is within element data bounds - builder.append_value(unsafe { *ptr != 0 }); - ptr = unsafe { ptr.add(1) }; + let values = unsafe { + std::slice::from_raw_parts(self.element_offset as *const u8, num_elements) + }; + for &value in values { + builder.append_value(value != 0); } } } @@ -239,29 +273,51 @@ impl SparkUnsafeArray { return; } + let ptr = self.element_offset as *const i64; + let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); + if NULLABLE { - let mut ptr = self.element_offset as *const i64; let null_words = self.null_bitset_ptr(); - debug_assert!( - !null_words.is_null(), - "append_timestamps: null_bitset_ptr is null" - ); - debug_assert!( - !ptr.is_null(), - "append_timestamps: element_offset pointer is null" - ); - for idx in 0..num_elements { - // SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements - let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) }; - - if is_null { - builder.append_null(); + debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let current_len = builder.len(); + builder.append_nulls(num_elements); + let (values_slice, validity) = builder.slices_mut(); + values_slice[current_len..current_len + num_elements].copy_from_slice(values); + let validity = validity.expect("validity must exist after append_nulls"); + let num_validity_bytes = num_elements.div_ceil(8); + let spark_bytes = unsafe { + std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) + }; + let current_byte = current_len / 8; + let bit_offset = current_len % 8; + if bit_offset == 0 { + for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] + .iter_mut() + .zip(spark_bytes) + { + *dst = !src; + } } else { - // SAFETY: ptr is within element data bounds - builder.append_value(unsafe { ptr.read_unaligned() }); + for i in 0..num_elements { + let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; + if !is_null { + let abs_idx = current_len + i; + validity[abs_idx / 8] |= 1 << (abs_idx % 8); + } + } + } + } else { + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); + } + ptr = unsafe { ptr.add(1) } } - // SAFETY: ptr stays within bounds, iterating num_elements times - ptr = unsafe { ptr.add(1) }; } } else { // SAFETY: element_offset points to contiguous i64 data of length num_elements @@ -269,10 +325,9 @@ impl SparkUnsafeArray { self.element_offset != 0, "append_timestamps: element_offset is null" ); - let ptr = self.element_offset as *const i64; - if (ptr as usize).is_multiple_of(std::mem::align_of::()) { - let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - builder.append_slice(slice); + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + builder.append_slice(values); } else { let mut ptr = ptr; for _ in 0..num_elements { @@ -292,41 +347,61 @@ impl SparkUnsafeArray { if num_elements == 0 { return; } + let ptr = self.element_offset as *const i32; + let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); if NULLABLE { - let mut ptr = self.element_offset as *const i32; let null_words = self.null_bitset_ptr(); - debug_assert!( - !null_words.is_null(), - "append_dates: null_bitset_ptr is null" - ); - debug_assert!( - !ptr.is_null(), - "append_dates: element_offset pointer is null" - ); - for idx in 0..num_elements { - // SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements - let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) }; - - if is_null { - builder.append_null(); + debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let current_len = builder.len(); + builder.append_nulls(num_elements); + let (values_slice, validity) = builder.slices_mut(); + values_slice[current_len..current_len + num_elements].copy_from_slice(values); + let validity = validity.expect("validity must exist after append_nulls"); + let num_validity_bytes = num_elements.div_ceil(8); + let spark_bytes = unsafe { + std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) + }; + let current_byte = current_len / 8; + let bit_offset = current_len % 8; + if bit_offset == 0 { + for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] + .iter_mut() + .zip(spark_bytes) + { + *dst = !src; + } } else { - // SAFETY: ptr is within element data bounds - builder.append_value(unsafe { ptr.read_unaligned() }); + for i in 0..num_elements { + let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; + if !is_null { + let abs_idx = current_len + i; + validity[abs_idx / 8] |= 1 << (abs_idx % 8); + } + } + } + } else { + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); + } + ptr = unsafe { ptr.add(1) }; } - // SAFETY: ptr stays within bounds, iterating num_elements times - ptr = unsafe { ptr.add(1) }; } } else { - // SAFETY: element_offset points to contiguous i32 data of length num_elements + // SAFETY: element_offset points to contiguous i64 data of length num_elements debug_assert!( self.element_offset != 0, - "append_dates: element_offset is null" + "append_timestamps: element_offset is null" ); - let ptr = self.element_offset as *const i32; - if (ptr as usize).is_multiple_of(std::mem::align_of::()) { - let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - builder.append_slice(slice); + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + builder.append_slice(values); } else { let mut ptr = ptr; for _ in 0..num_elements { From 7778a842a0445e3ae7d9342f46ff9f2ccfc3b05f Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 03:15:12 +0300 Subject: [PATCH 2/9] Formatting, small refactoring and adding docstrings --- native/shuffle/src/spark_unsafe/list.rs | 102 +++++++++++++++--------- 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 73ebff4cde..44fde14243 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -20,18 +20,15 @@ use crate::spark_unsafe::{ row::{append_field, downcast_builder_ref, SparkUnsafeRow}, unsafe_object::{impl_primitive_accessors, SparkUnsafeObject}, }; -use arrow::datatypes::{DataType, TimeUnit}; -use arrow::{ - array::{ - builder::{ - ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, - Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, - }, - MapBuilder, +use arrow::array::{ + builder::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, + ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, }, - buffer::Buffer, + MapBuilder, }; +use arrow::datatypes::{DataType, TimeUnit}; use datafusion_comet_jni_bridge::errors::CometError; /// Generates bulk append methods for primitive types in SparkUnsafeArray. @@ -53,29 +50,34 @@ macro_rules! impl_append_to_builder { if NULLABLE { let null_words = self.null_bitset_ptr(); + // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 + // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8). We just skip reading unnecessary data let num_validity_bytes = num_elements.div_ceil(8); if aligned { + // Reading whole slice from base ptr (because we confirmed alignment) let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; // Reserve space for values let current_len = builder.len(); builder.append_nulls(num_elements); - + // Getting mutable slices to modify them in-place let (values_slice, validity) = builder.slices_mut(); - values_slice[current_len..current_len + num_elements].copy_from_slice(values); + // Copying whole `values` into `values_slice` buffer + values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); // SAFETY: after append_nulls, validity is guaranteed Some. let validity = validity.expect("validity must exist after append_nulls"); - let current_byte = current_len / 8; - let bit_offset = current_len % 8; - let spark_bytes = unsafe { std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) }; - + let current_byte = current_len / 8; + let bit_offset = current_len % 8; + // Check if we land exactly on the byte boundary. Otherwise - skip if bit_offset == 0 { - // Fast path: byte-aligned — invert whole bytes directly + // Flipping bit values, due to Spark <-> Arrow difference. From their respective docs: + // `Arrow``: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. + // `Spark``: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] .iter_mut() .zip(spark_bytes) @@ -84,13 +86,8 @@ macro_rules! impl_append_to_builder { } } else { for i in 0..num_elements { - let spark_word_idx = i >> 6; - let spark_bit_idx = i & 0x3f; - let is_null = unsafe { - (null_words.add(spark_word_idx).read_unaligned() - & (1i64 << spark_bit_idx)) - != 0 - }; + let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; + // We have appended nulls at the beginning. Hence, we can skip this branch if it is null if !is_null { let abs_idx = current_len + i; validity[abs_idx / 8] |= 1 << (abs_idx % 8); @@ -234,8 +231,8 @@ impl SparkUnsafeArray { if num_elements == 0 { return; } - // bools have alignment == 1 - // we dont have to worry about the fallback + // Bools have alignment == 1 + // We dont have to worry about the fallback. Hence, we do not care about it debug_assert!( self.element_offset != 0, "append_booleans: element_offset pointer is null" @@ -273,26 +270,45 @@ impl SparkUnsafeArray { return; } + // SAFETY: element_offset points to contiguous i64 data of length num_elements + debug_assert!( + self.element_offset != 0, + "append_timestamps: element_offset is null" + ); + let ptr = self.element_offset as *const i64; + // Note: alignment is not guaranteed - that is why do this let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); if NULLABLE { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { + // Reading whole slice from base ptr (because we confirmed alignment) let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; let current_len = builder.len(); + builder.append_nulls(num_elements); + // Getting mutable slices to modify them in-place let (values_slice, validity) = builder.slices_mut(); - values_slice[current_len..current_len + num_elements].copy_from_slice(values); + // Copying whole `values` into `values_slice` buffer + values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); + let validity = validity.expect("validity must exist after append_nulls"); + // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 + // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8). We just skip reading unnecessary data let num_validity_bytes = num_elements.div_ceil(8); + // Validity bitmap let spark_bytes = unsafe { std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) }; let current_byte = current_len / 8; let bit_offset = current_len % 8; + // Check if we land exactly on the byte boundary. Otherwise - skip if bit_offset == 0 { + // Flipping bit values, due to Spark <-> Arrow difference. From their respective docs: + // `Arrow``: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. + // `Spark``: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] .iter_mut() .zip(spark_bytes) @@ -302,6 +318,7 @@ impl SparkUnsafeArray { } else { for i in 0..num_elements { let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; + // We have appended nulls at the beginning. Hence, we can skip this branch if it is null if !is_null { let abs_idx = current_len + i; validity[abs_idx / 8] |= 1 << (abs_idx % 8); @@ -320,11 +337,6 @@ impl SparkUnsafeArray { } } } else { - // SAFETY: element_offset points to contiguous i64 data of length num_elements - debug_assert!( - self.element_offset != 0, - "append_timestamps: element_offset is null" - ); if aligned { let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; builder.append_slice(values); @@ -347,26 +359,44 @@ impl SparkUnsafeArray { if num_elements == 0 { return; } + + // SAFETY: element_offset points to contiguous i64 data of length num_elements + debug_assert!( + self.element_offset != 0, + "append_timestamps: element_offset is null" + ); + let ptr = self.element_offset as *const i32; + // Note: alignment is not guaranteed - that is why do this let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); if NULLABLE { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { + // Reading whole slice from base ptr (because we confirmed alignment) let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; let current_len = builder.len(); + builder.append_nulls(num_elements); let (values_slice, validity) = builder.slices_mut(); - values_slice[current_len..current_len + num_elements].copy_from_slice(values); + values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); + let validity = validity.expect("validity must exist after append_nulls"); + // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 + // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8) let num_validity_bytes = num_elements.div_ceil(8); + // Validity bitmap let spark_bytes = unsafe { std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) }; let current_byte = current_len / 8; let bit_offset = current_len % 8; + // Check if we land exactly on the byte boundary. Otherwise - skip if bit_offset == 0 { + // Flipping bit values, due to Spark <-> Arrow difference + // Arrow: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. + // Spark: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] .iter_mut() .zip(spark_bytes) @@ -376,6 +406,7 @@ impl SparkUnsafeArray { } else { for i in 0..num_elements { let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; + // We have appended nulls at the beginning. Hence, we can skip this branch if it is null if !is_null { let abs_idx = current_len + i; validity[abs_idx / 8] |= 1 << (abs_idx % 8); @@ -394,11 +425,6 @@ impl SparkUnsafeArray { } } } else { - // SAFETY: element_offset points to contiguous i64 data of length num_elements - debug_assert!( - self.element_offset != 0, - "append_timestamps: element_offset is null" - ); if aligned { let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; builder.append_slice(values); From 2bee01021fc12116993734a359131d8a5511b3af Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 03:33:58 +0300 Subject: [PATCH 3/9] Added docstring regarding alignment --- native/shuffle/src/spark_unsafe/list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 44fde14243..e6ef9a6e67 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -44,7 +44,7 @@ macro_rules! impl_append_to_builder { if num_elements == 0 { return; } - + // Note: alignment is not guaranteed - that is why do this let ptr = self.element_offset as *const $element_type; let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>()); From 6abf1f7b826c494c36fe9d51c567dd0879366dff Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 13:51:44 +0300 Subject: [PATCH 4/9] Fixed the fallback in an unaligned but NULLABLE case, that was failing the CI --- native/shuffle/src/spark_unsafe/list.rs | 42 ++++++++++++++----------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index e6ef9a6e67..f073cb7e8c 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -78,20 +78,22 @@ macro_rules! impl_append_to_builder { // Flipping bit values, due to Spark <-> Arrow difference. From their respective docs: // `Arrow``: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. // `Spark``: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. - for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] + for (dst, &src) in validity + [current_byte..(current_byte + num_validity_bytes)] .iter_mut() .zip(spark_bytes) { *dst = !src; } } else { - for i in 0..num_elements { - let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; - // We have appended nulls at the beginning. Hence, we can skip this branch if it is null - if !is_null { - let abs_idx = current_len + i; - validity[abs_idx / 8] |= 1 << (abs_idx % 8); + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); } + ptr = unsafe { ptr.add(1) }; } } } else { @@ -316,13 +318,14 @@ impl SparkUnsafeArray { *dst = !src; } } else { - for i in 0..num_elements { - let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; - // We have appended nulls at the beginning. Hence, we can skip this branch if it is null - if !is_null { - let abs_idx = current_len + i; - validity[abs_idx / 8] |= 1 << (abs_idx % 8); + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); } + ptr = unsafe { ptr.add(1) }; } } } else { @@ -404,13 +407,14 @@ impl SparkUnsafeArray { *dst = !src; } } else { - for i in 0..num_elements { - let is_null = unsafe { Self::is_null_in_bitset(null_words, i) }; - // We have appended nulls at the beginning. Hence, we can skip this branch if it is null - if !is_null { - let abs_idx = current_len + i; - validity[abs_idx / 8] |= 1 << (abs_idx % 8); + let mut ptr = ptr; + for idx in 0..num_elements { + if unsafe { Self::is_null_in_bitset(null_words, idx) } { + builder.append_null(); + } else { + builder.append_value(unsafe { ptr.read_unaligned() }); } + ptr = unsafe { ptr.add(1) }; } } } else { From 74265c7ab0cd95d83f6c1af934183cb7100ddb92 Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 16:26:45 +0300 Subject: [PATCH 5/9] Changes fixed --- native/shuffle/src/spark_unsafe/list.rs | 149 ++++-------------------- 1 file changed, 20 insertions(+), 129 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index f073cb7e8c..e8cb50cd2e 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -50,53 +50,15 @@ macro_rules! impl_append_to_builder { if NULLABLE { let null_words = self.null_bitset_ptr(); - // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 - // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8). We just skip reading unnecessary data - let num_validity_bytes = num_elements.div_ceil(8); - if aligned { - // Reading whole slice from base ptr (because we confirmed alignment) + if aligned { let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - - // Reserve space for values - let current_len = builder.len(); - builder.append_nulls(num_elements); - // Getting mutable slices to modify them in-place - let (values_slice, validity) = builder.slices_mut(); - // Copying whole `values` into `values_slice` buffer - values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); - - // SAFETY: after append_nulls, validity is guaranteed Some. - let validity = validity.expect("validity must exist after append_nulls"); - let spark_bytes = unsafe { - std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) - }; - let current_byte = current_len / 8; - let bit_offset = current_len % 8; - // Check if we land exactly on the byte boundary. Otherwise - skip - if bit_offset == 0 { - // Flipping bit values, due to Spark <-> Arrow difference. From their respective docs: - // `Arrow``: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. - // `Spark``: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. - for (dst, &src) in validity - [current_byte..(current_byte + num_validity_bytes)] - .iter_mut() - .zip(spark_bytes) - { - *dst = !src; - } - } else { - let mut ptr = ptr; - for idx in 0..num_elements { - if unsafe { Self::is_null_in_bitset(null_words, idx) } { - builder.append_null(); - } else { - builder.append_value(unsafe { ptr.read_unaligned() }); - } - ptr = unsafe { ptr.add(1) }; - } - } - } else { + let is_valid: Vec = (0..num_elements) + .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) + .collect(); + builder.append_values(values, &is_valid); + + } else { let mut ptr = ptr; for idx in 0..num_elements { if unsafe { Self::is_null_in_bitset(null_words, idx) } { @@ -285,50 +247,13 @@ impl SparkUnsafeArray { if NULLABLE { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); - if aligned { - // Reading whole slice from base ptr (because we confirmed alignment) - let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let current_len = builder.len(); - - builder.append_nulls(num_elements); - // Getting mutable slices to modify them in-place - let (values_slice, validity) = builder.slices_mut(); - // Copying whole `values` into `values_slice` buffer - values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); - - let validity = validity.expect("validity must exist after append_nulls"); - // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 - // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8). We just skip reading unnecessary data - let num_validity_bytes = num_elements.div_ceil(8); - // Validity bitmap - let spark_bytes = unsafe { - std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) - }; - let current_byte = current_len / 8; - let bit_offset = current_len % 8; - // Check if we land exactly on the byte boundary. Otherwise - skip - if bit_offset == 0 { - // Flipping bit values, due to Spark <-> Arrow difference. From their respective docs: - // `Arrow``: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. - // `Spark``: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. - for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] - .iter_mut() - .zip(spark_bytes) - { - *dst = !src; - } - } else { - let mut ptr = ptr; - for idx in 0..num_elements { - if unsafe { Self::is_null_in_bitset(null_words, idx) } { - builder.append_null(); - } else { - builder.append_value(unsafe { ptr.read_unaligned() }); - } - ptr = unsafe { ptr.add(1) }; - } - } - } else { +if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let is_valid: Vec = (0..num_elements) + .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) + .collect(); + builder.append_values(values, &is_valid); +} else { let mut ptr = ptr; for idx in 0..num_elements { if unsafe { Self::is_null_in_bitset(null_words, idx) } { @@ -377,46 +302,12 @@ impl SparkUnsafeArray { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { - // Reading whole slice from base ptr (because we confirmed alignment) - let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let current_len = builder.len(); - - builder.append_nulls(num_elements); - let (values_slice, validity) = builder.slices_mut(); - values_slice[current_len..(current_len + num_elements)].copy_from_slice(values); - - let validity = validity.expect("validity must exist after append_nulls"); - // Note that Spark uses 64-bit word operations. Hence num.div_ceil(64) * 8 - // Arrow allocates the exact number of bytes needed. Hence num.div_ceil(8) - let num_validity_bytes = num_elements.div_ceil(8); - // Validity bitmap - let spark_bytes = unsafe { - std::slice::from_raw_parts(null_words as *const u8, num_validity_bytes) - }; - let current_byte = current_len / 8; - let bit_offset = current_len % 8; - // Check if we land exactly on the byte boundary. Otherwise - skip - if bit_offset == 0 { - // Flipping bit values, due to Spark <-> Arrow difference - // Arrow: A 1 (set bit) for index j indicates that the value is not null, while a 0 (bit not set) indicates that it is null. - // Spark: `void setNullAt(int i)` sets the 1 (set bit) for index i indicating that the value IS null. - for (dst, &src) in validity[current_byte..current_byte + num_validity_bytes] - .iter_mut() - .zip(spark_bytes) - { - *dst = !src; - } - } else { - let mut ptr = ptr; - for idx in 0..num_elements { - if unsafe { Self::is_null_in_bitset(null_words, idx) } { - builder.append_null(); - } else { - builder.append_value(unsafe { ptr.read_unaligned() }); - } - ptr = unsafe { ptr.add(1) }; - } - } + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let is_valid: Vec = (0..num_elements) + .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) + .collect(); + builder.append_values(values, &is_valid); + } else { let mut ptr = ptr; for idx in 0..num_elements { From 662a7f2506bd85f86d8237b40f6b4aad983712a7 Mon Sep 17 00:00:00 2001 From: sandudb Date: Thu, 18 Jun 2026 16:56:00 +0300 Subject: [PATCH 6/9] Fixed cargo formatting --- native/shuffle/src/spark_unsafe/list.rs | 30 ++++++++++++------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index e8cb50cd2e..72fbf74cee 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -51,14 +51,13 @@ macro_rules! impl_append_to_builder { if NULLABLE { let null_words = self.null_bitset_ptr(); - if aligned { + if aligned { let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; let is_valid: Vec = (0..num_elements) .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) .collect(); builder.append_values(values, &is_valid); - - } else { + } else { let mut ptr = ptr; for idx in 0..num_elements { if unsafe { Self::is_null_in_bitset(null_words, idx) } { @@ -247,13 +246,13 @@ impl SparkUnsafeArray { if NULLABLE { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); -if aligned { - let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let is_valid: Vec = (0..num_elements) - .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) - .collect(); - builder.append_values(values, &is_valid); -} else { + if aligned { + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let is_valid: Vec = (0..num_elements) + .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) + .collect(); + builder.append_values(values, &is_valid); + } else { let mut ptr = ptr; for idx in 0..num_elements { if unsafe { Self::is_null_in_bitset(null_words, idx) } { @@ -302,12 +301,11 @@ if aligned { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { - let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let is_valid: Vec = (0..num_elements) - .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) - .collect(); - builder.append_values(values, &is_valid); - + let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; + let is_valid: Vec = (0..num_elements) + .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) + .collect(); + builder.append_values(values, &is_valid); } else { let mut ptr = ptr; for idx in 0..num_elements { From 157c73fb6ac4a9291b2be18dd5c86ec2cd47b10f Mon Sep 17 00:00:00 2001 From: sandudb Date: Fri, 19 Jun 2026 13:27:24 +0300 Subject: [PATCH 7/9] Remade the building process using arrow buffers with no hot-spots in Vec<> loops --- native/shuffle/src/spark_unsafe/list.rs | 122 ++++++++++++++++++------ native/shuffle/src/spark_unsafe/row.rs | 63 ++++++++---- 2 files changed, 136 insertions(+), 49 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index 72fbf74cee..d708140005 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -15,20 +15,29 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::spark_unsafe::{ map::append_map_elements, row::{append_field, downcast_builder_ref, SparkUnsafeRow}, unsafe_object::{impl_primitive_accessors, SparkUnsafeObject}, }; -use arrow::array::{ - builder::{ - ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, - Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, +use arrow::datatypes::{DataType, TimeUnit}; +use arrow::{ + array::{ + builder::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, + ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, + }, + MapBuilder, PrimitiveArray, + }, + buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer}, + datatypes::{ + Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + TimestampMicrosecondType, }, - MapBuilder, }; -use arrow::datatypes::{DataType, TimeUnit}; use datafusion_comet_jni_bridge::errors::CometError; /// Generates bulk append methods for primitive types in SparkUnsafeArray. @@ -38,7 +47,7 @@ use datafusion_comet_jni_bridge::errors::CometError; /// - `null_bitset_ptr()` returns a pointer to `ceil(num_elements/64)` i64 words /// - These invariants are guaranteed by the SparkUnsafeArray layout from the JVM macro_rules! impl_append_to_builder { - ($method_name:ident, $builder_type:ty, $element_type:ty) => { + ($method_name:ident, $builder_type:ty, $element_type:ty, $arrow_type:ty) => { pub(crate) fn $method_name(&self, builder: &mut $builder_type) { let num_elements = self.num_elements; if num_elements == 0 { @@ -52,11 +61,28 @@ macro_rules! impl_append_to_builder { let null_words = self.null_bitset_ptr(); if aligned { + // Raw values let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let is_valid: Vec = (0..num_elements) - .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) - .collect(); - builder.append_values(values, &is_valid); + + // Note: in Spark bitmap is padded to 8 byte word-boundaries + // In Arrow we just use the needed number of whole bytes without padding + let null_mask_len = num_elements.div_ceil(8); + let null_mask = unsafe { + std::slice::from_raw_parts::(null_words as *const u8, null_mask_len) + }; + // We need to perform this flip due to the null bitmap Spark vs Arrow incompatibility + // In `Spark` we have 1 set in bitmap meaning that element IS NULL + // In `Arrow` we have 1 set in bitmap meaning that element IS VALID (non-null) + let flipped: Vec = null_mask.iter().map(|n| !n).collect(); + // Constructing null-buffer + let validity = + NullBuffer::new(BooleanBuffer::new(Buffer::from(flipped), 0, num_elements)); + + let arr = PrimitiveArray::<$arrow_type>::new( + ScalarBuffer::from(Buffer::from_slice_ref(values)), + Some(validity), + ); + builder.append_array(&arr); } else { let mut ptr = ptr; for idx in 0..num_elements { @@ -177,12 +203,12 @@ impl SparkUnsafeArray { (null_words.add(word_idx).read_unaligned() & (1i64 << bit_idx)) != 0 } - impl_append_to_builder!(append_ints_to_builder, Int32Builder, i32); - impl_append_to_builder!(append_longs_to_builder, Int64Builder, i64); - impl_append_to_builder!(append_shorts_to_builder, Int16Builder, i16); - impl_append_to_builder!(append_bytes_to_builder, Int8Builder, i8); - impl_append_to_builder!(append_floats_to_builder, Float32Builder, f32); - impl_append_to_builder!(append_doubles_to_builder, Float64Builder, f64); + impl_append_to_builder!(append_ints_to_builder, Int32Builder, i32, Int32Type); + impl_append_to_builder!(append_longs_to_builder, Int64Builder, i64, Int64Type); + impl_append_to_builder!(append_shorts_to_builder, Int16Builder, i16, Int16Type); + impl_append_to_builder!(append_bytes_to_builder, Int8Builder, i8, Int8Type); + impl_append_to_builder!(append_floats_to_builder, Float32Builder, f32, Float32Type); + impl_append_to_builder!(append_doubles_to_builder, Float64Builder, f64, Float64Type); /// Bulk append boolean values to builder. /// Booleans are stored as 1 byte each in SparkUnsafeArray, requiring special handling. @@ -227,6 +253,7 @@ impl SparkUnsafeArray { pub(crate) fn append_timestamps_to_builder( &self, builder: &mut TimestampMicrosecondBuilder, + timezone: Option>, ) { let num_elements = self.num_elements; if num_elements == 0 { @@ -247,11 +274,31 @@ impl SparkUnsafeArray { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { + // Raw values let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let is_valid: Vec = (0..num_elements) - .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) - .collect(); - builder.append_values(values, &is_valid); + + // Note: in Spark bitmap is padded to 8 byte word-boundaries + // In Arrow we just use the needed number of whole bytes without padding + let null_mask_len = num_elements.div_ceil(8); + let null_mask = unsafe { + std::slice::from_raw_parts::(null_words as *const u8, null_mask_len) + }; + + // We need to perform this flip due to the null bitmap Spark vs Arrow incompatibility + // In `Spark` we have 1 set in bitmap meaning that element IS NULL + // In `Arrow` we have 1 set in bitmap meaning that element IS VALID (non-null) + let flipped: Vec = null_mask.iter().map(|n| !n).collect(); + // Constructing null-buffer + let validity = + NullBuffer::new(BooleanBuffer::new(Buffer::from(flipped), 0, num_elements)); + + // Constructing Arrow array with timezone set + let arr = PrimitiveArray::::new( + ScalarBuffer::from(Buffer::from_slice_ref(values)), + Some(validity), + ) + .with_timezone_opt(timezone); + builder.append_array(&arr); } else { let mut ptr = ptr; for idx in 0..num_elements { @@ -301,11 +348,30 @@ impl SparkUnsafeArray { let null_words = self.null_bitset_ptr(); debug_assert!(!null_words.is_null(), "null_bitset_ptr is null"); if aligned { + // Raw values let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) }; - let is_valid: Vec = (0..num_elements) - .map(|i| unsafe { !Self::is_null_in_bitset(null_words, i) }) - .collect(); - builder.append_values(values, &is_valid); + + // Note: in Spark bitmap is padded to 8 byte word-boundaries + // In Arrow we just use the needed number of whole bytes without padding + let null_mask_len = num_elements.div_ceil(8); + let null_mask = unsafe { + std::slice::from_raw_parts::(null_words as *const u8, null_mask_len) + }; + + // We need to perform this flip due to the null bitmap `Spark` vs `Arrow` incompatibility + // In `Spark` we have 1 set in bitmap meaning that element IS NULL + // In `Arrow` we have 1 set in bitmap meaning that element IS VALID (non-null) + let flipped: Vec = null_mask.iter().map(|n| !n).collect(); + // Constructing null-buffer + let validity = + NullBuffer::new(BooleanBuffer::new(Buffer::from(flipped), 0, num_elements)); + + // Constructing Arrow array with timezone set + let arr = PrimitiveArray::::new( + ScalarBuffer::from(Buffer::from_slice_ref(values)), + Some(validity), + ); + builder.append_array(&arr); } else { let mut ptr = ptr; for idx in 0..num_elements { @@ -379,9 +445,9 @@ pub fn append_to_builder( let builder = downcast_builder_ref!(Float64Builder, builder); array.append_doubles_to_builder::(builder); } - DataType::Timestamp(TimeUnit::Microsecond, _) => { + DataType::Timestamp(TimeUnit::Microsecond, tz) => { let builder = downcast_builder_ref!(TimestampMicrosecondBuilder, builder); - array.append_timestamps_to_builder::(builder); + array.append_timestamps_to_builder::(builder, tz.clone()); } DataType::Date32 => { let builder = downcast_builder_ref!(Date32Builder, builder); diff --git a/native/shuffle/src/spark_unsafe/row.rs b/native/shuffle/src/spark_unsafe/row.rs index 6ffe9d0b6e..84947773e4 100644 --- a/native/shuffle/src/spark_unsafe/row.rs +++ b/native/shuffle/src/spark_unsafe/row.rs @@ -607,26 +607,43 @@ fn append_list_column_batch( // Helper macro for primitive element types - gets builder fresh each iteration // to avoid borrow conflicts with list_builder.append() macro_rules! process_primitive_lists { - ($builder_type:ty, $append_fn:ident) => {{ - for i in row_start..row_end { - read_row_at!(row, row_addresses_ptr, row_sizes_ptr, i); - - if row.is_null_at(column_idx) { - list_builder.append_null(); - } else { - let array = row.get_array(column_idx); - // Get values builder fresh each iteration to avoid borrow conflict - let values_builder = list_builder - .values() - .as_any_mut() - .downcast_mut::<$builder_type>() - .expect(stringify!($builder_type)); - array.$append_fn::(values_builder); - list_builder.append(true); - } + // No extra args + ($builder_type:ty, $append_fn:ident) => {{ + for i in row_start..row_end { + read_row_at!(row, row_addresses_ptr, row_sizes_ptr, i); + if row.is_null_at(column_idx) { + list_builder.append_null(); + } else { + let array = row.get_array(column_idx); + let values_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect(stringify!($builder_type)); + array.$append_fn::(values_builder); + list_builder.append(true); } - }}; - } + } + }}; + // Extra args — for timestamps (timezone) and others + ($builder_type:ty, $append_fn:ident, $($extra:expr),*) => {{ + for i in row_start..row_end { + read_row_at!(row, row_addresses_ptr, row_sizes_ptr, i); + if row.is_null_at(column_idx) { + list_builder.append_null(); + } else { + let array = row.get_array(column_idx); + let values_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect(stringify!($builder_type)); + array.$append_fn::(values_builder, $($extra),*); + list_builder.append(true); + } + } + }}; +} match element_type { DataType::Boolean => { @@ -653,8 +670,12 @@ fn append_list_column_batch( DataType::Date32 => { process_primitive_lists!(Date32Builder, append_dates_to_builder); } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - process_primitive_lists!(TimestampMicrosecondBuilder, append_timestamps_to_builder); + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + process_primitive_lists!( + TimestampMicrosecondBuilder, + append_timestamps_to_builder, + tz.clone() + ); } // For complex element types, fall back to per-row dispatch _ => { From 0e5fcadc3d6b44eee676a659e79aa3850e18b43f Mon Sep 17 00:00:00 2001 From: sandudb Date: Fri, 19 Jun 2026 13:28:07 +0300 Subject: [PATCH 8/9] Formatting fix --- native/shuffle/src/spark_unsafe/list.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index d708140005..a7dce07fe1 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -74,7 +74,7 @@ macro_rules! impl_append_to_builder { // In `Spark` we have 1 set in bitmap meaning that element IS NULL // In `Arrow` we have 1 set in bitmap meaning that element IS VALID (non-null) let flipped: Vec = null_mask.iter().map(|n| !n).collect(); - // Constructing null-buffer + // Constructing null-buffer let validity = NullBuffer::new(BooleanBuffer::new(Buffer::from(flipped), 0, num_elements)); @@ -283,7 +283,7 @@ impl SparkUnsafeArray { let null_mask = unsafe { std::slice::from_raw_parts::(null_words as *const u8, null_mask_len) }; - + // We need to perform this flip due to the null bitmap Spark vs Arrow incompatibility // In `Spark` we have 1 set in bitmap meaning that element IS NULL // In `Arrow` we have 1 set in bitmap meaning that element IS VALID (non-null) From 42bd9dd08656baebf8f3e4cb91a54daf6148b04e Mon Sep 17 00:00:00 2001 From: sandudb Date: Fri, 19 Jun 2026 13:49:58 +0300 Subject: [PATCH 9/9] Added docstring about runtime check of alignment --- native/shuffle/src/spark_unsafe/list.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/native/shuffle/src/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs index a7dce07fe1..93b7e46290 100644 --- a/native/shuffle/src/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -54,6 +54,7 @@ macro_rules! impl_append_to_builder { return; } // Note: alignment is not guaranteed - that is why do this + // This runtime check is needed. Look at `unsafe_object.rs:49` for more info let ptr = self.element_offset as *const $element_type; let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>()); @@ -268,6 +269,7 @@ impl SparkUnsafeArray { let ptr = self.element_offset as *const i64; // Note: alignment is not guaranteed - that is why do this + // This runtime check is needed. Look at `unsafe_object.rs:49` for more info let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); if NULLABLE { @@ -342,6 +344,7 @@ impl SparkUnsafeArray { let ptr = self.element_offset as *const i32; // Note: alignment is not guaranteed - that is why do this + // This runtime check is needed. Look at `unsafe_object.rs:49` for more info let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::()); if NULLABLE {