Skip to content

Commit 812c277

Browse files
authored
feat[gpu]: arrow device array stream support (#8483)
Adds Arrow device stream support which is exercised and tested through the cuDF harness. --------- Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 35e4d72 commit 812c277

11 files changed

Lines changed: 868 additions & 52 deletions

File tree

vortex-cuda/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,15 @@ cmake -S cpp -B cpp/build \
3939
-DCMAKE_EXE_LINKER_FLAGS="-Wl,--stub-group-size=1048576" \
4040
-GNinja && cmake --build cpp/build --target INTEROP_TEST --parallel
4141
```
42+
43+
## Running the cuDF test harness
44+
45+
```sh
46+
cargo build -p vortex-test-e2e-cuda
47+
cmake --build /path/to/cudf-test-harness/build --target cudf-test-harness --parallel
48+
49+
LD_LIBRARY_PATH=/usr/local/cuda-13.1/compat \
50+
target/debug/cudf_harness_runner \
51+
/path/to/cudf-test-harness/build/cudf-test-harness \
52+
target/debug/libvortex_test_e2e_cuda.so
53+
```

vortex-cuda/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ fn generate_arrow_device_array_bindings(manifest_dir: &Path, out_dir: &Path) {
206206
.header(header.to_string_lossy())
207207
.allowlist_type("ArrowArray")
208208
.allowlist_type("ArrowDeviceArray")
209+
.allowlist_type("ArrowDeviceArrayStream")
209210
.allowlist_type("ArrowDeviceType")
210211
.allowlist_var("ARROW_DEVICE_.*")
211212
// ArrowArray/ArrowDeviceArray own producer state through release/private_data.

vortex-cuda/ffi/cinclude/vortex_cuda.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ struct ArrowDeviceArray {
4343
};
4444
#endif
4545

46+
#if !defined(ARROW_C_DEVICE_STREAM_INTERFACE) && !defined(USE_OWN_ARROW_DEVICE)
47+
#define ARROW_C_DEVICE_STREAM_INTERFACE
48+
struct ArrowDeviceArrayStream {
49+
ArrowDeviceType device_type;
50+
int (*get_schema)(struct ArrowDeviceArrayStream *, struct ArrowSchema *out);
51+
int (*get_next)(struct ArrowDeviceArrayStream *, struct ArrowDeviceArray *out);
52+
const char *(*get_last_error)(struct ArrowDeviceArrayStream *);
53+
void (*release)(struct ArrowDeviceArrayStream *);
54+
void *private_data;
55+
};
56+
#endif
57+
4658
/**
4759
* Create a CUDA Vortex session.
4860
*
@@ -69,6 +81,25 @@ int vx_cuda_array_export_arrow_device(const vx_session *session,
6981
struct ArrowDeviceArray *out_array,
7082
vx_error **error_out);
7183

84+
/**
85+
* Consume a Vortex partition and scan it as an Arrow C Device stream.
86+
*
87+
* This function takes ownership of `partition`. Callers must not free or reuse
88+
* it after calling this function, regardless of success or failure.
89+
*
90+
* On success returns 0 and writes an owned `ArrowDeviceArrayStream` to
91+
* `out_stream`. The stream owns the resulting scan iterator. The caller must
92+
* release the stream through its embedded Arrow `release` callback, and must
93+
* release each produced `ArrowDeviceArray` through its embedded
94+
* `ArrowArray.release` callback.
95+
*
96+
* On error returns 1 and writes a `vx_error` to `error_out` when non-NULL.
97+
*/
98+
int vx_cuda_partition_scan_arrow_device_stream(const vx_session *session,
99+
vx_partition *partition,
100+
struct ArrowDeviceArrayStream *out_stream,
101+
vx_error **error_out);
102+
72103
#ifdef __cplusplus
73104
}
74105
#endif

vortex-cuda/ffi/src/lib.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@ use vortex::error::vortex_ensure;
1717
use vortex::session::VortexSession;
1818
use vortex_cuda::CudaSession;
1919
use vortex_cuda::arrow::ArrowDeviceArray;
20+
use vortex_cuda::arrow::ArrowDeviceArrayStream;
2021
use vortex_cuda::arrow::DeviceArrayExt;
22+
use vortex_cuda::arrow::DeviceArrayStreamExt;
23+
use vortex_ffi::ffi_runtime;
2124
use vortex_ffi::try_or;
2225
use vortex_ffi::vx_array;
2326
use vortex_ffi::vx_array_ref;
2427
use vortex_ffi::vx_error;
28+
use vortex_ffi::vx_partition;
29+
use vortex_ffi::vx_partition_into_array_stream;
2530
use vortex_ffi::vx_session;
2631
use vortex_ffi::vx_session_new_with;
2732
use vortex_ffi::vx_session_ref;
2833

2934
const VX_CUDA_OK: c_int = 0;
3035
const VX_CUDA_ERR: c_int = 1;
3136

37+
/// Return a Vortex session with a [`CudaSession`] session variable.
38+
///
39+
/// If `session` already has CUDA support, this returns a clone of it. Otherwise it
40+
/// returns a new session cloned from `session` with a default [`CudaSession`] attached.
3241
fn session_with_cuda(session: &VortexSession) -> VortexResult<VortexSession> {
3342
if session.get_opt::<CudaSession>().is_some() {
3443
return Ok(session.clone());
@@ -100,6 +109,46 @@ pub unsafe extern "C-unwind" fn vx_cuda_array_export_arrow_device(
100109
})
101110
}
102111

112+
/// Consume a Vortex partition and scan it as an Arrow C Device stream.
113+
///
114+
/// This function takes ownership of `partition`. Callers must not free or reuse it after calling
115+
/// this function, regardless of success or failure.
116+
///
117+
/// On success returns `0` and writes an owned `ArrowDeviceArrayStream` to `out_stream`. The stream
118+
/// owns the resulting scan iterator. The caller must release the stream through its embedded Arrow
119+
/// `release` callback, and must release each produced `ArrowDeviceArray` through its embedded
120+
/// `ArrowArray.release` callback.
121+
///
122+
/// On error returns `1` and, when `error_out` is non-null, writes a `vx_error` (free with
123+
/// `vx_error_free`).
124+
///
125+
/// # Safety
126+
///
127+
/// `session` must be a valid borrowed handle created by `vortex-ffi`. `partition` must be an owned
128+
/// partition handle created by `vortex-ffi`. `out_stream` must be a valid writable pointer. If
129+
/// `error_out` is non-null, it must be valid for writing one error pointer.
130+
#[unsafe(no_mangle)]
131+
pub unsafe extern "C-unwind" fn vx_cuda_partition_scan_arrow_device_stream(
132+
session: *const vx_session,
133+
partition: *mut vx_partition,
134+
out_stream: *mut ArrowDeviceArrayStream,
135+
error_out: *mut *mut vx_error,
136+
) -> c_int {
137+
try_or(error_out, VX_CUDA_ERR, || {
138+
vortex_ensure!(!partition.is_null(), "null vx_partition");
139+
140+
let array_stream = unsafe { vx_partition_into_array_stream(partition) }?;
141+
vortex_ensure!(!out_stream.is_null(), "null ArrowDeviceArrayStream output");
142+
143+
let session = session_with_cuda(unsafe { vx_session_ref(session) }?)?;
144+
// Drive the stream on the same runtime the partition's scan spawned its work onto.
145+
let device_stream = array_stream.export_device_array_stream(&session, ffi_runtime())?;
146+
147+
unsafe { ptr::write(out_stream, device_stream) };
148+
Ok(VX_CUDA_OK)
149+
})
150+
}
151+
103152
#[cfg(test)]
104153
mod tests {
105154
use std::ptr;

0 commit comments

Comments
 (0)