Skip to content

Commit 0e9c0c5

Browse files
authored
Buffer Allocator Abstraction (#7337)
Initial PR to move towards session-defined buffer allocation. 1. Sets up a MemorySession (we could move this and all buffer handle types to vortex-io and treat the I/O crate as engine integration?). 2. Defines an allocator trait, returning a WritableHostBuffer. 3. Freezes the WritableHostBuffer into a host ByteBuffer. For now, we keep the existing behavior for device buffers (i.e., vortex-cuda constructs its own device BufferHandle inside its segment source). This change configures host VortexReadAt sources (ObjectStoreReadAt and FileReadAt) to use the allocator from the session. It also demonstrates using the allocator from within an array (chunked canonical), although it doesn't do the plumbing work all the way into the array builders. A follow-up PR will implement a pooling allocator. Further PRs will port over the execution logic to use the allocator from the ExecutionCtx, eventually using clippy to forbid direct ByteBufferMut access from within our (non-allocator) crates. --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent ff21366 commit 0e9c0c5

13 files changed

Lines changed: 732 additions & 31 deletions

File tree

vortex-array/public-api.lock

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8388,6 +8388,8 @@ pub unsafe fn vortex_array::builders::PrimitiveBuilder<T>::set_validity_unchecke
83888388

83898389
pub fn vortex_array::builders::builder_with_capacity(dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>
83908390

8391+
pub fn vortex_array::builders::builder_with_capacity_in(allocator: vortex_array::memory::HostAllocatorRef, dtype: &vortex_array::dtype::DType, capacity: usize) -> alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>
8392+
83918393
pub mod vortex_array::builtins
83928394

83938395
pub trait vortex_array::builtins::ArrayBuiltins: core::marker::Sized
@@ -13262,6 +13264,110 @@ pub fn V::matches(array: &vortex_array::ArrayRef) -> bool
1326213264

1326313265
pub fn V::try_match<'a>(array: &'a vortex_array::ArrayRef) -> core::option::Option<vortex_array::ArrayView<'a, V>>
1326413266

13267+
pub mod vortex_array::memory
13268+
13269+
pub struct vortex_array::memory::DefaultHostAllocator
13270+
13271+
impl core::default::Default for vortex_array::memory::DefaultHostAllocator
13272+
13273+
pub fn vortex_array::memory::DefaultHostAllocator::default() -> vortex_array::memory::DefaultHostAllocator
13274+
13275+
impl core::fmt::Debug for vortex_array::memory::DefaultHostAllocator
13276+
13277+
pub fn vortex_array::memory::DefaultHostAllocator::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
13278+
13279+
impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator
13280+
13281+
pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>
13282+
13283+
pub struct vortex_array::memory::MemorySession
13284+
13285+
impl vortex_array::memory::MemorySession
13286+
13287+
pub fn vortex_array::memory::MemorySession::allocator(&self) -> vortex_array::memory::HostAllocatorRef
13288+
13289+
pub fn vortex_array::memory::MemorySession::new(allocator: vortex_array::memory::HostAllocatorRef) -> Self
13290+
13291+
pub fn vortex_array::memory::MemorySession::set_allocator(&mut self, allocator: vortex_array::memory::HostAllocatorRef)
13292+
13293+
impl core::default::Default for vortex_array::memory::MemorySession
13294+
13295+
pub fn vortex_array::memory::MemorySession::default() -> Self
13296+
13297+
impl core::fmt::Debug for vortex_array::memory::MemorySession
13298+
13299+
pub fn vortex_array::memory::MemorySession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
13300+
13301+
pub struct vortex_array::memory::WritableHostBuffer
13302+
13303+
impl vortex_array::memory::WritableHostBuffer
13304+
13305+
pub fn vortex_array::memory::WritableHostBuffer::alignment(&self) -> vortex_buffer::alignment::Alignment
13306+
13307+
pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice(&mut self) -> &mut [u8]
13308+
13309+
pub fn vortex_array::memory::WritableHostBuffer::as_mut_slice_typed<T>(&mut self) -> vortex_error::VortexResult<&mut [T]>
13310+
13311+
pub fn vortex_array::memory::WritableHostBuffer::freeze(self) -> vortex_buffer::ByteBuffer
13312+
13313+
pub fn vortex_array::memory::WritableHostBuffer::freeze_typed<T>(self) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>
13314+
13315+
pub fn vortex_array::memory::WritableHostBuffer::is_empty(&self) -> bool
13316+
13317+
pub fn vortex_array::memory::WritableHostBuffer::len(&self) -> usize
13318+
13319+
pub fn vortex_array::memory::WritableHostBuffer::new(inner: alloc::boxed::Box<dyn vortex_array::memory::HostBufferMut>) -> Self
13320+
13321+
impl core::fmt::Debug for vortex_array::memory::WritableHostBuffer
13322+
13323+
pub fn vortex_array::memory::WritableHostBuffer::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
13324+
13325+
pub trait vortex_array::memory::HostAllocator: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static
13326+
13327+
pub fn vortex_array::memory::HostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>
13328+
13329+
impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAllocator
13330+
13331+
pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>
13332+
13333+
pub trait vortex_array::memory::HostAllocatorExt: vortex_array::memory::HostAllocator
13334+
13335+
pub fn vortex_array::memory::HostAllocatorExt::allocate_typed<T>(&self, len: usize) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>
13336+
13337+
impl<A: vortex_array::memory::HostAllocator + ?core::marker::Sized> vortex_array::memory::HostAllocatorExt for A
13338+
13339+
pub fn A::allocate_typed<T>(&self, len: usize) -> vortex_error::VortexResult<vortex_array::memory::WritableHostBuffer>
13340+
13341+
pub trait vortex_array::memory::HostBufferMut: core::marker::Send + 'static
13342+
13343+
pub fn vortex_array::memory::HostBufferMut::alignment(&self) -> vortex_buffer::alignment::Alignment
13344+
13345+
pub fn vortex_array::memory::HostBufferMut::as_mut_slice(&mut self) -> &mut [u8]
13346+
13347+
pub fn vortex_array::memory::HostBufferMut::freeze(self: alloc::boxed::Box<Self>) -> vortex_buffer::ByteBuffer
13348+
13349+
pub fn vortex_array::memory::HostBufferMut::is_empty(&self) -> bool
13350+
13351+
pub fn vortex_array::memory::HostBufferMut::len(&self) -> usize
13352+
13353+
pub trait vortex_array::memory::MemorySessionExt: vortex_session::SessionExt
13354+
13355+
pub fn vortex_array::memory::MemorySessionExt::allocator(&self) -> vortex_array::memory::HostAllocatorRef
13356+
13357+
pub fn vortex_array::memory::MemorySessionExt::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession>
13358+
13359+
pub fn vortex_array::memory::MemorySessionExt::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession>
13360+
13361+
impl<S: vortex_session::SessionExt> vortex_array::memory::MemorySessionExt for S
13362+
13363+
pub fn S::allocator(&self) -> vortex_array::memory::HostAllocatorRef
13364+
13365+
pub fn S::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemorySession>
13366+
13367+
pub fn S::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession>
13368+
13369+
pub type vortex_array::memory::HostAllocatorRef = alloc::sync::Arc<dyn vortex_array::memory::HostAllocator>
13370+
1326513371
pub mod vortex_array::normalize
1326613372

1326713373
pub enum vortex_array::normalize::Operation<'a>
@@ -22362,6 +22468,8 @@ pub struct vortex_array::ExecutionCtx
2236222468

2236322469
impl vortex_array::ExecutionCtx
2236422470

22471+
pub fn vortex_array::ExecutionCtx::allocator(&self) -> vortex_array::memory::HostAllocatorRef
22472+
2236522473
pub fn vortex_array::ExecutionCtx::log(&mut self, msg: core::fmt::Arguments<'_>)
2236622474

2236722475
pub fn vortex_array::ExecutionCtx::new(session: vortex_session::VortexSession) -> Self

vortex-array/src/arrays/chunked/vtable/canonical.rs

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_buffer::BufferMut;
4+
use vortex_buffer::Buffer;
55
use vortex_error::VortexExpect;
66
use vortex_error::VortexResult;
77

@@ -19,12 +19,13 @@ use crate::arrays::chunked::ChunkedArrayExt;
1919
use crate::arrays::listview::ListViewArrayExt;
2020
use crate::arrays::listview::ListViewRebuildMode;
2121
use crate::arrays::struct_::StructArrayExt;
22-
use crate::builders::builder_with_capacity;
22+
use crate::builders::builder_with_capacity_in;
2323
use crate::builtins::ArrayBuiltins;
2424
use crate::dtype::DType;
2525
use crate::dtype::Nullability;
2626
use crate::dtype::PType;
2727
use crate::dtype::StructFields;
28+
use crate::memory::HostAllocatorExt;
2829
use crate::validity::Validity;
2930

3031
pub(super) fn _canonicalize(
@@ -56,7 +57,7 @@ pub(super) fn _canonicalize(
5657
ctx,
5758
)?),
5859
_ => {
59-
let mut builder = builder_with_capacity(array.dtype(), array.len());
60+
let mut builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
6061
array.array().append_to_builder(builder.as_mut(), ctx)?;
6162
builder.finish_into_canonical()
6263
}
@@ -131,8 +132,12 @@ fn swizzle_list_chunks(
131132
// this much more complicated.
132133
// We (somewhat arbitrarily) choose `u64` for our offsets and sizes here. These can always be
133134
// narrowed later by the compressor.
134-
let mut offsets = BufferMut::<u64>::with_capacity(len);
135-
let mut sizes = BufferMut::<u64>::with_capacity(len);
135+
let allocator = ctx.allocator();
136+
let mut offsets = allocator.allocate_typed::<u64>(len)?;
137+
let mut sizes = allocator.allocate_typed::<u64>(len)?;
138+
let offsets_out: &mut [u64] = offsets.as_mut_slice_typed::<u64>()?;
139+
let sizes_slice_out: &mut [u64] = sizes.as_mut_slice_typed::<u64>()?;
140+
let mut next_list = 0usize;
136141

137142
for chunk in chunks {
138143
let chunk_array = chunk.clone().execute::<ListViewArray>(ctx)?;
@@ -162,19 +167,31 @@ fn swizzle_list_chunks(
162167
let sizes_slice = sizes_arr.as_slice::<u64>();
163168

164169
// Append offsets and sizes, adjusting offsets to point into the combined array.
165-
offsets.extend(offsets_slice.iter().map(|o| o + num_elements));
166-
sizes.extend(sizes_slice);
170+
for (&offset, &size) in offsets_slice.iter().zip(sizes_slice.iter()) {
171+
offsets_out[next_list] = offset + num_elements;
172+
sizes_slice_out[next_list] = size;
173+
next_list += 1;
174+
}
167175

168176
num_elements += chunk_array.elements().len() as u64;
169177
}
178+
debug_assert_eq!(next_list, len);
170179

171180
// SAFETY: elements are sliced from valid `ListViewArray`s (from `to_listview()`).
172181
let chunked_elements =
173182
unsafe { ChunkedArray::new_unchecked(list_elements_chunks, elem_dtype.clone()) }
174183
.into_array();
175184

176-
let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable).into_array();
177-
let sizes = PrimitiveArray::new(sizes.freeze(), Validity::NonNullable).into_array();
185+
let offsets = PrimitiveArray::new(
186+
Buffer::<u64>::from_byte_buffer(offsets.freeze()),
187+
Validity::NonNullable,
188+
)
189+
.into_array();
190+
let sizes = PrimitiveArray::new(
191+
Buffer::<u64>::from_byte_buffer(sizes.freeze()),
192+
Validity::NonNullable,
193+
)
194+
.into_array();
178195

179196
// SAFETY:
180197
// - `offsets` and `sizes` are non-nullable u64 arrays of the same length
@@ -192,9 +209,15 @@ fn swizzle_list_chunks(
192209
#[cfg(test)]
193210
mod tests {
194211
use std::sync::Arc;
212+
use std::sync::atomic::AtomicUsize;
213+
use std::sync::atomic::Ordering;
195214

196215
use vortex_buffer::buffer;
216+
use vortex_error::VortexResult;
217+
use vortex_session::VortexSession;
197218

219+
use crate::Canonical;
220+
use crate::ExecutionCtx;
198221
use crate::IntoArray;
199222
use crate::ToCanonical;
200223
use crate::accessor::ArrayAccessor;
@@ -207,8 +230,28 @@ mod tests {
207230
use crate::dtype::DType::Primitive;
208231
use crate::dtype::Nullability::NonNullable;
209232
use crate::dtype::PType::I32;
233+
use crate::memory::DefaultHostAllocator;
234+
use crate::memory::HostAllocator;
235+
use crate::memory::MemorySessionExt;
236+
use crate::memory::WritableHostBuffer;
210237
use crate::validity::Validity;
211238

239+
#[derive(Debug)]
240+
struct CountingAllocator {
241+
allocations: Arc<AtomicUsize>,
242+
}
243+
244+
impl HostAllocator for CountingAllocator {
245+
fn allocate(
246+
&self,
247+
len: usize,
248+
alignment: vortex_buffer::Alignment,
249+
) -> VortexResult<WritableHostBuffer> {
250+
self.allocations.fetch_add(1, Ordering::Relaxed);
251+
DefaultHostAllocator.allocate(len, alignment)
252+
}
253+
}
254+
212255
#[test]
213256
pub fn pack_nested_structs() {
214257
let struct_array = StructArray::try_new(
@@ -265,4 +308,42 @@ mod tests {
265308
assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
266309
assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());
267310
}
311+
312+
#[test]
313+
fn list_canonicalize_uses_memory_session_allocator() {
314+
let allocations = Arc::new(AtomicUsize::new(0));
315+
let session = VortexSession::empty();
316+
session
317+
.memory_mut()
318+
.set_allocator(Arc::new(CountingAllocator {
319+
allocations: Arc::clone(&allocations),
320+
}));
321+
let mut ctx = ExecutionCtx::new(session);
322+
323+
let l1 = ListArray::try_new(
324+
buffer![1, 2, 3, 4].into_array(),
325+
buffer![0, 3].into_array(),
326+
Validity::NonNullable,
327+
)
328+
.unwrap();
329+
let l2 = ListArray::try_new(
330+
buffer![5, 6].into_array(),
331+
buffer![0, 2].into_array(),
332+
Validity::NonNullable,
333+
)
334+
.unwrap();
335+
336+
let chunked_list = ChunkedArray::try_new(
337+
vec![l1.into_array(), l2.into_array()],
338+
List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
339+
)
340+
.unwrap()
341+
.into_array();
342+
343+
drop(chunked_list.execute::<Canonical>(&mut ctx).unwrap());
344+
assert!(
345+
allocations.load(Ordering::Relaxed) >= 2,
346+
"expected offset+size allocations through MemorySession"
347+
);
348+
}
268349
}

vortex-array/src/builders/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::canonical::Canonical;
4040
use crate::dtype::DType;
4141
use crate::match_each_decimal_value_type;
4242
use crate::match_each_native_ptype;
43+
use crate::memory::HostAllocatorRef;
4344
use crate::scalar::Scalar;
4445

4546
mod lazy_null_builder;
@@ -291,3 +292,14 @@ pub fn builder_with_capacity(dtype: &DType, capacity: usize) -> Box<dyn ArrayBui
291292
}
292293
}
293294
}
295+
296+
/// Construct a new canonical builder for the given [`DType`] using a host
297+
/// [`crate::memory::HostAllocator`].
298+
pub fn builder_with_capacity_in(
299+
allocator: HostAllocatorRef,
300+
dtype: &DType,
301+
capacity: usize,
302+
) -> Box<dyn ArrayBuilder> {
303+
let _allocator = allocator;
304+
builder_with_capacity(dtype, capacity)
305+
}

vortex-array/src/executor.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use crate::ArrayRef;
3434
use crate::Canonical;
3535
use crate::IntoArray;
3636
use crate::matcher::Matcher;
37+
use crate::memory::HostAllocatorRef;
38+
use crate::memory::MemorySessionExt;
3739
use crate::optimizer::ArrayOptimizer;
3840

3941
/// Maximum number of iterations to attempt when executing an array before giving up and returning
@@ -210,6 +212,11 @@ impl ExecutionCtx {
210212
&self.session
211213
}
212214

215+
/// Get the session-scoped host allocator for this execution context.
216+
pub fn allocator(&self) -> HostAllocatorRef {
217+
self.session.allocator()
218+
}
219+
213220
/// Log an execution step at the current depth.
214221
///
215222
/// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.

vortex-array/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub mod kernel;
5252
pub mod mask;
5353
mod mask_future;
5454
pub mod matcher;
55+
pub mod memory;
5556
mod metadata;
5657
pub mod normalize;
5758
pub mod optimizer;

0 commit comments

Comments
 (0)