diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs
index 815d7c57605d..65bfc66ffff3 100644
--- a/arrow-array/src/ffi_stream.rs
+++ b/arrow-array/src/ffi_stream.rs
@@ -285,6 +285,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
 pub struct ArrowArrayStreamReader {
     stream: FFI_ArrowArrayStream,
     schema: SchemaRef,
+    align_buffers: bool,
 }
 
 /// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing
@@ -317,7 +318,11 @@ impl ArrowArrayStreamReader {
 
         let schema = get_stream_schema(&mut stream)?;
 
-        Ok(Self { stream, schema })
+        Ok(Self {
+            stream,
+            schema,
+            align_buffers: false,
+        })
     }
 
     /// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`.
@@ -334,6 +339,31 @@ impl ArrowArrayStreamReader {
         Self::try_new(unsafe { FFI_ArrowArrayStream::from_raw(raw_stream) })
     }
 
+    /// Configure whether buffers from imported arrays should be aligned, copying
+    /// data if necessary.
+    ///
+    /// Some Arrow C Data Interface producers (notably some ADBC drivers, e.g.
+    /// the Go Snowflake driver) return buffers whose pointers are not aligned to
+    /// the corresponding primitive type. The default for [`ArrowArrayStreamReader`]
+    /// is to leave such buffers untouched, matching the behavior of other FFI
+    /// import paths in this crate; downstream readers are expected to detect
+    /// misalignment and surface it explicitly.
+    ///
+    /// Enabling this opt-in causes [`Iterator::next`] to call
+    /// [`ArrayData::align_buffers`](arrow_data::ArrayData::align_buffers) on
+    /// each imported batch, which copies any insufficiently aligned buffers into
+    /// new aligned allocations. Adequately aligned buffers are left untouched, so
+    /// the cost is paid only when a producer actually emits unaligned data.
+    ///
+    /// This is useful when consuming streams from sources known to produce
+    /// unaligned data, allowing the resulting [`RecordBatch`]es to be passed to
+    /// downstream code (e.g. compute kernels) that requires aligned buffers
+    /// without panicking.
+    pub fn with_align_buffers(mut self, align_buffers: bool) -> Self {
+        self.align_buffers = align_buffers;
+        self
+    }
+
     /// Get the last error from `ArrowArrayStreamReader`
     fn get_stream_last_error(&mut self) -> Option<String> {
         let get_last_error = self.stream.get_last_error?;
@@ -365,7 +395,15 @@ impl Iterator for ArrowArrayStreamReader {
             let result = unsafe {
                 from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
             };
-            Some(result.and_then(|data| {
+            let align_buffers = self.align_buffers;
+            Some(result.and_then(|mut data| {
+                if align_buffers {
+                    // Some Arrow C Data Interface producers (e.g. ADBC drivers)
+                    // can emit buffers that are not aligned to their corresponding
+                    // primitive type. When opted in via `with_align_buffers`, copy
+                    // any such buffers to a new aligned allocation.
+                    data.align_buffers();
+                }
                 let len = data.len();
                 RecordBatch::try_new_with_options(
                     self.schema.clone(),
@@ -396,6 +434,7 @@ mod tests {
 
     use crate::array::Int32Array;
     use crate::ffi::from_ffi;
+    use arrow_data::ArrayData;
 
     struct TestRecordBatchReader {
         schema: SchemaRef,
@@ -555,4 +594,169 @@ mod tests {
 
         Ok(())
     }
+
+    /// Construct a `FFI_ArrowArrayStream` whose first batch contains a single
+    /// `Int32` column whose data buffer is deliberately misaligned by one byte,
+    /// mimicking what some Arrow C Data Interface producers (e.g. the Go ADBC
+    /// Snowflake driver) emit.
+    ///
+    /// This bypasses the normal `FFI_ArrowArrayStream::new(RecordBatchReader)`
+    /// path because that path materializes the source `RecordBatch` into typed
+    /// arrays (e.g. `Int32Array`), and that materialization itself panics on
+    /// misaligned buffers via `ScalarBuffer::from`. Instead, we feed an
+    /// untyped `ArrayData` (whose buffers can stay misaligned) directly into
+    /// `FFI_ArrowArray::new`, which only takes raw pointers off the buffers.
+    fn build_unaligned_int32_stream() -> FFI_ArrowArrayStream {
+        use arrow_buffer::Buffer;
+
+        // Allocate an aligned i32 buffer, then slice off the first byte so the
+        // resulting buffer's `as_ptr()` is offset by 1 from `align_of::<i32>()`.
+        // The remaining bytes still fit four `i32`s starting at the unaligned
+        // offset.
+        let backing = Buffer::from_vec(vec![0_i32, 1, 2, 3, 4]);
+        let unaligned = backing.slice(1);
+        assert_ne!(
+            unaligned.as_ptr().align_offset(std::mem::align_of::<i32>()),
+            0,
+            "test setup: sliced buffer should be misaligned for i32",
+        );
+
+        // SAFETY: the buffer is large enough to hold 4 i32s starting at byte
+        // offset 1; `build_unchecked` is required because validation would
+        // (correctly) reject the misalignment, which is exactly the condition
+        // we want to reproduce on the consumer side.
+        let int32_child = unsafe {
+            ArrayData::builder(DataType::Int32)
+                .len(4)
+                .add_buffer(unaligned)
+                .build_unchecked()
+        };
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let struct_data = unsafe {
+            ArrayData::builder(DataType::Struct(schema.fields().clone()))
+                .len(4)
+                .add_child_data(int32_child)
+                .build_unchecked()
+        };
+
+        struct UnalignedPrivateData {
+            schema: SchemaRef,
+            pending: Option<ArrayData>,
+        }
+
+        unsafe extern "C" fn get_schema_cb(
+            stream: *mut FFI_ArrowArrayStream,
+            out_schema: *mut FFI_ArrowSchema,
+        ) -> c_int {
+            let priv_data = unsafe { &*((*stream).private_data as *const UnalignedPrivateData) };
+            match FFI_ArrowSchema::try_from(priv_data.schema.as_ref()) {
+                Ok(ffi_schema) => {
+                    unsafe { std::ptr::copy(addr_of!(ffi_schema), out_schema, 1) };
+                    std::mem::forget(ffi_schema);
+                    0
+                }
+                Err(_) => EINVAL,
+            }
+        }
+
+        unsafe extern "C" fn get_next_cb(
+            stream: *mut FFI_ArrowArrayStream,
+            out_array: *mut FFI_ArrowArray,
+        ) -> c_int {
+            let priv_data = unsafe { &mut *((*stream).private_data as *mut UnalignedPrivateData) };
+            match priv_data.pending.take() {
+                Some(data) => {
+                    let ffi_array = FFI_ArrowArray::new(&data);
+                    unsafe { std::ptr::write_unaligned(out_array, ffi_array) };
+                    0
+                }
+                None => {
+                    unsafe { std::ptr::write(out_array, FFI_ArrowArray::empty()) };
+                    0
+                }
+            }
+        }
+
+        unsafe extern "C" fn get_last_error_cb(_: *mut FFI_ArrowArrayStream) -> *const c_char {
+            std::ptr::null()
+        }
+
+        unsafe extern "C" fn release_cb(stream: *mut FFI_ArrowArrayStream) {
+            if stream.is_null() {
+                return;
+            }
+            let stream = unsafe { &mut *stream };
+            let _ = unsafe { Box::from_raw(stream.private_data as *mut UnalignedPrivateData) };
+            stream.get_schema = None;
+            stream.get_next = None;
+            stream.get_last_error = None;
+            stream.release = None;
+        }
+
+        let priv_data = Box::new(UnalignedPrivateData {
+            schema,
+            pending: Some(struct_data),
+        });
+
+        FFI_ArrowArrayStream {
+            get_schema: Some(get_schema_cb),
+            get_next: Some(get_next_cb),
+            get_last_error: Some(get_last_error_cb),
+            release: Some(release_cb),
+            private_data: Box::into_raw(priv_data) as *mut c_void,
+        }
+    }
+
+    /// With the default reader configuration, an unaligned FFI buffer must
+    /// surface (rather than be silently fixed up) as a panic on typed access.
+    /// This preserves the long-standing invariant that unaligned producers are
+    /// detectable bugs, not silently absorbed cost.
+    #[test]
+    #[should_panic(expected = "Memory pointer from external source")]
+    fn test_unaligned_buffers_default_panics_on_typed_access() {
+        let stream = build_unaligned_int32_stream();
+        let mut stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap();
+        // Iterating constructs typed arrays via `StructArray::from(data)` →
+        // `make_array(child)`, which panics in `ScalarBuffer::from` for
+        // misaligned i32 buffers from a `Custom` (FFI) deallocation source.
+        let _ = stream_reader.next().unwrap();
+    }
+
+    /// Opting in via `with_align_buffers(true)` must copy any insufficiently
+    /// aligned buffers into a fresh aligned allocation before constructing the
+    /// `RecordBatch`, allowing downstream typed access without panicking.
+    #[test]
+    fn test_unaligned_buffers_with_align_buffers_opt_in() {
+        let stream = build_unaligned_int32_stream();
+        let mut stream_reader = ArrowArrayStreamReader::try_new(stream)
+            .unwrap()
+            .with_align_buffers(true);
+
+        let imported = stream_reader.next().unwrap().unwrap();
+
+        // The child data buffer should now be aligned to `align_of::<i32>()`.
+        let column_data = imported.column(0).to_data();
+        let buf_ptr = column_data.buffers()[0].as_ptr();
+        assert_eq!(
+            buf_ptr.align_offset(std::mem::align_of::<i32>()),
+            0,
+            "with_align_buffers(true) should produce an aligned data buffer",
+        );
+
+        // Typed access on the realigned buffer must not panic. The values
+        // themselves are bit-shifted reinterpretations of the source bytes
+        // (an artifact of slicing into the middle of an i32), so we only
+        // assert structural correctness.
+        let column = imported
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(column.len(), 4);
+        let _ = column.values().to_vec();
+
+        // Stream is exhausted after the first batch.
+        assert!(stream_reader.next().is_none());
+    }
 }