Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 206 additions & 2 deletions arrow-array/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
pub struct ArrowArrayStreamReader {
/// The underlying `FFI_ArrowArrayStream` wrapped by this reader.
stream: FFI_ArrowArrayStream,
/// Schema of the stream, obtained once at construction via `get_stream_schema`.
schema: SchemaRef,
/// When `true`, `Iterator::next` calls `ArrayData::align_buffers` on each
/// imported batch before building the `RecordBatch`. Initialized to `false`
/// by `try_new` and changed through `with_align_buffers`.
align_buffers: bool,
}

/// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing
Expand Down Expand Up @@ -317,7 +318,11 @@ impl ArrowArrayStreamReader {

let schema = get_stream_schema(&mut stream)?;

Ok(Self { stream, schema })
Ok(Self {
stream,
schema,
align_buffers: false,
})
}

/// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`.
Expand All @@ -334,6 +339,31 @@ impl ArrowArrayStreamReader {
Self::try_new(unsafe { FFI_ArrowArrayStream::from_raw(raw_stream) })
}

/// Configure whether buffers from imported arrays should be aligned, copying
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this seems reasonable to me

/// data if necessary.
///
/// Some Arrow C Data Interface producers (notably some ADBC drivers, e.g.
/// the Go Snowflake driver) return buffers whose pointers are not aligned to
/// the corresponding primitive type. The default for [`ArrowArrayStreamReader`]
/// is to leave such buffers untouched, matching the behavior of other FFI
/// import paths in this crate; downstream readers are expected to detect
/// misalignment and surface it explicitly.
///
/// Enabling this opt-in causes [`Iterator::next`] to call
/// [`ArrayData::align_buffers`](arrow_data::ArrayData::align_buffers) on
/// each imported batch, which copies any insufficiently aligned buffers into
/// new aligned allocations. Adequately aligned buffers are left untouched, so
/// the cost is paid only when a producer actually emits unaligned data.
///
/// This is useful when consuming streams from sources known to produce
/// unaligned data, allowing the resulting [`RecordBatch`]es to be passed to
/// downstream code (e.g. compute kernels) that requires aligned buffers
/// without panicking.
///
/// Returns the reader with the setting applied. The reader is taken by value,
/// so discarding the returned value drops the reader (and its stream).
#[must_use = "with_align_buffers consumes the reader and returns the configured one"]
pub fn with_align_buffers(mut self, align_buffers: bool) -> Self {
    // Only records the preference; the alignment work itself happens per
    // batch inside `Iterator::next`.
    self.align_buffers = align_buffers;
    self
}

/// Get the last error from `ArrowArrayStreamReader`
fn get_stream_last_error(&mut self) -> Option<String> {
let get_last_error = self.stream.get_last_error?;
Expand Down Expand Up @@ -365,7 +395,15 @@ impl Iterator for ArrowArrayStreamReader {
let result = unsafe {
from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
};
Some(result.and_then(|data| {
let align_buffers = self.align_buffers;
Some(result.and_then(|mut data| {
if align_buffers {
// Some Arrow C Data Interface producers (e.g. ADBC drivers)
// can emit buffers that are not aligned to their corresponding
// primitive type. Opted in via `with_align_buffers`, copy
// any such buffers to a new aligned allocation.
data.align_buffers();
}
let len = data.len();
RecordBatch::try_new_with_options(
self.schema.clone(),
Expand Down Expand Up @@ -396,6 +434,7 @@ mod tests {

use crate::array::Int32Array;
use crate::ffi::from_ffi;
use arrow_data::ArrayData;

struct TestRecordBatchReader {
schema: SchemaRef,
Expand Down Expand Up @@ -555,4 +594,169 @@ mod tests {

Ok(())
}

/// Construct a `FFI_ArrowArrayStream` whose only batch contains a single
/// `Int32` column whose data buffer is deliberately misaligned by one byte,
/// mimicking what some Arrow C Data Interface producers (e.g. the Go ADBC
/// Snowflake driver) emit.
///
/// This bypasses the normal `FFI_ArrowArrayStream::new(RecordBatchReader)`
/// path because that path materializes the source `RecordBatch` into typed
/// arrays (e.g. `Int32Array`), and that materialization itself panics on
/// misaligned buffers via `ScalarBuffer::from`. Instead, we feed an
/// untyped `ArrayData` (whose buffers can stay misaligned) directly into
/// `FFI_ArrowArray::new`, which only takes raw pointers off the buffers.
///
/// The returned stream owns its state through `private_data`; that state is
/// freed by the stream's `release` callback.
fn build_unaligned_int32_stream() -> FFI_ArrowArrayStream {
    use arrow_buffer::Buffer;

    // Allocate an aligned i32 buffer, then slice off the first byte so the
    // resulting buffer starts one byte past an `i32` boundary. The remaining
    // 19 bytes still hold four `i32`s starting at the unaligned offset.
    let backing = Buffer::from_vec(vec![0_i32, 1, 2, 3, 4]);
    let unaligned = backing.slice(1);
    // Inspect the address directly: `align_offset` is allowed to return
    // `usize::MAX` for any input, which would make this guard pass vacuously.
    assert_ne!(
        unaligned.as_ptr() as usize % std::mem::align_of::<i32>(),
        0,
        "test setup: sliced buffer should be misaligned for i32",
    );

    // SAFETY: the buffer is large enough to hold 4 i32s starting at byte
    // offset 1; `build_unchecked` is required because validation would
    // (correctly) reject the misalignment, which is exactly the condition
    // we want to reproduce on the consumer side.
    let int32_child = unsafe {
        ArrayData::builder(DataType::Int32)
            .len(4)
            .add_buffer(unaligned)
            .build_unchecked()
    };

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // SAFETY: the struct carries no buffers of its own; its single child
    // corresponds to the single schema field and has the same length (4).
    // Validation is skipped so the misaligned child is not inspected.
    let struct_data = unsafe {
        ArrayData::builder(DataType::Struct(schema.fields().clone()))
            .len(4)
            .add_child_data(int32_child)
            .build_unchecked()
    };

    /// State stored behind the stream's `private_data` pointer.
    struct UnalignedPrivateData {
        /// Schema reported by `get_schema_cb`.
        schema: SchemaRef,
        /// The single batch to hand out; `None` once it has been consumed.
        pending: Option<ArrayData>,
    }

    /// `get_schema` callback: exports the stored schema into `out_schema`.
    unsafe extern "C" fn get_schema_cb(
        stream: *mut FFI_ArrowArrayStream,
        out_schema: *mut FFI_ArrowSchema,
    ) -> c_int {
        // SAFETY: `private_data` was produced by `Box::into_raw` on an
        // `UnalignedPrivateData` when the stream was built and is only freed
        // in `release_cb`. Assumes the consumer passes a live stream.
        let priv_data = unsafe { &*(*stream).private_data.cast::<UnalignedPrivateData>() };
        match FFI_ArrowSchema::try_from(priv_data.schema.as_ref()) {
            Ok(ffi_schema) => {
                // Move the schema into the caller's slot without running its
                // destructor on our side; ownership passes to the consumer.
                // SAFETY: assumes `out_schema` is valid for writes, as the
                // consumer is expected to provide.
                unsafe { std::ptr::write(out_schema, ffi_schema) };
                0
            }
            Err(_) => EINVAL,
        }
    }

    /// `get_next` callback: hands out the pending batch once, then an
    /// `FFI_ArrowArray::empty()` on every later call.
    unsafe extern "C" fn get_next_cb(
        stream: *mut FFI_ArrowArrayStream,
        out_array: *mut FFI_ArrowArray,
    ) -> c_int {
        // SAFETY: same provenance as in `get_schema_cb`; the consumer drives
        // the stream through a single pointer, so the state is not aliased.
        let priv_data = unsafe { &mut *(*stream).private_data.cast::<UnalignedPrivateData>() };
        let ffi_array = match priv_data.pending.take() {
            Some(data) => FFI_ArrowArray::new(&data),
            None => FFI_ArrowArray::empty(),
        };
        // SAFETY: assumes `out_array` is valid for writes; `write_unaligned`
        // places no alignment requirement on the destination.
        unsafe { std::ptr::write_unaligned(out_array, ffi_array) };
        0
    }

    /// `get_last_error` callback: this stream never records an error message.
    unsafe extern "C" fn get_last_error_cb(_: *mut FFI_ArrowArrayStream) -> *const c_char {
        std::ptr::null()
    }

    /// `release` callback: frees the private state and marks the stream
    /// released by clearing every callback.
    unsafe extern "C" fn release_cb(stream: *mut FFI_ArrowArrayStream) {
        if stream.is_null() {
            return;
        }
        // SAFETY: checked non-null above; assumes the consumer passes a
        // pointer to a live stream structure.
        let stream = unsafe { &mut *stream };
        if !stream.private_data.is_null() {
            // SAFETY: a non-null `private_data` is the pointer obtained from
            // `Box::into_raw` at construction and has not been freed yet,
            // because it is nulled right after being reclaimed.
            drop(unsafe { Box::from_raw(stream.private_data.cast::<UnalignedPrivateData>()) });
            stream.private_data = std::ptr::null_mut();
        }
        stream.get_schema = None;
        stream.get_next = None;
        stream.get_last_error = None;
        stream.release = None;
    }

    let priv_data = Box::new(UnalignedPrivateData {
        schema,
        pending: Some(struct_data),
    });

    FFI_ArrowArrayStream {
        get_schema: Some(get_schema_cb),
        get_next: Some(get_next_cb),
        get_last_error: Some(get_last_error_cb),
        release: Some(release_cb),
        // Ownership of the boxed state moves into the stream; `release_cb`
        // reclaims it.
        private_data: Box::into_raw(priv_data).cast::<c_void>(),
    }
}

/// A reader left at its default configuration must not repair a misaligned
/// FFI buffer behind the caller's back: the misalignment has to surface as a
/// panic. This keeps unaligned producers visible as bugs instead of turning
/// them into a hidden copy cost.
#[test]
#[should_panic(expected = "Memory pointer from external source")]
fn test_unaligned_buffers_default_panics_on_typed_access() {
    let mut reader = ArrowArrayStreamReader::try_new(build_unaligned_int32_stream()).unwrap();
    // Pulling a batch builds typed arrays (`StructArray::from(data)` →
    // `make_array(child)`), and `ScalarBuffer::from` panics on a misaligned
    // i32 buffer whose deallocation source is `Custom` (FFI).
    let _ = reader.next().unwrap();
}

/// Opting in via `with_align_buffers(true)` must copy any insufficiently
/// aligned buffers into a fresh aligned allocation before constructing the
/// `RecordBatch`, allowing downstream typed access without panicking.
#[test]
fn test_unaligned_buffers_with_align_buffers_opt_in() {
    let stream = build_unaligned_int32_stream();
    let mut stream_reader = ArrowArrayStreamReader::try_new(stream)
        .unwrap()
        .with_align_buffers(true);

    let imported = stream_reader.next().unwrap().unwrap();

    // The child data buffer should now be aligned to `align_of::<i32>()`.
    // The address is checked directly because `align_offset` is permitted to
    // return `usize::MAX` and must not be relied on for correctness.
    let column_data = imported.column(0).to_data();
    let buf_addr = column_data.buffers()[0].as_ptr() as usize;
    assert_eq!(
        buf_addr % std::mem::align_of::<i32>(),
        0,
        "with_align_buffers(true) should produce an aligned data buffer",
    );

    // Typed access on the realigned buffer must not panic. The values
    // themselves are byte-shifted reinterpretations of the source bytes
    // (an artifact of slicing one byte into the first i32), so we only
    // assert structural correctness.
    let column = imported
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(column.len(), 4);
    let _ = column.values().to_vec();

    // Stream is exhausted after the first batch.
    assert!(stream_reader.next().is_none());
}
}