Skip to content

Commit b7a5b32

Browse files
committed
feat: add CUDA cuDF convenience API
Add vortex_cuda.to_cudf and install optional Vortex Array helpers for cuDF conversion and Arrow C Device export. Keep the conversion path CUDA-only by rejecting unsupported fallback policies and routing cuDF ingestion through fresh Arrow C Device capsules. Expand CUDA Python tests for the convenience API, installed Array methods, Arrow Device export smoke coverage, and capsule ownership paths. Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 4a90e13 commit b7a5b32

5 files changed

Lines changed: 540 additions & 4 deletions

File tree

vortex-python-cuda/python/vortex_cuda/__init__.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,116 @@
22
# SPDX-FileCopyrightText: Copyright the Vortex contributors
33
# pyright: reportMissingModuleSource=false, reportPrivateUsage=false
44

5+
import importlib
6+
from typing import Protocol, cast, final
7+
58
from . import _lib
69

10+
# Private debug hooks used by CUDA bridge tests.
711
_debug_array_metadata_dtype = _lib._debug_array_metadata_dtype
812
_debug_array_metadata_display_values = _lib._debug_array_metadata_display_values
13+
_debug_arrow_device_array_capsule_summary = _lib._debug_arrow_device_array_capsule_summary
14+
_debug_consume_arrow_device_array_capsules = _lib._debug_consume_arrow_device_array_capsules
915
cuda_available = _lib.cuda_available
1016
export_device_array = _lib.export_device_array
1117

12-
__all__ = ["cuda_available", "export_device_array"]
18+
class _FromArrow(Protocol):
19+
def from_arrow(self, obj: object) -> object: ...
20+
21+
22+
class _FromPylibcudf(Protocol):
23+
def from_pylibcudf(self, obj: object) -> object: ...
24+
25+
26+
class _DataFrameFromPylibcudf(Protocol):
27+
def from_pylibcudf(self, obj: object, metadata: dict[str, object] | None = None) -> object: ...
28+
29+
30+
class _PylibcudfModule(Protocol):
31+
Column: _FromArrow
32+
Table: _FromArrow
33+
34+
35+
class _CudfModule(Protocol):
36+
Series: _FromPylibcudf
37+
DataFrame: _DataFrameFromPylibcudf
38+
39+
40+
@final
41+
class _ArrowDeviceArrayExportable:
42+
def __init__(self, array: object) -> None:
43+
self._array: object = array
44+
45+
def __arrow_c_device_array__(
46+
self,
47+
requested_schema: object | None = None,
48+
**kwargs: object,
49+
) -> tuple[object, object]:
50+
return export_device_array(self._array, requested_schema, **kwargs)
51+
52+
53+
_SUPPORTED_FALLBACKS = frozenset({"error"})
54+
55+
56+
def _Array_to_cudf(self: object, *, fallback: str = "error") -> object:
57+
return to_cudf(self, fallback=fallback)
58+
59+
60+
def _Array___arrow_c_device_array__(
61+
self: object,
62+
requested_schema: object | None = None,
63+
**kwargs: object,
64+
) -> tuple[object, object]:
65+
return export_device_array(self, requested_schema, **kwargs)
66+
67+
68+
def _install_vortex_array_methods() -> None:
69+
import vortex
70+
71+
setattr(vortex.Array, "to_cudf", _Array_to_cudf)
72+
if cuda_available():
73+
setattr(vortex.Array, "__arrow_c_device_array__", _Array___arrow_c_device_array__)
74+
75+
76+
def _import_cudf_modules() -> tuple[_CudfModule, _PylibcudfModule]:
77+
try:
78+
cudf = importlib.import_module("cudf")
79+
pylibcudf = importlib.import_module("pylibcudf")
80+
except ImportError as err:
81+
raise ImportError("vortex_cuda.to_cudf requires RAPIDS cuDF and pylibcudf to be installed") from err
82+
return cast(_CudfModule, cast(object, cudf)), cast(_PylibcudfModule, cast(object, pylibcudf))
83+
84+
85+
def to_cudf(obj: object, *, fallback: str = "error") -> object:
86+
"""Convert a Vortex array to a cuDF object through the Arrow Device interface.
87+
88+
``fallback`` is reserved for future policy choices. The initial implementation
89+
supports only ``fallback="error"`` and never falls back to host Arrow conversion.
90+
"""
91+
if fallback not in _SUPPORTED_FALLBACKS:
92+
raise NotImplementedError("vortex_cuda.to_cudf currently supports only fallback='error'")
93+
94+
import vortex
95+
96+
if not isinstance(obj, vortex.Array):
97+
raise TypeError(f"vortex_cuda.to_cudf expected a vortex.Array, got {type(obj).__name__}")
98+
99+
if not cuda_available():
100+
raise RuntimeError("CUDA is not available; vortex_cuda.to_cudf requires a CUDA device")
101+
102+
cudf, pylibcudf = _import_cudf_modules()
103+
exportable = _ArrowDeviceArrayExportable(obj)
104+
105+
dtype = obj.dtype
106+
if isinstance(dtype, vortex.StructDType):
107+
table = pylibcudf.Table.from_arrow(exportable)
108+
return cudf.DataFrame.from_pylibcudf(table, metadata={"columns": dtype.names()})
109+
110+
column = pylibcudf.Column.from_arrow(exportable)
111+
return cudf.Series.from_pylibcudf(column)
112+
113+
114+
_install_vortex_array_methods()
115+
116+
117+
__all__ = ["cuda_available", "export_device_array", "to_cudf"]

vortex-python-cuda/python/vortex_cuda/_lib.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
def _debug_array_metadata_dtype(array: object) -> str: ...
55
def _debug_array_metadata_display_values(array: object) -> str: ...
6+
def _debug_arrow_device_array_capsule_summary(schema: object, device_array: object) -> dict[str, object]: ...
7+
def _debug_consume_arrow_device_array_capsules(
8+
schema: object, device_array: object
9+
) -> tuple[bool, bool, bool, bool, bool, bool]: ...
610
def cuda_available() -> bool: ...
711
def export_device_array(
812
array: object, requested_schema: object | None = None, **kwargs: object

vortex-python-cuda/src/lib.rs

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ const USED_ARROW_SCHEMA_CAPSULE_NAME: &CStr = c_str!("used_arrow_schema");
5858
const ARROW_DEVICE_ARRAY_CAPSULE_NAME: &CStr = c_str!("arrow_device_array");
5959
const USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME: &CStr = c_str!("used_arrow_device_array");
6060

61+
// Owns a buffer export after we steal it from the source PyCapsule. `Bytes::from_owner`
62+
// keeps this guard alive for every cloned byte buffer and drops it to run the producer's
63+
// release callback once the reconstructed Vortex buffer is no longer referenced.
6164
struct BufferExportGuard {
6265
export: NonNull<VortexBufferExport>,
6366
}
@@ -89,8 +92,9 @@ impl Drop for BufferExportGuard {
8992
}
9093
}
9194

92-
// The guard is moved into `Bytes::from_owner`, which requires `Send + Sync`. After import we disable
93-
// the source capsule destructor and own the C export until this guard is dropped.
95+
// The guard is moved into `Bytes::from_owner`, which requires `Send + Sync`.
96+
// After import we disable the source capsule destructor and own the C export
97+
// until this guard is dropped.
9498
unsafe impl Send for BufferExportGuard {}
9599
unsafe impl Sync for BufferExportGuard {}
96100

@@ -461,6 +465,84 @@ fn release_exported(exported: &mut ArrowDeviceArrayWithSchema) {
461465
release_device_array(&mut exported.array);
462466
}
463467

468+
/// Return non-owning details from Arrow Device capsules for Python-side smoke consumers.
469+
#[pyfunction]
470+
fn _debug_arrow_device_array_capsule_summary<'py>(
471+
py: Python<'py>,
472+
schema: Bound<'py, PyCapsule>,
473+
device_array: Bound<'py, PyCapsule>,
474+
) -> PyResult<Bound<'py, PyDict>> {
475+
let schema = unsafe {
476+
schema
477+
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
478+
.cast::<FFI_ArrowSchema>()
479+
.as_ref()
480+
};
481+
let device_array = unsafe {
482+
device_array
483+
.pointer_checked(Some(ARROW_DEVICE_ARRAY_CAPSULE_NAME))?
484+
.cast::<ArrowDeviceArray>()
485+
.as_ref()
486+
};
487+
488+
let summary = PyDict::new(py);
489+
summary.set_item("schema_live", schema.release.is_some())?;
490+
summary.set_item("array_live", device_array.array.release.is_some())?;
491+
summary.set_item("is_cuda", device_array.device_type == ARROW_DEVICE_CUDA)?;
492+
summary.set_item("device_type", device_array.device_type)?;
493+
summary.set_item("device_id", device_array.device_id)?;
494+
summary.set_item("length", device_array.array.length)?;
495+
summary.set_item("null_count", device_array.array.null_count)?;
496+
summary.set_item("n_buffers", device_array.array.n_buffers)?;
497+
summary.set_item("n_children", device_array.array.n_children)?;
498+
Ok(summary)
499+
}
500+
501+
/// Simulate a Python Arrow Device consumer taking ownership from the returned capsules.
502+
#[pyfunction]
503+
fn _debug_consume_arrow_device_array_capsules(
504+
schema: Bound<'_, PyCapsule>,
505+
device_array: Bound<'_, PyCapsule>,
506+
) -> PyResult<(bool, bool, bool, bool, bool, bool)> {
507+
let mut schema_ptr = schema
508+
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
509+
.cast::<FFI_ArrowSchema>();
510+
let mut device_array_ptr = device_array
511+
.pointer_checked(Some(ARROW_DEVICE_ARRAY_CAPSULE_NAME))?
512+
.cast::<ArrowDeviceArray>();
513+
514+
let schema_ref = unsafe { schema_ptr.as_mut() };
515+
let device_array_ref = unsafe { device_array_ptr.as_mut() };
516+
let schema_had_release = schema_ref.release.is_some();
517+
let array_had_release = device_array_ref.array.release.is_some();
518+
519+
release_schema(schema_ref);
520+
release_device_array(device_array_ref);
521+
522+
let schema_release_cleared = schema_ref.release.is_none();
523+
let array_release_cleared = device_array_ref.array.release.is_none();
524+
525+
set_capsule_name(&schema, USED_ARROW_SCHEMA_CAPSULE_NAME)?;
526+
set_capsule_name(&device_array, USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME)?;
527+
528+
Ok((
529+
schema_had_release,
530+
array_had_release,
531+
schema_release_cleared,
532+
array_release_cleared,
533+
schema.is_valid_checked(Some(USED_ARROW_SCHEMA_CAPSULE_NAME)),
534+
device_array.is_valid_checked(Some(USED_ARROW_DEVICE_ARRAY_CAPSULE_NAME)),
535+
))
536+
}
537+
538+
fn set_capsule_name(capsule: &Bound<'_, PyCapsule>, name: &CStr) -> PyResult<()> {
539+
let result = unsafe { ffi::PyCapsule_SetName(capsule.as_ptr(), name.as_ptr()) };
540+
if result != 0 {
541+
return Err(PyErr::fetch(capsule.py()));
542+
}
543+
Ok(())
544+
}
545+
464546
fn schema_capsule<'py>(
465547
py: Python<'py>,
466548
schema: FFI_ArrowSchema,
@@ -573,6 +655,14 @@ fn _lib(m: &Bound<PyModule>) -> PyResult<()> {
573655
m.add_function(wrap_pyfunction!(cuda_available, m)?)?;
574656
m.add_function(wrap_pyfunction!(_debug_array_metadata_dtype, m)?)?;
575657
m.add_function(wrap_pyfunction!(_debug_array_metadata_display_values, m)?)?;
658+
m.add_function(wrap_pyfunction!(
659+
_debug_arrow_device_array_capsule_summary,
660+
m
661+
)?)?;
662+
m.add_function(wrap_pyfunction!(
663+
_debug_consume_arrow_device_array_capsules,
664+
m
665+
)?)?;
576666
m.add_function(wrap_pyfunction!(export_device_array, m)?)?;
577667
Ok(())
578668
}

0 commit comments

Comments
 (0)