Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vortex-cuda/benches/dynamic_dispatch_cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl BenchRunner {
dispatch_plan,
device_buffers,
shared_mem_bytes,
..
} = plan.materialize(cuda_ctx).vortex_expect("materialize plan");

let device_plan = Arc::new(
Expand Down
227 changes: 225 additions & 2 deletions vortex-cuda/src/dynamic_dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ use cudarc::driver::DevicePtr;
use cudarc::driver::LaunchConfig;
use cudarc::driver::PushKernelArg;
use vortex::array::Canonical;
use vortex::array::IntoArray;
use vortex::array::arrays::ConstantArray;
use vortex::array::arrays::PrimitiveArray;
use vortex::array::buffer::BufferHandle;
use vortex::array::buffer::DeviceBufferExt;
use vortex::array::match_each_unsigned_integer_ptype;
use vortex::array::scalar::Scalar;
use vortex::array::validity::Validity;
use vortex::buffer::Alignment;
use vortex::buffer::ByteBuffer;
use vortex::buffer::ByteBufferMut;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::dtype::PType;
use vortex::error::VortexResult;
Expand Down Expand Up @@ -408,6 +412,15 @@ impl ScalarOp {
impl MaterializedPlan {
pub fn execute(self, len: usize, ctx: &mut CudaExecutionCtx) -> VortexResult<Canonical> {
let output_ptype = self.dispatch_plan.output_ptype();

// All values are null — no need to touch the GPU.
if matches!(self.validity, Validity::AllInvalid) {
let dtype = DType::Primitive(output_ptype, Nullability::Nullable);
return ConstantArray::new(Scalar::null(dtype), len)
.into_array()
.to_canonical();
}

// The CUDA kernels are instantiated for unsigned integer types only;
// map signed/float ptypes to their same-width unsigned counterpart.
let unsigned_ptype = match output_ptype {
Expand All @@ -431,9 +444,11 @@ impl MaterializedPlan {
where
T: cudarc::driver::DeviceRepr + vortex::dtype::NativePType,
{
let nullability = self.validity.nullability();

if len == 0 {
return Ok(Canonical::Primitive(PrimitiveArray::empty::<T>(
Nullability::NonNullable,
nullability,
)));
}

Expand Down Expand Up @@ -467,7 +482,7 @@ impl MaterializedPlan {
Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle(
BufferHandle::new_device(output_buf.slice_typed::<T>(0..len)),
output_ptype,
Validity::NonNullable,
self.validity,
)))
}
}
Expand All @@ -485,6 +500,7 @@ mod tests {
use vortex::array::arrays::DictArray;
use vortex::array::arrays::PrimitiveArray;
use vortex::array::scalar::Scalar;
use vortex::array::validity::Validity;
use vortex::array::validity::Validity::NonNullable;
use vortex::buffer::Buffer;
use vortex::dtype::PType;
Expand All @@ -509,6 +525,7 @@ mod tests {
use crate::CudaBufferExt;
use crate::CudaDeviceBuffer;
use crate::CudaExecutionCtx;
use crate::executor::CudaArrayExt;
use crate::hybrid_dispatch::try_gpu_dispatch;
use crate::session::CudaSession;

Expand Down Expand Up @@ -1817,4 +1834,210 @@ mod tests {

Ok(())
}

// ═══════════════════════════════════════════════════════════════════
// Validity propagation tests
// ═══════════════════════════════════════════════════════════════════

/// Nullable Primitive array — LOAD source with validity propagated.
#[crate::test]
async fn test_nullable_primitive() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    // Every third element is null; the rest carry their own index.
    let nullable_values = (0..2048u32).map(|i| (i % 3 != 0).then_some(i));
    let array = PrimitiveArray::from_option_iter(nullable_values);

    // CPU canonicalization is the reference result.
    let expected = array.to_canonical()?.into_array();

    let actual = try_gpu_dispatch(&array.into_array(), &mut ctx)
        .await?
        .into_host()
        .await?
        .into_array();

    vortex::array::assert_arrays_eq!(expected, actual);
    Ok(())
}

/// Nullable FoR(BitPacked) — validity from the root propagated through
/// the fused plan. The standard encoding flow is: subtract FoR reference
/// to get residuals, then bitpack. BitPacked::encode preserves input
/// validity, so this produces a real nullable FoR(BitPacked) tree.
#[crate::test]
async fn test_nullable_for_bitpacked() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    let len = 2048;
    let reference = 1000u32;

    // Original values in [reference, reference+63], every 5th null.
    let values: Vec<Option<u32>> = (0..len)
        .map(|i| (i % 5 != 0).then_some((i as u32 % 64) + reference))
        .collect();
    let prim = PrimitiveArray::from_option_iter(values.iter().copied());
    let expected = prim.to_canonical()?.into_array();

    // FoR encoding: subtract reference to get residuals [0..63].
    // Null positions get 0 (from from_option_iter), which is fine —
    // after subtracting reference it wraps, but validity masks it.
    let residuals =
        PrimitiveArray::from_option_iter(values.iter().map(|v| v.map(|x| x - reference)));

    // BitPacked::encode preserves nullable validity from the input.
    let packed = BitPacked::encode(&residuals.into_array(), 6)?;
    let frame_of_ref = FoR::try_new(packed.into_array(), reference.into())?;

    // Verify the plan actually fuses (not just a LOAD).
    let plan = DispatchPlan::new(&frame_of_ref.clone().into_array())?;
    assert!(
        matches!(plan, DispatchPlan::Fused(_)),
        "FoR(BitPacked) with nullable validity should produce a Fused plan"
    );

    let actual = try_gpu_dispatch(&frame_of_ref.into_array(), &mut ctx)
        .await?
        .into_host()
        .await?
        .into_array();

    vortex::array::assert_arrays_eq!(expected, actual);
    Ok(())
}

/// AllInvalid array — kernel should be skipped entirely.
#[crate::test]
async fn test_all_invalid_skips_kernel() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    // 2048 zeroed slots, every one flagged invalid.
    let source = PrimitiveArray::new(Buffer::from(vec![0u32; 2048]), Validity::AllInvalid);

    let host = try_gpu_dispatch(&source.into_array(), &mut ctx)
        .await?
        .into_host()
        .await?;

    // Length must be preserved and the output must stay all-null.
    let primitive = host.into_primitive();
    assert_eq!(primitive.len(), 2048);
    assert!(matches!(primitive.validity()?, Validity::AllInvalid));
    Ok(())
}

/// AllValid nullable array — should fuse and produce AllValid output.
///
/// Fix: dropped the redundant `values.clone()` — `values` was never used
/// again after constructing the buffer, so the clone only cost an
/// allocation + copy (clippy `redundant_clone`).
#[crate::test]
async fn test_all_valid_nullable() -> VortexResult<()> {
    let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    let values: Vec<u32> = (0..2048).collect();
    // `values` is consumed here; no clone needed.
    let array = PrimitiveArray::new(Buffer::from(values), Validity::AllValid);

    let cpu = array.to_canonical()?.into_array();
    let gpu = try_gpu_dispatch(&array.into_array(), &mut cuda_ctx)
        .await?
        .into_host()
        .await?
        .into_array();

    vortex::array::assert_arrays_eq!(cpu, gpu);
    Ok(())
}

/// Dict with nullable codes must fall back to Unfused (not fused).
#[crate::test]
fn test_dict_nullable_codes_rejected() -> VortexResult<()> {
    use vortex::buffer::buffer;

    // The nulls live in the codes; the dictionary values are non-nullable.
    let codes = PrimitiveArray::from_option_iter([Some(0u32), None, Some(1), None, Some(2)]);
    let values = PrimitiveArray::new(buffer![10u32, 20, 30], NonNullable);
    let dictionary = DictArray::try_new(codes.into_array(), values.into_array())?;

    let plan = DispatchPlan::new(&dictionary.into_array())?;
    let fell_back = matches!(plan, DispatchPlan::Unfused);
    assert!(fell_back, "Dict with nullable codes should fall back to Unfused");
    Ok(())
}

/// Dict with non-nullable codes but nullable values should still fuse.
#[crate::test]
async fn test_dict_nullable_values_fuses() -> VortexResult<()> {
    use vortex::buffer::buffer;

    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    // Codes are fully valid; only the dictionary values contain a null.
    let codes = PrimitiveArray::new(buffer![0u32, 1, 2, 2, 1, 0], NonNullable);
    let values = PrimitiveArray::from_option_iter([Some(10u32), None, Some(30)]);
    let dictionary = DictArray::try_new(codes.into_array(), values.into_array())?;

    let expected = dictionary.to_canonical()?.into_array();
    let actual = dictionary
        .into_array()
        .execute_cuda(&mut ctx)
        .await?
        .into_host()
        .await?
        .into_array();

    vortex::array::assert_arrays_eq!(expected, actual);
    Ok(())
}

/// Nullable FoR(BitPacked) through CUB filter — the original bug scenario.
/// Validity must survive through fused dispatch and into the filter.
#[crate::test]
async fn test_nullable_fused_then_filter() -> VortexResult<()> {
    use vortex::array::arrays::FilterArray;
    use vortex::mask::Mask;

    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    let len = 2048usize;
    // Every 7th element null; the rest cycle through 0..64.
    let source: Vec<Option<u32>> = (0..len)
        .map(|i| (i % 7 != 0).then_some((i % 64) as u32))
        .collect();
    let prim = PrimitiveArray::from_option_iter(source.iter().copied());

    // Keep every other element.
    let keep_evens = Mask::from_iter((0..len).map(|i| i % 2 == 0));
    let filtered = FilterArray::try_new(prim.into_array(), keep_evens)?;

    let expected = filtered.to_canonical()?.into_array();
    let actual = filtered
        .into_array()
        .execute_cuda(&mut ctx)
        .await?
        .into_host()
        .await?
        .into_array();

    vortex::array::assert_arrays_eq!(expected, actual);
    Ok(())
}

/// Empty nullable array should preserve nullability.
#[crate::test]
async fn test_empty_nullable_array() -> VortexResult<()> {
    let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;

    // Zero-length buffer with AllValid (i.e. nullable) validity.
    let empty = PrimitiveArray::new(Buffer::<u32>::empty(), Validity::AllValid);
    let canonical = try_gpu_dispatch(&empty.into_array(), &mut ctx).await?;

    let primitive = canonical.into_primitive();
    assert_eq!(primitive.len(), 0);
    assert_eq!(primitive.validity()?.nullability(), Nullability::Nullable);
    Ok(())
}
}
23 changes: 22 additions & 1 deletion vortex-cuda/src/dynamic_dispatch/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use vortex::array::arrays::Slice;
use vortex::array::arrays::dict::DictArraySlotsExt;
use vortex::array::arrays::slice::SliceArrayExt;
use vortex::array::buffer::BufferHandle;
use vortex::array::validity::Validity;
use vortex::dtype::PType;
use vortex::encodings::alp::ALP;
use vortex::encodings::alp::ALPArrayExt;
Expand Down Expand Up @@ -52,6 +53,8 @@ pub struct MaterializedPlan {
pub device_buffers: Vec<BufferHandle>,
/// Dynamic shared memory bytes needed to launch this plan.
pub shared_mem_bytes: u32,
/// Validity of the root array, propagated to the output.
pub validity: Validity,
}

/// Checks whether the encoding of an array can be fused into a dynamic-dispatch plan.
Expand All @@ -72,6 +75,11 @@ fn is_dyn_dispatch_compatible(array: &ArrayRef) -> bool {
}
if id == Dict::ID {
let arr = array.as_::<Dict>();
// Nullable codes could hold garbage values at null positions, causing
// out-of-bounds shared memory reads in the DICT gather scalar op.
if arr.codes().dtype().is_nullable() {
return false;
}
// Dict codes and values may have different byte widths.
// The kernel handles mixed widths via widening input stages,
// but only when codes are no wider than values (the output type).
Expand All @@ -85,6 +93,12 @@ fn is_dyn_dispatch_compatible(array: &ArrayRef) -> bool {
}
if id == RunEnd::ID {
let arr = array.as_::<RunEnd>();
// Nullable ends could hold garbage values at null positions, causing
// unpredictable binary search / forward-scan behavior in the RUNEND
// source op.
if arr.ends().dtype().is_nullable() {
return false;
}
// RunEnd ends and values may have different byte widths.
// The kernel handles mixed widths via widening input stages,
// but only when ends are no wider than values (the output type).
Expand Down Expand Up @@ -213,14 +227,18 @@ pub struct FusedPlan {
output_elem_bytes: u32,
/// PType of the root (output) array, as a C ABI tag.
output_ptype: PTypeTag,
/// Validity of the root array, propagated to the output.
validity: Validity,
}

impl DispatchPlan {
/// Construct a plan by inspecting the encoding tree in a single pass.
///
/// # Limitations
///
/// - Validity bitmaps are ignored; only `NonNullable`/`AllValid` is supported.
/// - Validity is propagated from the root array to the output. Nullable
/// arrays are supported, but Dict with nullable codes and RunEnd with
/// nullable ends are rejected to guard against out-of-bounds access.
/// - `BitPackedArray` and `ALPArray` with patches are not supported.
/// - Only f32 ALP is supported (kernel stores multipliers as `float`).
pub fn new(array: &ArrayRef) -> VortexResult<Self> {
Expand Down Expand Up @@ -276,6 +294,7 @@ impl FusedPlan {
}
let output_elem_bytes = output_ptype_rust.byte_width() as u32;
let output_ptype = ptype_to_tag(output_ptype_rust);
let validity = array.validity()?;

let mut pending_subtrees: Vec<ArrayRef> = Vec::new();
let mut plan = Self {
Expand All @@ -284,6 +303,7 @@ impl FusedPlan {
source_buffers: Vec::new(),
output_elem_bytes,
output_ptype,
validity,
};

let len = array.len() as u32;
Expand Down Expand Up @@ -365,6 +385,7 @@ impl FusedPlan {
dispatch_plan: CudaDispatchPlan::new(stages, self.output_ptype),
device_buffers,
shared_mem_bytes,
validity: self.validity,
})
}

Expand Down
Loading
Loading