Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8388,6 +8388,8 @@ pub unsafe fn vortex_array::builders::PrimitiveBuilder<T>::set_validity_unchecke

pub fn vortex_array::builders::builder_with_capacity(dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>

pub fn vortex_array::builders::builder_with_capacity_in(allocator: vortex_array::memory::HostAllocatorRef, dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>

pub mod vortex_array::builtins

pub trait vortex_array::builtins::ArrayBuiltins: core::marker::Sized
Expand Down Expand Up @@ -13262,6 +13264,110 @@ pub fn V::matches(array: &vortex_array::ArrayRef) -> bool

pub fn V::try_match<'a>(array: &'a vortex_array::ArrayRef) -> core::option::Option<vortex_array::ArrayView<'a, V>>

pub mod vortex_array::memory

pub struct vortex_array::memory::DefaultHostAllocator

impl core::default::Default for vortex_array::memory::DefaultHostAllocator

pub fn vortex_array::memory::DefaultHostAllocator::default() -> vortex_array::memory::DefaultHostAllocator

impl core::fmt::Debug for vortex_array::memory::DefaultHostAllocator

pub fn vortex_array::memory::DefaultHostAllocator::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator

pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>

pub struct vortex_array::memory::MemorySession

impl vortex_array::memory::MemorySession

pub fn vortex_array::memory::MemorySession::allocator(&self) -> vortex_array::memory::HostAllocatorRef

pub fn vortex_array::memory::MemorySession::new(allocator: vortex_array::memory::HostAllocatorRef) -> Self

pub fn vortex_array::memory::MemorySession::set_allocator(&mut self, allocator: vortex_array::memory::HostAllocatorRef)

impl core::default::Default for vortex_array::memory::MemorySession

pub fn vortex_array::memory::MemorySession::default() -> Self

impl core::fmt::Debug for vortex_array::memory::MemorySession

pub fn vortex_array::memory::MemorySession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub struct vortex_array::memory::WritableHostBuffer

impl vortex_array::memory::WritableHostBuffer

pub fn vortex_array::memory::WritableHostBuffer::alignment(&self) -> vortex_buffer::alignment::Alignment

pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice(&mut self) -> &mut [u8]

pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice_typed<T>(&mut self) -> vortex_error::VortexResult<&mut [T]>

pub fn vortex_array::memory::WritableHostBuffer::freeze(self) -> vortex_buffer::ByteBuffer

pub fn vortex_array::memory::WritableHostBuffer::freeze_typed<T>(self) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>

pub fn vortex_array::memory::WritableHostBuffer::is_empty(&self) -> bool

pub fn vortex_array::memory::WritableHostBuffer::len(&self) -> usize

pub fn vortex_array::memory::WritableHostBuffer::new(inner: alloc::boxed::Box<dyn vortex_array::memory::HostBufferMut>) -> Self

impl core::fmt::Debug for vortex_array::memory::WritableHostBuffer

pub fn vortex_array::memory::WritableHostBuffer::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub trait vortex_array::memory::HostAllocator: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static

pub fn vortex_array::memory::HostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>

impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator

pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>

pub trait vortex_array::memory::HostAllocatorExt: vortex_array::memory::HostAllocator

pub fn vortex_array::memory::HostAllocatorExt::allocate_typed<T>(&self, len: usize) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>

impl<A: vortex_array::memory::HostAllocator + ?core::marker::Sized> vortex_array::memory::HostAllocatorExt for A

pub fn A::allocate_typed<T>(&self, len: usize) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>

pub trait vortex_array::memory::HostBufferMut: core::marker::Send + 'static

pub fn vortex_array::memory::HostBufferMut::alignment(&self) -> vortex_buffer::alignment::Alignment

pub fn vortex_array::memory::HostBufferMut::as_mut_slice(&mut self) -> &mut [u8]

pub fn vortex_array::memory::HostBufferMut::freeze(self: alloc::boxed::Box<Self>) -> vortex_buffer::ByteBuffer

pub fn vortex_array::memory::HostBufferMut::is_empty(&self) -> bool

pub fn vortex_array::memory::HostBufferMut::len(&self) -> usize

pub trait vortex_array::memory::MemorySessionExt: vortex_session::SessionExt

pub fn vortex_array::memory::MemorySessionExt::allocator(&self) -> vortex_array::memory::HostAllocatorRef

pub fn vortex_array::memory::MemorySessionExt::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession>

pub fn vortex_array::memory::MemorySessionExt::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession>

impl<S: vortex_session::SessionExt> vortex_array::memory::MemorySessionExt for S

pub fn S::allocator(&self) -> vortex_array::memory::HostAllocatorRef

pub fn S::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession>

pub fn S::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession>

pub type vortex_array::memory::HostAllocatorRef = alloc::sync::Arc<dyn vortex_array::memory::HostAllocator>

pub mod vortex_array::normalize

pub enum vortex_array::normalize::Operation<'a>
Expand Down Expand Up @@ -22362,6 +22468,8 @@ pub struct vortex_array::ExecutionCtx

impl vortex_array::ExecutionCtx

pub fn vortex_array::ExecutionCtx::allocator(&self) -> vortex_array::memory::HostAllocatorRef

pub fn vortex_array::ExecutionCtx::log(&mut self, msg: core::fmt::Arguments<'_>)

pub fn vortex_array::ExecutionCtx::new(session: vortex_session::VortexSession) -> Self
Expand Down
99 changes: 90 additions & 9 deletions vortex-array/src/arrays/chunked/vtable/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_buffer::BufferMut;
use vortex_buffer::Buffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

Expand All @@ -19,12 +19,13 @@ use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::listview::ListViewArrayExt;
use crate::arrays::listview::ListViewRebuildMode;
use crate::arrays::struct_::StructArrayExt;
use crate::builders::builder_with_capacity;
use crate::builders::builder_with_capacity_in;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::dtype::StructFields;
use crate::memory::HostAllocatorExt;
use crate::validity::Validity;

pub(super) fn _canonicalize(
Expand Down Expand Up @@ -56,7 +57,7 @@ pub(super) fn _canonicalize(
ctx,
)?),
_ => {
let mut builder = builder_with_capacity(array.dtype(), array.len());
let mut builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
array.array().append_to_builder(builder.as_mut(), ctx)?;
builder.finish_into_canonical()
}
Expand Down Expand Up @@ -131,8 +132,12 @@ fn swizzle_list_chunks(
// this much more complicated.
// We (somewhat arbitrarily) choose `u64` for our offsets and sizes here. These can always be
// narrowed later by the compressor.
let mut offsets = BufferMut::<u64>::with_capacity(len);
let mut sizes = BufferMut::<u64>::with_capacity(len);
let allocator = ctx.allocator();
let mut offsets = allocator.allocate_typed::<u64>(len)?;
let mut sizes = allocator.allocate_typed::<u64>(len)?;
let offsets_out: &mut [u64] = offsets.as_mut_slice_typed::<u64>()?;
let sizes_slice_out: &mut [u64] = sizes.as_mut_slice_typed::<u64>()?;
let mut next_list = 0usize;

for chunk in chunks {
let chunk_array = chunk.clone().execute::<ListViewArray>(ctx)?;
Expand Down Expand Up @@ -162,19 +167,31 @@ fn swizzle_list_chunks(
let sizes_slice = sizes_arr.as_slice::<u64>();

// Append offsets and sizes, adjusting offsets to point into the combined array.
offsets.extend(offsets_slice.iter().map(|o| o + num_elements));
sizes.extend(sizes_slice);
for (&offset, &size) in offsets_slice.iter().zip(sizes_slice.iter()) {
offsets_out[next_list] = offset + num_elements;
sizes_slice_out[next_list] = size;
next_list += 1;
}

num_elements += chunk_array.elements().len() as u64;
}
debug_assert_eq!(next_list, len);

// SAFETY: elements are sliced from valid `ListViewArray`s (from `to_listview()`).
let chunked_elements =
unsafe { ChunkedArray::new_unchecked(list_elements_chunks, elem_dtype.clone()) }
.into_array();

let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable).into_array();
let sizes = PrimitiveArray::new(sizes.freeze(), Validity::NonNullable).into_array();
let offsets = PrimitiveArray::new(
Buffer::<u64>::from_byte_buffer(offsets.freeze()),
Validity::NonNullable,
)
.into_array();
let sizes = PrimitiveArray::new(
Buffer::<u64>::from_byte_buffer(sizes.freeze()),
Validity::NonNullable,
)
.into_array();

// SAFETY:
// - `offsets` and `sizes` are non-nullable u64 arrays of the same length
Expand All @@ -192,9 +209,15 @@ fn swizzle_list_chunks(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::Canonical;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::ToCanonical;
use crate::accessor::ArrayAccessor;
Expand All @@ -207,8 +230,28 @@ mod tests {
use crate::dtype::DType::Primitive;
use crate::dtype::Nullability::NonNullable;
use crate::dtype::PType::I32;
use crate::memory::DefaultHostAllocator;
use crate::memory::HostAllocator;
use crate::memory::MemorySessionExt;
use crate::memory::WritableHostBuffer;
use crate::validity::Validity;

#[derive(Debug)]
struct CountingAllocator {
allocations: Arc<AtomicUsize>,
}

impl HostAllocator for CountingAllocator {
fn allocate(
&self,
len: usize,
alignment: vortex_buffer::Alignment,
) -> VortexResult<WritableHostBuffer> {
self.allocations.fetch_add(1, Ordering::Relaxed);
DefaultHostAllocator.allocate(len, alignment)
}
}

#[test]
pub fn pack_nested_structs() {
let struct_array = StructArray::try_new(
Expand Down Expand Up @@ -265,4 +308,42 @@ mod tests {
assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());
}

#[test]
fn list_canonicalize_uses_memory_session_allocator() {
let allocations = Arc::new(AtomicUsize::new(0));
let session = VortexSession::empty();
session
.memory_mut()
.set_allocator(Arc::new(CountingAllocator {
allocations: Arc::clone(&allocations),
}));
let mut ctx = ExecutionCtx::new(session);

let l1 = ListArray::try_new(
buffer![1, 2, 3, 4].into_array(),
buffer![0, 3].into_array(),
Validity::NonNullable,
)
.unwrap();
let l2 = ListArray::try_new(
buffer![5, 6].into_array(),
buffer![0, 2].into_array(),
Validity::NonNullable,
)
.unwrap();

let chunked_list = ChunkedArray::try_new(
vec![l1.into_array(), l2.into_array()],
List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
)
.unwrap()
.into_array();

drop(chunked_list.execute::<Canonical>(&mut ctx).unwrap());
assert!(
allocations.load(Ordering::Relaxed) >= 2,
"expected offset+size allocations through MemorySession"
);
}
}
12 changes: 12 additions & 0 deletions vortex-array/src/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::canonical::Canonical;
use crate::dtype::DType;
use crate::match_each_decimal_value_type;
use crate::match_each_native_ptype;
use crate::memory::HostAllocatorRef;
use crate::scalar::Scalar;

mod lazy_null_builder;
Expand Down Expand Up @@ -291,3 +292,14 @@ pub fn builder_with_capacity(dtype: &DType, capacity: usize) -> Box<dyn ArrayBui
}
}
}

/// Construct a new canonical builder for the given [`DType`] using a host
/// [`crate::memory::HostAllocator`].
pub fn builder_with_capacity_in(
allocator: HostAllocatorRef,
dtype: &DType,
capacity: usize,
) -> Box<dyn ArrayBuilder> {
let _allocator = allocator;
builder_with_capacity(dtype, capacity)
}
7 changes: 7 additions & 0 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::matcher::Matcher;
use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
use crate::optimizer::ArrayOptimizer;

/// Maximum number of iterations to attempt when executing an array before giving up and returning
Expand Down Expand Up @@ -210,6 +212,11 @@ impl ExecutionCtx {
&self.session
}

/// Get the session-scoped host allocator for this execution context.
pub fn allocator(&self) -> HostAllocatorRef {
self.session.allocator()
}

/// Log an execution step at the current depth.
///
/// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod kernel;
pub mod mask;
mod mask_future;
pub mod matcher;
pub mod memory;
mod metadata;
pub mod normalize;
pub mod optimizer;
Expand Down
Loading
Loading