Skip to content

Commit 890704f

Browse files
authored
feat: add CUDA cuDF convenience API (#8624)
See the `README.md` added as part of this PR for how to use pyVortex CUDA. Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 0a45777 commit 890704f

11 files changed

Lines changed: 915 additions & 34 deletions

File tree

vortex-cuda/src/arrow/canonical.rs

Lines changed: 255 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ use crate::arrow::arrow_schema_for_array;
7575
use crate::arrow::cuda_decimal_value_type;
7676
use crate::arrow::list_view::export_device_list_view;
7777
use crate::cub::exclusive_sum_i32;
78+
use crate::device_buffer::CUDF_VALIDITY_BUFFER_PADDING;
7879
use crate::executor::CudaArrayExt;
7980
use crate::executor::execute_validity_cuda;
8081

@@ -764,7 +765,7 @@ fn gather_binary_values(
764765
///
765766
/// Returns `None` for the buffer when Arrow can omit validity because all rows are valid.
766767
///
767-
/// Returned buffers use zeroed 4-byte padding so cuDF's word-sized mask reads stay in bounds.
768+
/// Returned buffers use zeroed cuDF-sized padding so mask reads stay in bounds.
768769
/// Bits at positions `>= len + arrow_offset` within the final data byte are unspecified, as
769770
/// Arrow permits.
770771
pub(super) async fn export_arrow_validity_buffer(
@@ -773,6 +774,11 @@ pub(super) async fn export_arrow_validity_buffer(
773774
arrow_offset: usize,
774775
ctx: &mut CudaExecutionCtx,
775776
) -> VortexResult<(Option<BufferHandle>, i64)> {
777+
// Empty arrays do not need a validity buffer; avoid zero-sized CUDA allocations.
778+
if len == 0 {
779+
return Ok((None, 0));
780+
}
781+
776782
// Validity is exported separately from the array data. Decode it here so Arrow
777783
// gets a device-resident validity buffer alongside the array it belongs to.
778784
let validity = execute_validity_cuda(validity, len, ctx).await?;
@@ -796,16 +802,18 @@ pub(super) async fn export_arrow_validity_buffer(
796802
})?;
797803
let BoolDataParts { bits, meta } = array.into_data().into_parts(len);
798804
let bitmap = ctx.ensure_on_device(bits).await?;
799-
// ArrowDeviceArray uses ArrowArray layout with its buffers being device pointers.
800-
//
801-
// Validity is one bit per row, addressed via the Arrow array offset. Reuse the bitmap
802-
// when Vortex's validity offset already matches Arrow's; otherwise repack on the GPU
803-
// so row i is at Arrow bit `arrow_offset + i`.
804-
let bitmap = if meta.offset() == arrow_offset {
805-
bitmap
806-
} else {
807-
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?
808-
};
805+
let bitmap =
806+
match export_arrow_validity_bitmap(&bitmap, meta.offset(), len, arrow_offset, ctx)?
807+
{
808+
Some(bitmap) => bitmap,
809+
None => repack_arrow_validity_buffer(
810+
&bitmap,
811+
meta.offset(),
812+
len,
813+
arrow_offset,
814+
ctx,
815+
)?,
816+
};
809817
// Keep nullable exports self-describing for consumers that require exact null counts.
810818
let null_count = count_arrow_validity_nulls(&bitmap, len, arrow_offset, ctx)?;
811819
Ok((Some(bitmap), null_count))
@@ -826,12 +834,78 @@ fn device_zeroed_byte_buffer(
826834
byte_len: usize,
827835
ctx: &mut CudaExecutionCtx,
828836
) -> VortexResult<BufferHandle> {
829-
let allocation_len = byte_len.next_multiple_of(size_of::<u32>()).max(1);
837+
vortex_ensure!(
838+
byte_len > 0,
839+
"zero-length validity buffers should be omitted"
840+
);
841+
let allocation_len = byte_len.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);
830842
let mut buffer = ctx.device_alloc::<u8>(allocation_len)?;
831843
ctx.stream()
832844
.memset_zeros(&mut buffer)
833845
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer: {err}"))?;
834-
Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(buffer))).slice(0..byte_len))
846+
// The memset above zeroed the whole allocation, including cuDF tail padding.
847+
Ok(
848+
BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new_with_zeroed_tail(buffer, 0)?))
849+
.slice(0..byte_len),
850+
)
851+
}
852+
853+
/// Exports a matching-offset bitmap by reusing it or copying it into zero-padded storage.
854+
fn export_arrow_validity_bitmap(
855+
bitmap: &BufferHandle,
856+
input_offset: usize,
857+
len: usize,
858+
arrow_offset: usize,
859+
ctx: &mut CudaExecutionCtx,
860+
) -> VortexResult<Option<BufferHandle>> {
861+
if input_offset != arrow_offset {
862+
return Ok(None);
863+
}
864+
865+
let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
866+
let allocation_bytes = output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);
867+
if bitmap.has_zeroed_tail_padding(output_bytes, allocation_bytes)? {
868+
return Ok(Some(bitmap.slice(0..output_bytes)));
869+
}
870+
871+
copy_arrow_validity_buffer(bitmap, output_bytes, ctx).map(Some)
872+
}
873+
874+
/// Copies a validity bitmap into a new cuDF-padded buffer without shifting bits.
875+
fn copy_arrow_validity_buffer(
876+
input_buffer: &BufferHandle,
877+
output_bytes: usize,
878+
ctx: &mut CudaExecutionCtx,
879+
) -> VortexResult<BufferHandle> {
880+
vortex_ensure!(
881+
output_bytes > 0,
882+
"zero-length validity buffers should be omitted"
883+
);
884+
vortex_ensure!(
885+
input_buffer.len() >= output_bytes,
886+
"Arrow validity bitmap has {} bytes, expected at least {output_bytes}",
887+
input_buffer.len()
888+
);
889+
890+
let allocation_bytes = output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);
891+
let mut output = ctx.device_alloc::<u8>(allocation_bytes)?;
892+
ctx.stream()
893+
.memset_zeros(&mut output)
894+
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;
895+
896+
let input_view = input_buffer.cuda_view::<u8>()?.slice(0..output_bytes);
897+
let mut output_view = output.slice_mut(0..output_bytes);
898+
ctx.stream()
899+
.memcpy_dtod(&input_view, &mut output_view)
900+
.map_err(|err| vortex_err!("Failed to copy Arrow validity buffer: {err}"))?;
901+
902+
Ok(
903+
BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new_with_zeroed_tail(
904+
output,
905+
output_bytes,
906+
)?))
907+
.slice(0..output_bytes),
908+
)
835909
}
836910

837911
pub fn count_arrow_validity_nulls(
@@ -894,8 +968,8 @@ pub fn count_arrow_validity_nulls(
894968
///
895969
/// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer
896970
/// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a
897-
/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two
898-
/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks).
971+
/// bit-level offset. The output handle keeps Arrow's logical byte length, while the backing
972+
/// allocation is zero-padded to cuDF's mask allocation size for consumers that read full masks.
899973
pub fn repack_arrow_validity_buffer(
900974
input_buffer: &BufferHandle,
901975
input_offset: usize,
@@ -904,7 +978,18 @@ pub fn repack_arrow_validity_buffer(
904978
ctx: &mut CudaExecutionCtx,
905979
) -> VortexResult<BufferHandle> {
906980
let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
981+
vortex_ensure!(
982+
output_bytes > 0,
983+
"zero-length validity buffers should be omitted"
984+
);
985+
// The CUDA kernel writes the bitmap as u64 words, so round the logical byte length up to the
986+
// number of words that cover the exported Arrow bytes.
907987
let output_words = output_bytes.div_ceil(size_of::<u64>());
988+
// `device_alloc::<u64>` takes a word count, while the padding policy is expressed in bytes.
989+
// Round up so the padded byte allocation is fully represented by whole u64 words.
990+
let allocation_words = output_bytes
991+
.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
992+
.div_ceil(size_of::<u64>());
908993

909994
// The kernel loads the input bitmap as 64-bit words.
910995
if !input_buffer
@@ -914,8 +999,14 @@ pub fn repack_arrow_validity_buffer(
914999
vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer");
9151000
}
9161001

917-
let output = ctx.device_alloc::<u64>(output_words.max(1))?;
918-
let output_device = CudaDeviceBuffer::new(output);
1002+
let mut output = ctx.device_alloc::<u64>(allocation_words.max(1))?;
1003+
// The repack kernel writes only the logical bitmap words. Zero the whole backing allocation so
1004+
// cuDF's padded mask reads see invalid rows, not uninitialized CUDA memory.
1005+
ctx.stream()
1006+
.memset_zeros(&mut output)
1007+
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;
1008+
// The memset above zeroed all allocation bytes after the logical output.
1009+
let output_device = CudaDeviceBuffer::new_with_zeroed_tail(output, output_bytes)?;
9191010

9201011
if output_words > 0 {
9211012
let input_view = input_buffer.cuda_view::<u8>()?;
@@ -1331,6 +1422,7 @@ mod tests {
13311422
use vortex::error::vortex_bail;
13321423
use vortex::extension::datetime::TimeUnit;
13331424

1425+
use crate::CudaBufferExt;
13341426
use crate::CudaExecutionCtx;
13351427
use crate::arrow::ARROW_DEVICE_CUDA;
13361428
use crate::arrow::ArrowArray;
@@ -1339,6 +1431,7 @@ mod tests {
13391431
use crate::arrow::PrivateData;
13401432
use crate::arrow::canonical::export_arrow_validity_buffer;
13411433
use crate::arrow::canonical::repack_arrow_validity_buffer;
1434+
use crate::device_buffer::CUDF_VALIDITY_BUFFER_PADDING;
13421435
use crate::device_buffer::cuda_backing_allocation;
13431436
use crate::session::CudaSession;
13441437

@@ -2955,13 +3048,152 @@ mod tests {
29553048
let backing_bytes = backing.to_host_sync();
29563049
assert_eq!(
29573050
backing_bytes.len(),
2958-
output_bytes.next_multiple_of(size_of::<u64>())
3051+
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
3052+
);
3053+
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));
3054+
3055+
Ok(())
3056+
}
3057+
3058+
#[crate::test]
3059+
async fn test_export_validity_buffer_pads_matching_offset() -> VortexResult<()> {
3060+
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
3061+
.vortex_expect("failed to create execution context");
3062+
3063+
let len = 3;
3064+
let arrow_offset = 0;
3065+
let (buffer, null_count) = export_arrow_validity_buffer(
3066+
Validity::from(BitBuffer::from_iter([true, false, true])),
3067+
len,
3068+
arrow_offset,
3069+
&mut ctx,
3070+
)
3071+
.await?;
3072+
ctx.synchronize_stream()?;
3073+
3074+
assert_eq!(null_count, 1);
3075+
let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
3076+
let output_bytes = (len + arrow_offset).div_ceil(8);
3077+
assert_eq!(buffer.len(), output_bytes);
3078+
let actual = BitBuffer::new(buffer.to_host_sync(), len + arrow_offset)
3079+
.iter()
3080+
.collect::<Vec<_>>();
3081+
assert_eq!(actual, [true, false, true]);
3082+
3083+
let backing = cuda_backing_allocation(&buffer)?;
3084+
let backing_bytes = backing.to_host_sync();
3085+
assert_eq!(
3086+
backing_bytes.len(),
3087+
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
29593088
);
29603089
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));
29613090

29623091
Ok(())
29633092
}
29643093

3094+
#[crate::test]
3095+
async fn test_export_validity_buffer_reuses_matching_padded_device_bitmap() -> VortexResult<()>
3096+
{
3097+
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
3098+
.vortex_expect("failed to create execution context");
3099+
3100+
let len = 3;
3101+
let source = BitBuffer::from_iter([true, false, true]);
3102+
let (input_offset, _, input_buffer) = source.into_inner();
3103+
let input_buffer = ctx
3104+
.ensure_on_device(BufferHandle::new_host(input_buffer))
3105+
.await?;
3106+
let input_ptr = input_buffer.cuda_device_ptr()?;
3107+
let validity = BoolArray::new_handle(
3108+
input_buffer.clone(),
3109+
input_offset,
3110+
len,
3111+
Validity::NonNullable,
3112+
)
3113+
.into_array();
3114+
3115+
let (buffer, null_count) =
3116+
export_arrow_validity_buffer(Validity::Array(validity), len, input_offset, &mut ctx)
3117+
.await?;
3118+
ctx.synchronize_stream()?;
3119+
3120+
assert_eq!(null_count, 1);
3121+
let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
3122+
assert_eq!(buffer.cuda_device_ptr()?, input_ptr);
3123+
assert_eq!(buffer.len(), (len + input_offset).div_ceil(8));
3124+
let actual = BitBuffer::new(buffer.to_host_sync(), len + input_offset)
3125+
.iter()
3126+
.collect::<Vec<_>>();
3127+
let expected = std::iter::repeat_n(false, input_offset)
3128+
.chain([true, false, true])
3129+
.collect::<Vec<_>>();
3130+
assert_eq!(actual, expected);
3131+
3132+
Ok(())
3133+
}
3134+
3135+
#[crate::test]
3136+
async fn test_export_validity_buffer_repacks_matching_offset_without_tail_padding()
3137+
-> VortexResult<()> {
3138+
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
3139+
.vortex_expect("failed to create execution context");
3140+
3141+
let len = 3;
3142+
let source = BitBuffer::from_iter((0..80).map(|idx| idx % 3 != 1));
3143+
let (input_offset, _, input_buffer) = source.into_inner();
3144+
let input_buffer = ctx
3145+
.ensure_on_device(BufferHandle::new_host(input_buffer))
3146+
.await?;
3147+
let input_ptr = input_buffer.cuda_device_ptr()?;
3148+
let validity = BoolArray::new_handle(
3149+
input_buffer.clone(),
3150+
input_offset,
3151+
len,
3152+
Validity::NonNullable,
3153+
)
3154+
.into_array();
3155+
3156+
let (buffer, null_count) =
3157+
export_arrow_validity_buffer(Validity::Array(validity), len, input_offset, &mut ctx)
3158+
.await?;
3159+
ctx.synchronize_stream()?;
3160+
3161+
assert_eq!(null_count, 1);
3162+
let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
3163+
assert_ne!(buffer.cuda_device_ptr()?, input_ptr);
3164+
let output_bytes = (len + input_offset).div_ceil(8);
3165+
assert_eq!(buffer.len(), output_bytes);
3166+
let actual = BitBuffer::new(buffer.to_host_sync(), len + input_offset)
3167+
.iter()
3168+
.collect::<Vec<_>>();
3169+
let expected = std::iter::repeat_n(false, input_offset)
3170+
.chain([true, false, true])
3171+
.collect::<Vec<_>>();
3172+
assert_eq!(actual, expected);
3173+
3174+
let backing = cuda_backing_allocation(&buffer)?;
3175+
assert_eq!(
3176+
backing.len(),
3177+
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
3178+
);
3179+
3180+
Ok(())
3181+
}
3182+
3183+
#[crate::test]
3184+
async fn test_export_empty_validity_buffer_is_omitted() -> VortexResult<()> {
3185+
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
3186+
.vortex_expect("failed to create execution context");
3187+
3188+
let (buffer, null_count) =
3189+
export_arrow_validity_buffer(Validity::AllInvalid, 0, 0, &mut ctx).await?;
3190+
3191+
assert_eq!(null_count, 0);
3192+
assert!(buffer.is_none());
3193+
3194+
Ok(())
3195+
}
3196+
29653197
#[crate::test]
29663198
async fn test_export_all_false_validity_buffer_is_zeroed_on_device() -> VortexResult<()> {
29673199
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
@@ -2983,6 +3215,11 @@ mod tests {
29833215
let bytes = buffer.to_host_sync();
29843216
assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8));
29853217
assert!(bytes.iter().all(|byte| *byte == 0));
3218+
let backing = cuda_backing_allocation(&buffer)?;
3219+
assert_eq!(
3220+
backing.len(),
3221+
bytes.len().next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
3222+
);
29863223

29873224
Ok(())
29883225
}

0 commit comments

Comments
 (0)