Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 255 additions & 18 deletions vortex-cuda/src/arrow/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use crate::arrow::arrow_schema_for_array;
use crate::arrow::cuda_decimal_value_type;
use crate::arrow::list_view::export_device_list_view;
use crate::cub::exclusive_sum_i32;
use crate::device_buffer::CUDF_VALIDITY_BUFFER_PADDING;
use crate::executor::CudaArrayExt;
use crate::executor::execute_validity_cuda;

Expand Down Expand Up @@ -764,7 +765,7 @@ fn gather_binary_values(
///
/// Returns `None` for the buffer when Arrow can omit validity because all rows are valid.
///
/// Returned buffers use zeroed 4-byte padding so cuDF's word-sized mask reads stay in bounds.
/// Returned buffers use zeroed cuDF-sized padding so mask reads stay in bounds.
/// Bits at positions `>= len + arrow_offset` within the final data byte are unspecified, as
/// Arrow permits.
pub(super) async fn export_arrow_validity_buffer(
Expand All @@ -773,6 +774,11 @@ pub(super) async fn export_arrow_validity_buffer(
arrow_offset: usize,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<(Option<BufferHandle>, i64)> {
// Empty arrays do not need a validity buffer; avoid zero-sized CUDA allocations.
if len == 0 {
return Ok((None, 0));
}

// Validity is exported separately from the array data. Decode it here so Arrow
// gets a device-resident validity buffer alongside the array it belongs to.
let validity = execute_validity_cuda(validity, len, ctx).await?;
Expand All @@ -796,16 +802,18 @@ pub(super) async fn export_arrow_validity_buffer(
})?;
let BoolDataParts { bits, meta } = array.into_data().into_parts(len);
let bitmap = ctx.ensure_on_device(bits).await?;
// ArrowDeviceArray uses ArrowArray layout with its buffers being device pointers.
//
// Validity is one bit per row, addressed via the Arrow array offset. Reuse the bitmap
// when Vortex's validity offset already matches Arrow's; otherwise repack on the GPU
// so row i is at Arrow bit `arrow_offset + i`.
let bitmap = if meta.offset() == arrow_offset {
bitmap
} else {
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?
};
let bitmap =
match export_arrow_validity_bitmap(&bitmap, meta.offset(), len, arrow_offset, ctx)?
{
Some(bitmap) => bitmap,
None => repack_arrow_validity_buffer(
&bitmap,
meta.offset(),
len,
arrow_offset,
ctx,
)?,
};
// Keep nullable exports self-describing for consumers that require exact null counts.
let null_count = count_arrow_validity_nulls(&bitmap, len, arrow_offset, ctx)?;
Ok((Some(bitmap), null_count))
Expand All @@ -826,12 +834,78 @@ fn device_zeroed_byte_buffer(
byte_len: usize,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
let allocation_len = byte_len.next_multiple_of(size_of::<u32>()).max(1);
vortex_ensure!(
byte_len > 0,
"zero-length validity buffers should be omitted"
);
let allocation_len = byte_len.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);
let mut buffer = ctx.device_alloc::<u8>(allocation_len)?;
ctx.stream()
.memset_zeros(&mut buffer)
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer: {err}"))?;
Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(buffer))).slice(0..byte_len))
// The memset above zeroed the whole allocation, including cuDF tail padding.
Ok(
BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new_with_zeroed_tail(buffer, 0)?))
.slice(0..byte_len),
)
}

/// Exports a validity bitmap without shifting bits when its offset already equals Arrow's.
///
/// Returns `Ok(None)` when `input_offset` differs from `arrow_offset`; this function does not
/// handle that case and leaves it to the caller. Otherwise returns a handle whose logical
/// length is the Arrow bitmap byte length for `len` rows at `arrow_offset`:
///
/// * the input bitmap itself, sliced to that length, when `has_zeroed_tail_padding` reports
///   true for the range up to the cuDF-padded allocation size (presumably meaning the bytes
///   past the logical length are known to be zero — confirm against `BufferHandle`), or
/// * a fresh copy produced by `copy_arrow_validity_buffer` otherwise.
fn export_arrow_validity_bitmap(
    bitmap: &BufferHandle,
    input_offset: usize,
    len: usize,
    arrow_offset: usize,
    ctx: &mut CudaExecutionCtx,
) -> VortexResult<Option<BufferHandle>> {
    if input_offset == arrow_offset {
        // Logical Arrow length versus the size rounded up to the cuDF padding granularity.
        let logical_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
        let padded_bytes = logical_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);

        let exported = if bitmap.has_zeroed_tail_padding(logical_bytes, padded_bytes)? {
            // Reuse the existing storage; only the visible length changes.
            bitmap.slice(0..logical_bytes)
        } else {
            copy_arrow_validity_buffer(bitmap, logical_bytes, ctx)?
        };
        Ok(Some(exported))
    } else {
        Ok(None)
    }
}

/// Duplicates the first `output_bytes` bytes of a validity bitmap into freshly allocated,
/// zero-filled storage whose size is rounded up to `CUDF_VALIDITY_BUFFER_PADDING`.
///
/// Bits are copied verbatim; no bit-level shifting takes place. The returned handle is sliced
/// to `output_bytes`, while the allocation behind it keeps the zeroed tail.
///
/// # Errors
///
/// Fails when `output_bytes` is zero, when `input_buffer` holds fewer than `output_bytes`
/// bytes, or when the device allocation, memset, or copy reports an error.
fn copy_arrow_validity_buffer(
    input_buffer: &BufferHandle,
    output_bytes: usize,
    ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
    vortex_ensure!(
        output_bytes > 0,
        "zero-length validity buffers should be omitted"
    );
    let available_bytes = input_buffer.len();
    vortex_ensure!(
        available_bytes >= output_bytes,
        "Arrow validity bitmap has {} bytes, expected at least {output_bytes}",
        available_bytes
    );

    let padded_len = output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING);
    let mut padded = ctx.device_alloc::<u8>(padded_len)?;

    // Zero the whole allocation first; the copy below overwrites only the logical prefix, so
    // everything from `output_bytes` onwards stays zero.
    ctx.stream()
        .memset_zeros(&mut padded)
        .map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;

    {
        // Scope the views so the mutable borrow of `padded` ends before it is moved below.
        let src = input_buffer.cuda_view::<u8>()?.slice(0..output_bytes);
        let mut dst = padded.slice_mut(0..output_bytes);
        ctx.stream()
            .memcpy_dtod(&src, &mut dst)
            .map_err(|err| vortex_err!("Failed to copy Arrow validity buffer: {err}"))?;
    }

    let device_buffer = CudaDeviceBuffer::new_with_zeroed_tail(padded, output_bytes)?;
    let handle = BufferHandle::new_device(Arc::new(device_buffer));
    Ok(handle.slice(0..output_bytes))
}

pub fn count_arrow_validity_nulls(
Expand Down Expand Up @@ -894,8 +968,8 @@ pub fn count_arrow_validity_nulls(
///
/// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer
/// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a
/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two
/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks).
/// bit-level offset. The output handle keeps Arrow's logical byte length, while the backing
/// allocation is zero-padded to cuDF's mask allocation size for consumers that read full masks.
pub fn repack_arrow_validity_buffer(
input_buffer: &BufferHandle,
input_offset: usize,
Expand All @@ -904,7 +978,18 @@ pub fn repack_arrow_validity_buffer(
ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
vortex_ensure!(
output_bytes > 0,
"zero-length validity buffers should be omitted"
);
// The CUDA kernel writes the bitmap as u64 words, so round the logical byte length up to the
// number of words that cover the exported Arrow bytes.
let output_words = output_bytes.div_ceil(size_of::<u64>());
// `device_alloc::<u64>` takes a word count, while the padding policy is expressed in bytes.
// Round up so the padded byte allocation is fully represented by whole u64 words.
let allocation_words = output_bytes
.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
.div_ceil(size_of::<u64>());

// The kernel loads the input bitmap as 64-bit words.
if !input_buffer
Expand All @@ -914,8 +999,14 @@ pub fn repack_arrow_validity_buffer(
vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer");
}

let output = ctx.device_alloc::<u64>(output_words.max(1))?;
let output_device = CudaDeviceBuffer::new(output);
let mut output = ctx.device_alloc::<u64>(allocation_words.max(1))?;
// The repack kernel writes only the logical bitmap words. Zero the whole backing allocation so
// cuDF's padded mask reads see invalid rows, not uninitialized CUDA memory.
ctx.stream()
.memset_zeros(&mut output)
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;
// The memset above zeroed all allocation bytes after the logical output.
let output_device = CudaDeviceBuffer::new_with_zeroed_tail(output, output_bytes)?;

if output_words > 0 {
let input_view = input_buffer.cuda_view::<u8>()?;
Expand Down Expand Up @@ -1331,6 +1422,7 @@ mod tests {
use vortex::error::vortex_bail;
use vortex::extension::datetime::TimeUnit;

use crate::CudaBufferExt;
use crate::CudaExecutionCtx;
use crate::arrow::ARROW_DEVICE_CUDA;
use crate::arrow::ArrowArray;
Expand All @@ -1339,6 +1431,7 @@ mod tests {
use crate::arrow::PrivateData;
use crate::arrow::canonical::export_arrow_validity_buffer;
use crate::arrow::canonical::repack_arrow_validity_buffer;
use crate::device_buffer::CUDF_VALIDITY_BUFFER_PADDING;
use crate::device_buffer::cuda_backing_allocation;
use crate::session::CudaSession;

Expand Down Expand Up @@ -2955,13 +3048,152 @@ mod tests {
let backing_bytes = backing.to_host_sync();
assert_eq!(
backing_bytes.len(),
output_bytes.next_multiple_of(size_of::<u64>())
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
);
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));

Ok(())
}

/// A host-built validity at Arrow offset 0 must export with its logical byte length while the
/// backing allocation is rounded up to the cuDF padding size and zero beyond the logical bytes.
#[crate::test]
async fn test_export_validity_buffer_pads_matching_offset() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
        .vortex_expect("failed to create execution context");

    let rows = [true, false, true];
    let len = rows.len();
    let arrow_offset = 0;

    let (buffer, null_count) = export_arrow_validity_buffer(
        Validity::from(BitBuffer::from_iter(rows)),
        len,
        arrow_offset,
        &mut ctx,
    )
    .await?;
    ctx.synchronize_stream()?;

    // Exactly one of the three rows is null.
    assert_eq!(null_count, 1);
    let buffer = buffer.vortex_expect("nullable validity should export a null buffer");

    // The handle exposes only the Arrow-visible bytes.
    let bit_len = len + arrow_offset;
    let output_bytes = bit_len.div_ceil(8);
    assert_eq!(buffer.len(), output_bytes);

    let actual: Vec<_> = BitBuffer::new(buffer.to_host_sync(), bit_len)
        .iter()
        .collect();
    assert_eq!(actual, rows);

    // The allocation behind the handle is padded, and the padding is all zero.
    let backing_bytes = cuda_backing_allocation(&buffer)?.to_host_sync();
    assert_eq!(
        backing_bytes.len(),
        output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
    );
    assert!(backing_bytes[output_bytes..].iter().all(|&byte| byte == 0));

    Ok(())
}

/// A device bitmap whose offset matches Arrow's is expected to be exported in place: the
/// returned handle must point at the same device address as the input.
#[crate::test]
async fn test_export_validity_buffer_reuses_matching_padded_device_bitmap() -> VortexResult<()>
{
    let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
        .vortex_expect("failed to create execution context");

    let len = 3;
    let (bit_offset, _, host_bytes) = BitBuffer::from_iter([true, false, true]).into_inner();
    let device_bitmap = ctx
        .ensure_on_device(BufferHandle::new_host(host_bytes))
        .await?;
    // Remember the device address so reuse can be detected after the export.
    let original_ptr = device_bitmap.cuda_device_ptr()?;

    let validity = BoolArray::new_handle(
        device_bitmap.clone(),
        bit_offset,
        len,
        Validity::NonNullable,
    )
    .into_array();

    let (buffer, null_count) =
        export_arrow_validity_buffer(Validity::Array(validity), len, bit_offset, &mut ctx)
            .await?;
    ctx.synchronize_stream()?;

    assert_eq!(null_count, 1);
    let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
    // Same pointer means no copy was made.
    assert_eq!(buffer.cuda_device_ptr()?, original_ptr);

    let bit_len = len + bit_offset;
    assert_eq!(buffer.len(), bit_len.div_ceil(8));

    let actual: Vec<_> = BitBuffer::new(buffer.to_host_sync(), bit_len)
        .iter()
        .collect();
    // Bits before the offset read as false, followed by the three logical rows.
    let mut expected = vec![false; bit_offset];
    expected.extend([true, false, true]);
    assert_eq!(actual, expected);

    Ok(())
}

/// A device bitmap whose offset matches Arrow's but whose bytes past the exported range hold
/// live data must not be reused: the export has to land in new storage with a zeroed tail.
#[crate::test]
async fn test_export_validity_buffer_repacks_matching_offset_without_tail_padding()
-> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
        .vortex_expect("failed to create execution context");

    // Only the first 3 of 80 source bits are exported, so the bytes that follow the single
    // exported byte contain set bits rather than zero padding.
    let len = 3;
    let source = BitBuffer::from_iter((0..80).map(|idx| idx % 3 != 1));
    let (input_offset, _, input_buffer) = source.into_inner();
    let input_buffer = ctx
        .ensure_on_device(BufferHandle::new_host(input_buffer))
        .await?;
    let input_ptr = input_buffer.cuda_device_ptr()?;
    let validity = BoolArray::new_handle(
        input_buffer.clone(),
        input_offset,
        len,
        Validity::NonNullable,
    )
    .into_array();

    let (buffer, null_count) =
        export_arrow_validity_buffer(Validity::Array(validity), len, input_offset, &mut ctx)
            .await?;
    ctx.synchronize_stream()?;

    assert_eq!(null_count, 1);
    let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
    // A different device address shows the bitmap was copied instead of reused.
    assert_ne!(buffer.cuda_device_ptr()?, input_ptr);
    let output_bytes = (len + input_offset).div_ceil(8);
    assert_eq!(buffer.len(), output_bytes);
    let actual = BitBuffer::new(buffer.to_host_sync(), len + input_offset)
        .iter()
        .collect::<Vec<_>>();
    let expected = std::iter::repeat_n(false, input_offset)
        .chain([true, false, true])
        .collect::<Vec<_>>();
    assert_eq!(actual, expected);

    let backing = cuda_backing_allocation(&buffer)?;
    assert_eq!(
        backing.len(),
        output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
    );
    // The source's trailing bytes are non-zero, so this is the case where leaked input data
    // would show up in the padding. Check the tail is zero, not just the allocation size.
    let backing_bytes = backing.to_host_sync();
    assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));

    Ok(())
}

/// Exporting validity for zero rows must yield no buffer and a null count of zero, even when
/// the validity itself is `AllInvalid`.
#[crate::test]
async fn test_export_empty_validity_buffer_is_omitted() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
        .vortex_expect("failed to create execution context");

    let exported = export_arrow_validity_buffer(Validity::AllInvalid, 0, 0, &mut ctx).await?;
    let (buffer, null_count) = exported;

    assert_eq!(null_count, 0);
    assert!(buffer.is_none());

    Ok(())
}

#[crate::test]
async fn test_export_all_false_validity_buffer_is_zeroed_on_device() -> VortexResult<()> {
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
Expand All @@ -2983,6 +3215,11 @@ mod tests {
let bytes = buffer.to_host_sync();
assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8));
assert!(bytes.iter().all(|byte| *byte == 0));
let backing = cuda_backing_allocation(&buffer)?;
assert_eq!(
backing.len(),
bytes.len().next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
);

Ok(())
}
Expand Down
Loading
Loading