Skip to content

Commit 5e24a18

Browse files
committed
feat: add CUDA cuDF convenience API
Add vortex_cuda.to_cudf and install optional Vortex Array helpers for cuDF conversion and Arrow C Device export. Keep the conversion path CUDA-only by rejecting unsupported fallback policies and routing cuDF ingestion through fresh Arrow C Device capsules. Expand CUDA Python tests for the convenience API, installed Array methods, Arrow Device export smoke coverage, and capsule ownership paths. Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 4a90e13 commit 5e24a18

7 files changed

Lines changed: 867 additions & 29 deletions

File tree

vortex-cuda/src/arrow/canonical.rs

Lines changed: 85 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ fn gather_binary_values(
764764
///
765765
/// Returns `None` for the buffer when Arrow can omit validity because all rows are valid.
766766
///
767-
/// Returned buffers use zeroed 4-byte padding so cuDF's word-sized mask reads stay in bounds.
767+
/// Returned buffers use zeroed cuDF-sized padding so mask reads stay in bounds.
768768
/// Bits at positions `>= len + arrow_offset` within the final data byte are unspecified, as
769769
/// Arrow permits.
770770
pub(super) async fn export_arrow_validity_buffer(
@@ -798,21 +798,26 @@ pub(super) async fn export_arrow_validity_buffer(
798798
let bitmap = ctx.ensure_on_device(bits).await?;
799799
// ArrowDeviceArray uses ArrowArray layout with its buffers being device pointers.
800800
//
801-
// Validity is one bit per row, addressed via the Arrow array offset. Reuse the bitmap
802-
// when Vortex's validity offset already matches Arrow's; otherwise repack on the GPU
803-
// so row i is at Arrow bit `arrow_offset + i`.
804-
let bitmap = if meta.offset() == arrow_offset {
805-
bitmap
806-
} else {
807-
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?
808-
};
801+
// Validity is one bit per row, addressed via the Arrow array offset. Repack on the GPU
802+
// so row i is at Arrow bit `arrow_offset + i` and the backing allocation has the
803+
// zeroed cuDF-sized padding expected by Arrow Device consumers.
804+
let bitmap =
805+
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?;
809806
// Keep nullable exports self-describing for consumers that require exact null counts.
810807
let null_count = count_arrow_validity_nulls(&bitmap, len, arrow_offset, ctx)?;
811808
Ok((Some(bitmap), null_count))
812809
}
813810
}
814811
}
815812

813+
/// Minimum backing allocation quantum for Arrow validity buffers handed to cuDF.
814+
///
815+
/// Arrow exposes only the logical bitmap byte length, but cuDF imports null masks into 64-byte
816+
/// padded buffers and its kernels may read through that padded extent. Vortex therefore keeps the
817+
/// exported `BufferHandle` sliced to Arrow's logical length while zero-padding the underlying CUDA
818+
/// allocation to this boundary.
819+
const CUDF_VALIDITY_BUFFER_PADDING: usize = 64;
820+
816821
/// Return the byte length needed for `len` validity bits at the given bit offset.
817822
fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult<usize> {
818823
Ok(len
@@ -821,12 +826,25 @@ fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult<usi
821826
.div_ceil(8))
822827
}
823828

829+
/// Return the CUDA allocation size for a logical Arrow validity bitmap byte length.
830+
///
831+
/// The returned allocation length may be larger than `byte_len`; callers slice the exported
832+
/// `BufferHandle` back to `byte_len` while retaining the padded backing allocation. A zero-length
833+
/// bitmap still gets one byte so we never request a zero-sized CUDA allocation.
834+
fn validity_bitmap_allocation_byte_len(byte_len: usize) -> usize {
835+
if byte_len == 0 {
836+
1
837+
} else {
838+
byte_len.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
839+
}
840+
}
841+
824842
/// Allocate a zeroed device buffer with cuDF-safe padding for Arrow validity masks.
825843
fn device_zeroed_byte_buffer(
826844
byte_len: usize,
827845
ctx: &mut CudaExecutionCtx,
828846
) -> VortexResult<BufferHandle> {
829-
let allocation_len = byte_len.next_multiple_of(size_of::<u32>()).max(1);
847+
let allocation_len = validity_bitmap_allocation_byte_len(byte_len);
830848
let mut buffer = ctx.device_alloc::<u8>(allocation_len)?;
831849
ctx.stream()
832850
.memset_zeros(&mut buffer)
@@ -894,8 +912,8 @@ pub fn count_arrow_validity_nulls(
894912
///
895913
/// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer
896914
/// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a
897-
/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two
898-
/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks).
915+
/// bit-level offset. The output handle keeps Arrow's logical byte length, while the backing
916+
/// allocation is zero-padded to cuDF's mask allocation size for consumers that read full masks.
899917
pub fn repack_arrow_validity_buffer(
900918
input_buffer: &BufferHandle,
901919
input_offset: usize,
@@ -904,7 +922,13 @@ pub fn repack_arrow_validity_buffer(
904922
ctx: &mut CudaExecutionCtx,
905923
) -> VortexResult<BufferHandle> {
906924
let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
925+
// The CUDA kernel writes the bitmap as u64 words, so round the logical byte length up to the
926+
// number of words that cover the exported Arrow bytes.
907927
let output_words = output_bytes.div_ceil(size_of::<u64>());
928+
// `device_alloc::<u64>` takes a word count, while the padding policy is expressed in bytes.
929+
// Round up so the padded byte allocation is fully represented by whole u64 words.
930+
let allocation_words =
931+
validity_bitmap_allocation_byte_len(output_bytes).div_ceil(size_of::<u64>());
908932

909933
// The kernel loads the input bitmap as 64-bit words.
910934
if !input_buffer
@@ -914,7 +938,12 @@ pub fn repack_arrow_validity_buffer(
914938
vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer");
915939
}
916940

917-
let output = ctx.device_alloc::<u64>(output_words.max(1))?;
941+
let mut output = ctx.device_alloc::<u64>(allocation_words.max(1))?;
942+
// The repack kernel writes only the logical bitmap words. Zero the whole backing allocation so
943+
// cuDF's padded mask reads see invalid rows, not uninitialized CUDA memory.
944+
ctx.stream()
945+
.memset_zeros(&mut output)
946+
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;
918947
let output_device = CudaDeviceBuffer::new(output);
919948

920949
if output_words > 0 {
@@ -1337,6 +1366,7 @@ mod tests {
13371366
use crate::arrow::ArrowDeviceArray;
13381367
use crate::arrow::DeviceArrayExt;
13391368
use crate::arrow::PrivateData;
1369+
use crate::arrow::canonical::CUDF_VALIDITY_BUFFER_PADDING;
13401370
use crate::arrow::canonical::export_arrow_validity_buffer;
13411371
use crate::arrow::canonical::repack_arrow_validity_buffer;
13421372
use crate::device_buffer::cuda_backing_allocation;
@@ -2955,7 +2985,43 @@ mod tests {
29552985
let backing_bytes = backing.to_host_sync();
29562986
assert_eq!(
29572987
backing_bytes.len(),
2958-
output_bytes.next_multiple_of(size_of::<u64>())
2988+
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
2989+
);
2990+
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));
2991+
2992+
Ok(())
2993+
}
2994+
2995+
#[crate::test]
2996+
async fn test_export_validity_buffer_pads_matching_offset() -> VortexResult<()> {
2997+
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
2998+
.vortex_expect("failed to create execution context");
2999+
3000+
let len = 3;
3001+
let arrow_offset = 0;
3002+
let (buffer, null_count) = export_arrow_validity_buffer(
3003+
Validity::from(BitBuffer::from_iter([true, false, true])),
3004+
len,
3005+
arrow_offset,
3006+
&mut ctx,
3007+
)
3008+
.await?;
3009+
ctx.synchronize_stream()?;
3010+
3011+
assert_eq!(null_count, 1);
3012+
let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
3013+
let output_bytes = (len + arrow_offset).div_ceil(8);
3014+
assert_eq!(buffer.len(), output_bytes);
3015+
let actual = BitBuffer::new(buffer.to_host_sync(), len + arrow_offset)
3016+
.iter()
3017+
.collect::<Vec<_>>();
3018+
assert_eq!(actual, [true, false, true]);
3019+
3020+
let backing = cuda_backing_allocation(&buffer)?;
3021+
let backing_bytes = backing.to_host_sync();
3022+
assert_eq!(
3023+
backing_bytes.len(),
3024+
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
29593025
);
29603026
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));
29613027

@@ -2983,6 +3049,11 @@ mod tests {
29833049
let bytes = buffer.to_host_sync();
29843050
assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8));
29853051
assert!(bytes.iter().all(|byte| *byte == 0));
3052+
let backing = cuda_backing_allocation(&buffer)?;
3053+
assert_eq!(
3054+
backing.len(),
3055+
bytes.len().next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
3056+
);
29863057

29873058
Ok(())
29883059
}

vortex-cuda/src/arrow/mod.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,16 @@ pub(crate) struct PrivateData {
140140
}
141141

142142
impl PrivateData {
143+
fn retain_buffers(&self, buffers: &mut Vec<BufferHandle>) -> VortexResult<()> {
144+
// `BufferHandle::clone` is shallow for device buffers; this retains the Arc-backed CUDA
145+
// allocations for cuDF without copying device memory.
146+
buffers.extend(self.buffers.iter().flatten().cloned());
147+
for child in &self.children {
148+
retain_arrow_array_buffers(unsafe { child.as_ref() }, buffers)?;
149+
}
150+
retain_arrow_array_buffers(unsafe { self.dictionary.as_ref() }, buffers)
151+
}
152+
143153
/// Create private data for arrays that own buffers and child arrays but no dictionary.
144154
pub(crate) fn new(
145155
buffers: Vec<Option<BufferHandle>>,
@@ -204,6 +214,49 @@ impl PrivateData {
204214
}
205215
}
206216

217+
/// Cloned device buffers retained outside an Arrow Device release callback.
218+
///
219+
/// This is useful for Arrow Device consumers that consume and release the Arrow C structs during
220+
/// import but keep zero-copy views over the producer's device buffers in their returned object.
221+
#[derive(Debug)]
222+
pub struct ArrowDeviceArrayBufferKeepAlive {
223+
#[allow(
224+
dead_code,
225+
reason = "buffers are retained until this keepalive is dropped"
226+
)]
227+
buffers: Vec<BufferHandle>,
228+
}
229+
230+
/// Clone the device buffers owned by a Vortex-created Arrow Device array.
231+
///
232+
/// The returned owner must be dropped only after consumers are finished with any zero-copy views over
233+
/// `array`'s buffers. Call this before the consumer invokes the Arrow release callback.
234+
///
235+
/// # Safety
236+
///
237+
/// `array` must be a live Arrow Device array exported by Vortex. Passing a foreign Arrow array, or a
238+
/// Vortex array whose release callback has already run, may dereference invalid private data.
239+
pub unsafe fn retain_arrow_device_array_buffers(
240+
array: &ArrowDeviceArray,
241+
) -> VortexResult<ArrowDeviceArrayBufferKeepAlive> {
242+
let mut buffers = Vec::new();
243+
retain_arrow_array_buffers(Some(&array.array), &mut buffers)?;
244+
Ok(ArrowDeviceArrayBufferKeepAlive { buffers })
245+
}
246+
247+
fn retain_arrow_array_buffers(
248+
array: Option<&ArrowArray>,
249+
buffers: &mut Vec<BufferHandle>,
250+
) -> VortexResult<()> {
251+
let Some(array) = array else {
252+
return Ok(());
253+
};
254+
let Some(private_data) = (unsafe { array.private_data.cast::<PrivateData>().as_ref() }) else {
255+
return Ok(());
256+
};
257+
private_data.retain_buffers(buffers)
258+
}
259+
207260
/// A Vortex array exported as an Arrow schema and Arrow Device array pair.
208261
#[derive(Debug)]
209262
pub struct ArrowDeviceArrayWithSchema {

0 commit comments

Comments
 (0)