diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 55698904758..885e2add71e 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -8388,6 +8388,8 @@ pub unsafe fn vortex_array::builders::PrimitiveBuilder::set_validity_unchecke pub fn vortex_array::builders::builder_with_capacity(dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box +pub fn vortex_array::builders::builder_with_capacity_in(allocator: vortex_array::memory::HostAllocatorRef, dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box + pub mod vortex_array::builtins pub trait vortex_array::builtins::ArrayBuiltins: core::marker::Sized @@ -13262,6 +13264,110 @@ pub fn V::matches(array: &vortex_array::ArrayRef) -> bool pub fn V::try_match<'a>(array: &'a vortex_array::ArrayRef) -> core::option::Option> +pub mod vortex_array::memory + +pub struct vortex_array::memory::DefaultHostAllocator + +impl core::default::Default for vortex_array::memory::DefaultHostAllocator + +pub fn vortex_array::memory::DefaultHostAllocator::default() -> vortex_array::memory::DefaultHostAllocator + +impl core::fmt::Debug for vortex_array::memory::DefaultHostAllocator + +pub fn vortex_array::memory::DefaultHostAllocator::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator + +pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + +pub struct vortex_array::memory::MemorySession + +impl vortex_array::memory::MemorySession + +pub fn vortex_array::memory::MemorySession::allocator(&self) -> vortex_array::memory::HostAllocatorRef + +pub fn vortex_array::memory::MemorySession::new(allocator: vortex_array::memory::HostAllocatorRef) -> Self + +pub fn vortex_array::memory::MemorySession::set_allocator(&mut self, allocator: vortex_array::memory::HostAllocatorRef) + +impl core::default::Default for vortex_array::memory::MemorySession + +pub fn vortex_array::memory::MemorySession::default() -> Self + +impl core::fmt::Debug for vortex_array::memory::MemorySession + +pub fn vortex_array::memory::MemorySession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::memory::WritableHostBuffer + +impl vortex_array::memory::WritableHostBuffer + +pub fn vortex_array::memory::WritableHostBuffer::alignment(&self) -> vortex_buffer::alignment::Alignment + +pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice(&mut self) -> &mut [u8] + +pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice_typed(&mut self) -> vortex_error::VortexResult<&mut [T]> + +pub fn vortex_array::memory::WritableHostBuffer::freeze(self) -> vortex_buffer::ByteBuffer + +pub fn vortex_array::memory::WritableHostBuffer::freeze_typed(self) -> vortex_error::VortexResult> + +pub fn vortex_array::memory::WritableHostBuffer::is_empty(&self) -> bool + +pub fn vortex_array::memory::WritableHostBuffer::len(&self) -> usize + +pub fn vortex_array::memory::WritableHostBuffer::new(inner: alloc::boxed::Box) -> Self + +impl core::fmt::Debug for vortex_array::memory::WritableHostBuffer + +pub fn vortex_array::memory::WritableHostBuffer::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub trait vortex_array::memory::HostAllocator: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static + +pub fn vortex_array::memory::HostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + +impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator + +pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + +pub trait vortex_array::memory::HostAllocatorExt: vortex_array::memory::HostAllocator + +pub fn vortex_array::memory::HostAllocatorExt::allocate_typed(&self, len: usize) -> vortex_error::VortexResult + +impl vortex_array::memory::HostAllocatorExt for A + +pub fn A::allocate_typed(&self, len: usize) -> vortex_error::VortexResult + +pub trait vortex_array::memory::HostBufferMut: core::marker::Send + 'static + +pub fn vortex_array::memory::HostBufferMut::alignment(&self) -> vortex_buffer::alignment::Alignment + +pub fn vortex_array::memory::HostBufferMut::as_mut_slice(&mut self) -> &mut [u8] + +pub fn vortex_array::memory::HostBufferMut::freeze(self: alloc::boxed::Box) -> vortex_buffer::ByteBuffer + +pub fn vortex_array::memory::HostBufferMut::is_empty(&self) -> bool + +pub fn vortex_array::memory::HostBufferMut::len(&self) -> usize + +pub trait vortex_array::memory::MemorySessionExt: vortex_session::SessionExt + +pub fn vortex_array::memory::MemorySessionExt::allocator(&self) -> vortex_array::memory::HostAllocatorRef + +pub fn vortex_array::memory::MemorySessionExt::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession> + +pub fn vortex_array::memory::MemorySessionExt::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession> + +impl vortex_array::memory::MemorySessionExt for S + +pub fn S::allocator(&self) -> vortex_array::memory::HostAllocatorRef + +pub fn S::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession> + +pub fn S::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession> + +pub type vortex_array::memory::HostAllocatorRef = alloc::sync::Arc + pub mod vortex_array::normalize pub enum vortex_array::normalize::Operation<'a> @@ -22362,6 +22468,8 @@ pub struct vortex_array::ExecutionCtx impl vortex_array::ExecutionCtx +pub fn vortex_array::ExecutionCtx::allocator(&self) -> vortex_array::memory::HostAllocatorRef + pub fn vortex_array::ExecutionCtx::log(&mut self, msg: core::fmt::Arguments<'_>) pub fn vortex_array::ExecutionCtx::new(session: vortex_session::VortexSession) -> Self diff --git a/vortex-array/src/arrays/chunked/vtable/canonical.rs b/vortex-array/src/arrays/chunked/vtable/canonical.rs index 73a9b1c856a..63bb4132eca 100644 --- a/vortex-array/src/arrays/chunked/vtable/canonical.rs +++ b/vortex-array/src/arrays/chunked/vtable/canonical.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_buffer::BufferMut; +use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -19,12 +19,13 @@ use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewRebuildMode; use crate::arrays::struct_::StructArrayExt; -use crate::builders::builder_with_capacity; +use crate::builders::builder_with_capacity_in; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::StructFields; +use crate::memory::HostAllocatorExt; use crate::validity::Validity; pub(super) fn _canonicalize( @@ -56,7 +57,7 @@ pub(super) fn _canonicalize( ctx, )?), _ => { - let mut builder = builder_with_capacity(array.dtype(), array.len()); + let mut builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len()); array.array().append_to_builder(builder.as_mut(), ctx)?; builder.finish_into_canonical() } @@ -131,8 +132,12 @@ fn swizzle_list_chunks( // this much more complicated. // We (somewhat arbitrarily) choose `u64` for our offsets and sizes here. These can always be // narrowed later by the compressor. - let mut offsets = BufferMut::::with_capacity(len); - let mut sizes = BufferMut::::with_capacity(len); + let allocator = ctx.allocator(); + let mut offsets = allocator.allocate_typed::(len)?; + let mut sizes = allocator.allocate_typed::(len)?; + let offsets_out: &mut [u64] = offsets.as_mut_slice_typed::()?; + let sizes_slice_out: &mut [u64] = sizes.as_mut_slice_typed::()?; + let mut next_list = 0usize; for chunk in chunks { let chunk_array = chunk.clone().execute::(ctx)?; @@ -162,19 +167,31 @@ fn swizzle_list_chunks( let sizes_slice = sizes_arr.as_slice::(); // Append offsets and sizes, adjusting offsets to point into the combined array. - offsets.extend(offsets_slice.iter().map(|o| o + num_elements)); - sizes.extend(sizes_slice); + for (&offset, &size) in offsets_slice.iter().zip(sizes_slice.iter()) { + offsets_out[next_list] = offset + num_elements; + sizes_slice_out[next_list] = size; + next_list += 1; + } num_elements += chunk_array.elements().len() as u64; } + debug_assert_eq!(next_list, len); // SAFETY: elements are sliced from valid `ListViewArray`s (from `to_listview()`). let chunked_elements = unsafe { ChunkedArray::new_unchecked(list_elements_chunks, elem_dtype.clone()) } .into_array(); - let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable).into_array(); - let sizes = PrimitiveArray::new(sizes.freeze(), Validity::NonNullable).into_array(); + let offsets = PrimitiveArray::new( + Buffer::::from_byte_buffer(offsets.freeze()), + Validity::NonNullable, + ) + .into_array(); + let sizes = PrimitiveArray::new( + Buffer::::from_byte_buffer(sizes.freeze()), + Validity::NonNullable, + ) + .into_array(); // SAFETY: // - `offsets` and `sizes` are non-nullable u64 arrays of the same length @@ -192,9 +209,15 @@ fn swizzle_list_chunks( #[cfg(test)] mod tests { use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + use crate::Canonical; + use crate::ExecutionCtx; use crate::IntoArray; use crate::ToCanonical; use crate::accessor::ArrayAccessor; @@ -207,8 +230,28 @@ mod tests { use crate::dtype::DType::Primitive; use crate::dtype::Nullability::NonNullable; use crate::dtype::PType::I32; + use crate::memory::DefaultHostAllocator; + use crate::memory::HostAllocator; + use crate::memory::MemorySessionExt; + use crate::memory::WritableHostBuffer; use crate::validity::Validity; + #[derive(Debug)] + struct CountingAllocator { + allocations: Arc, + } + + impl HostAllocator for CountingAllocator { + fn allocate( + &self, + len: usize, + alignment: vortex_buffer::Alignment, + ) -> VortexResult { + self.allocations.fetch_add(1, Ordering::Relaxed); + DefaultHostAllocator.allocate(len, alignment) + } + } + #[test] pub fn pack_nested_structs() { let struct_array = StructArray::try_new( @@ -265,4 +308,42 @@ mod tests { assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap()); assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap()); } + + #[test] + fn list_canonicalize_uses_memory_session_allocator() { + let allocations = Arc::new(AtomicUsize::new(0)); + let session = VortexSession::empty(); + session + .memory_mut() + .set_allocator(Arc::new(CountingAllocator { + allocations: Arc::clone(&allocations), + })); + let mut ctx = ExecutionCtx::new(session); + + let l1 = ListArray::try_new( + buffer![1, 2, 3, 4].into_array(), + buffer![0, 3].into_array(), + Validity::NonNullable, + ) + .unwrap(); + let l2 = ListArray::try_new( + buffer![5, 6].into_array(), + buffer![0, 2].into_array(), + Validity::NonNullable, + ) + .unwrap(); + + let chunked_list = ChunkedArray::try_new( + vec![l1.into_array(), l2.into_array()], + List(Arc::new(Primitive(I32, NonNullable)), NonNullable), + ) + .unwrap() + .into_array(); + + drop(chunked_list.execute::(&mut ctx).unwrap()); + assert!( + allocations.load(Ordering::Relaxed) >= 2, + "expected offset+size allocations through MemorySession" + ); + } } diff --git a/vortex-array/src/builders/mod.rs b/vortex-array/src/builders/mod.rs index 8093fcde4c3..72c6f286e42 100644 --- a/vortex-array/src/builders/mod.rs +++ b/vortex-array/src/builders/mod.rs @@ -40,6 +40,7 @@ use crate::canonical::Canonical; use crate::dtype::DType; use crate::match_each_decimal_value_type; use crate::match_each_native_ptype; +use crate::memory::HostAllocatorRef; use crate::scalar::Scalar; mod lazy_null_builder; @@ -291,3 +292,14 @@ pub fn builder_with_capacity(dtype: &DType, capacity: usize) -> Box Box { + let _allocator = allocator; + builder_with_capacity(dtype, capacity) +} diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 80581795996..be47f178467 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -34,6 +34,8 @@ use crate::ArrayRef; use crate::Canonical; use crate::IntoArray; use crate::matcher::Matcher; +use crate::memory::HostAllocatorRef; +use crate::memory::MemorySessionExt; use crate::optimizer::ArrayOptimizer; /// Maximum number of iterations to attempt when executing an array before giving up and returning @@ -210,6 +212,11 @@ impl ExecutionCtx { &self.session } + /// Get the session-scoped host allocator for this execution context. + pub fn allocator(&self) -> HostAllocatorRef { + self.session.allocator() + } + /// Log an execution step at the current depth. /// /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level. diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index cb0956b8f47..e84c83c0cf6 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -52,6 +52,7 @@ pub mod kernel; pub mod mask; mod mask_future; pub mod matcher; +pub mod memory; mod metadata; pub mod normalize; pub mod optimizer; diff --git a/vortex-array/src/memory.rs b/vortex-array/src/memory.rs new file mode 100644 index 00000000000..47030b18131 --- /dev/null +++ b/vortex-array/src/memory.rs @@ -0,0 +1,375 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session-scoped memory allocation for host-side buffers. + +use std::fmt::Debug; +use std::mem::size_of; +use std::sync::Arc; + +use bytes::Bytes; +use vortex_buffer::Alignment; +use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::Ref; +use vortex_session::RefMut; +use vortex_session::SessionExt; + +/// Mutable host buffer contract used by [`WritableHostBuffer`]. +pub trait HostBufferMut: Send + 'static { + /// Returns the logical byte length of the buffer. + fn len(&self) -> usize; + + /// Whether the buffer is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the alignment of the buffer. + fn alignment(&self) -> Alignment; + + /// Returns mutable access to the writable byte range. + fn as_mut_slice(&mut self) -> &mut [u8]; + + /// Freeze the buffer into an immutable [`ByteBuffer`]. + fn freeze(self: Box) -> ByteBuffer; +} + +/// Exact-size writable host buffer returned by a [`HostAllocator`]. +pub struct WritableHostBuffer { + inner: Box, +} + +impl WritableHostBuffer { + /// Create a writable host buffer from an implementation of [`HostBufferMut`]. + pub fn new(inner: Box) -> Self { + Self { inner } + } + + /// Returns the logical byte length of the buffer. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true when the buffer has zero bytes. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the alignment of the buffer. + pub fn alignment(&self) -> Alignment { + self.inner.alignment() + } + + /// Returns mutable access to the writable byte range. + pub fn as_mut_slice(&mut self) -> &mut [u8] { + self.inner.as_mut_slice() + } + + /// Returns mutable access to the buffer as a typed slice. + pub fn as_mut_slice_typed(&mut self) -> VortexResult<&mut [T]> { + vortex_ensure!( + size_of::() != 0, + InvalidArgument: "Cannot create typed mutable slice for zero-sized type {}", + std::any::type_name::() + ); + vortex_ensure!( + self.alignment().is_aligned_to(Alignment::of::()), + InvalidArgument: "Buffer is not sufficiently aligned for type {}", + std::any::type_name::() + ); + + let bytes = self.as_mut_slice(); + let byte_len = bytes.len(); + let ptr = bytes.as_mut_ptr(); + let type_size = size_of::(); + + vortex_ensure!( + byte_len.is_multiple_of(type_size), + InvalidArgument: "Buffer length {byte_len} is not a multiple of {} for {}", + type_size, + std::any::type_name::() + ); + + // SAFETY: We checked size divisibility and pointer alignment for `T`, + // and we have exclusive mutable access to the underlying bytes. + Ok(unsafe { std::slice::from_raw_parts_mut(ptr.cast::(), byte_len / type_size) }) + } + + /// Freeze the writable buffer into an immutable [`ByteBuffer`]. + pub fn freeze(self) -> ByteBuffer { + self.inner.freeze() + } + + /// Freeze the writable buffer into a typed immutable [`Buffer`]. + pub fn freeze_typed(self) -> VortexResult> { + vortex_ensure!( + size_of::() != 0, + InvalidArgument: "Cannot freeze typed buffer for zero-sized type {}", + std::any::type_name::() + ); + + let buffer = self.freeze(); + let byte_len = buffer.len(); + let type_size = size_of::(); + let type_align = Alignment::of::(); + + vortex_ensure!( + byte_len.is_multiple_of(type_size), + InvalidArgument: "Buffer length {byte_len} is not a multiple of {} for {}", + type_size, + std::any::type_name::() + ); + vortex_ensure!( + buffer.is_aligned(type_align), + InvalidArgument: "Buffer pointer is not aligned to {} for {}", + type_align, + std::any::type_name::() + ); + + Ok(Buffer::from_byte_buffer(buffer)) + } +} + +impl Debug for WritableHostBuffer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WritableHostBuffer") + .field("len", &self.len()) + .field("alignment", &self.alignment()) + .finish() + } +} + +/// Allocator for exact-size writable host buffers. +pub trait HostAllocator: Debug + Send + Sync + 'static { + /// Allocate a writable host buffer with the requested byte length and alignment. + fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult; +} + +/// Shared allocator reference used throughout session-scoped memory APIs. +pub type HostAllocatorRef = Arc; + +/// Extension methods for [`HostAllocator`]s. +pub trait HostAllocatorExt: HostAllocator { + /// Allocate host memory for `len` elements of `T` using `Alignment::of::()`. + fn allocate_typed(&self, len: usize) -> VortexResult { + let bytes = len.checked_mul(size_of::()).ok_or_else(|| { + vortex_err!( + "Typed host allocation overflow for type {} and len {}", + std::any::type_name::(), + len + ) + })?; + self.allocate(bytes, Alignment::of::()) + } +} + +impl HostAllocatorExt for A {} + +/// Session-scoped memory configuration for Vortex arrays. +#[derive(Debug)] +pub struct MemorySession { + allocator: HostAllocatorRef, +} + +impl MemorySession { + /// Creates a new session memory configuration using the provided allocator. + pub fn new(allocator: HostAllocatorRef) -> Self { + Self { allocator } + } + + /// Returns the configured allocator. + pub fn allocator(&self) -> HostAllocatorRef { + Arc::clone(&self.allocator) + } + + /// Updates the configured allocator. + pub fn set_allocator(&mut self, allocator: HostAllocatorRef) { + self.allocator = allocator; + } +} + +impl Default for MemorySession { + fn default() -> Self { + Self::new(Arc::new(DefaultHostAllocator)) + } +} + +/// Extension trait for accessing session-scoped memory configuration. +pub trait MemorySessionExt: SessionExt { + /// Returns the memory session for this execution/session context. + fn memory(&self) -> Ref<'_, MemorySession> { + self.get::() + } + + /// Returns the configured host allocator for this execution/session context. + fn allocator(&self) -> HostAllocatorRef { + self.memory().allocator() + } + + /// Returns mutable access to the memory session. + fn memory_mut(&self) -> RefMut<'_, MemorySession> { + self.get_mut::() + } +} + +impl MemorySessionExt for S {} + +/// Default host allocator. +#[derive(Debug, Default)] +pub struct DefaultHostAllocator; + +impl HostAllocator for DefaultHostAllocator { + fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult { + let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment); + // SAFETY: We fully initialize this slice before freezing it. + unsafe { buffer.set_len(len) }; + Ok(WritableHostBuffer::new(Box::new( + DefaultWritableHostBuffer { buffer, alignment }, + ))) + } +} + +#[derive(Debug)] +struct DefaultWritableHostBuffer { + buffer: ByteBufferMut, + alignment: Alignment, +} + +#[derive(Debug)] +struct HostBufferOwner { + buffer: ByteBufferMut, +} + +impl AsRef<[u8]> for HostBufferOwner { + fn as_ref(&self) -> &[u8] { + self.buffer.as_slice() + } +} + +impl HostBufferMut for DefaultWritableHostBuffer { + fn len(&self) -> usize { + self.buffer.len() + } + + fn alignment(&self) -> Alignment { + self.alignment + } + + fn as_mut_slice(&mut self) -> &mut [u8] { + self.buffer.as_mut_slice() + } + + fn freeze(self: Box) -> ByteBuffer { + let Self { buffer, alignment } = *self; + let bytes = Bytes::from_owner(HostBufferOwner { buffer }); + ByteBuffer::from_bytes_aligned(bytes, alignment) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use super::*; + + #[derive(Debug)] + struct CountingAllocator { + allocations: Arc, + } + + impl HostAllocator for CountingAllocator { + fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult { + self.allocations.fetch_add(1, Ordering::Relaxed); + DefaultHostAllocator.allocate(len, alignment) + } + } + + #[test] + fn writable_host_buffer_freeze_round_trip() { + let allocator = DefaultHostAllocator; + let mut writable = allocator.allocate(16, Alignment::new(8)).unwrap(); + for (idx, byte) in writable.as_mut_slice().iter_mut().enumerate() { + *byte = u8::try_from(idx).unwrap(); + } + + let host = writable.freeze(); + assert_eq!(host.len(), 16); + assert!(host.is_aligned(Alignment::new(8))); + assert_eq!(host.as_slice(), (0u8..16).collect::>().as_slice()); + } + + #[test] + fn memory_session_replaces_allocator() { + let allocations = Arc::new(AtomicUsize::new(0)); + let allocator = Arc::new(CountingAllocator { + allocations: Arc::clone(&allocations), + }); + let mut session = MemorySession::default(); + session.set_allocator(allocator); + drop(session.allocator().allocate(4, Alignment::none()).unwrap()); + assert_eq!(allocations.load(Ordering::Relaxed), 1); + } + + #[test] + fn typed_allocation_uses_type_alignment() { + let allocator = DefaultHostAllocator; + let writable = allocator.allocate_typed::(4).unwrap(); + assert_eq!(writable.len(), 4 * size_of::()); + assert_eq!(writable.alignment(), Alignment::of::()); + } + + #[test] + fn typed_mut_slice_round_trip() { + let allocator = DefaultHostAllocator; + let mut writable = allocator.allocate_typed::(4).unwrap(); + writable + .as_mut_slice_typed::() + .unwrap() + .copy_from_slice(&[10, 20, 30, 40]); + + let frozen = writable.freeze(); + let values = unsafe { + std::slice::from_raw_parts( + frozen.as_slice().as_ptr().cast::(), + frozen.len() / size_of::(), + ) + }; + assert_eq!(values, [10, 20, 30, 40]); + } + + #[test] + fn typed_mut_slice_rejects_length_mismatch() { + let allocator = DefaultHostAllocator; + let mut writable = allocator.allocate(7, Alignment::none()).unwrap(); + assert!(writable.as_mut_slice_typed::().is_err()); + } + + #[test] + fn freeze_typed_round_trip() { + let allocator = DefaultHostAllocator; + let mut writable = allocator.allocate_typed::(4).unwrap(); + writable + .as_mut_slice_typed::() + .unwrap() + .copy_from_slice(&[1, 3, 5, 7]); + + let frozen = writable.freeze_typed::().unwrap(); + assert_eq!(frozen.as_slice(), [1, 3, 5, 7]); + } + + #[test] + fn freeze_typed_rejects_length_mismatch() { + let allocator = DefaultHostAllocator; + let writable = allocator.allocate(7, Alignment::none()).unwrap(); + let err = writable.freeze_typed::().unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("not a multiple of")); + } +} diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index a1bb5895204..2515b66289e 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -43,6 +43,7 @@ use futures::stream; use object_store::ObjectMeta; use object_store::ObjectStore; use vortex::VortexSessionDefault; +use vortex::array::memory::MemorySessionExt; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::dtype::PType; @@ -265,10 +266,11 @@ impl FileFormat for VortexFormat { } // Not entry or invalid - open the file - let reader = Arc::new(ObjectStoreReadAt::new( + let reader = Arc::new(ObjectStoreReadAt::new_with_allocator( store, object.location.clone(), session.handle(), + session.allocator(), )); let vxf = session @@ -337,10 +339,11 @@ impl FileFormat for VortexFormat { Some(metadata) => metadata, None => { // Not entry - open the file - let reader = Arc::new(ObjectStoreReadAt::new( + let reader = Arc::new(ObjectStoreReadAt::new_with_allocator( store, object.location.clone(), session.handle(), + session.allocator(), )); let vxf = session diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs index fad6ef37eaf..34ff646722c 100644 --- a/vortex-datafusion/src/persistent/reader.rs +++ b/vortex-datafusion/src/persistent/reader.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use datafusion_common::Result as DFResult; use datafusion_datasource::PartitionedFile; use object_store::ObjectStore; +use vortex::array::memory::MemorySessionExt; use vortex::io::VortexReadAt; use vortex::io::object_store::ObjectStoreReadAt; use vortex::io::session::RuntimeSessionExt; @@ -45,10 +46,11 @@ impl VortexReaderFactory for DefaultVortexReaderFactory { file: &PartitionedFile, session: &VortexSession, ) -> DFResult> { - Ok(Arc::new(ObjectStoreReadAt::new( + Ok(Arc::new(ObjectStoreReadAt::new_with_allocator( Arc::clone(&self.object_store), file.path().as_ref().into(), session.handle(), + session.allocator(), )) as _) } } diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index fe545cfb26a..59ec83b7fe9 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -398,11 +398,11 @@ pub const vortex_file::VORTEX_FILE_EXTENSION: &str pub static vortex_file::ALLOWED_ENCODINGS: std::sync::lazy_lock::LazyLock> -pub trait vortex_file::OpenOptionsSessionExt: vortex_array::session::ArraySessionExt + vortex_layout::session::LayoutSessionExt + vortex_io::session::RuntimeSessionExt +pub trait vortex_file::OpenOptionsSessionExt: vortex_array::session::ArraySessionExt + vortex_layout::session::LayoutSessionExt + vortex_io::session::RuntimeSessionExt + vortex_array::memory::MemorySessionExt pub fn vortex_file::OpenOptionsSessionExt::open_options(&self) -> vortex_file::VortexOpenOptions -impl vortex_file::OpenOptionsSessionExt for S +impl vortex_file::OpenOptionsSessionExt for S pub fn S::open_options(&self) -> vortex_file::VortexOpenOptions diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 2231796b2e8..3a5d3cf90e5 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use futures::executor::block_on; use parking_lot::RwLock; use vortex_array::dtype::DType; +use vortex_array::memory::MemorySessionExt; use vortex_array::session::ArraySessionExt; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; @@ -61,7 +62,9 @@ pub struct VortexOpenOptions { labels: Vec