-
Notifications
You must be signed in to change notification settings - Fork 172
Expand file tree
/
Copy pathexecutor.rs
More file actions
444 lines (406 loc) · 15.5 KB
/
Copy pathexecutor.rs
File metadata and controls
444 lines (406 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cudarc::driver::CudaEvent;
use cudarc::driver::CudaFunction;
use cudarc::driver::CudaSlice;
use cudarc::driver::DeviceRepr;
use cudarc::driver::LaunchArgs;
use cudarc::driver::LaunchConfig;
use futures::future::BoxFuture;
use tracing::debug;
use tracing::trace;
use vortex::array::ArrayRef;
use vortex::array::ArrayVTable;
use vortex::array::Canonical;
use vortex::array::ExecutionCtx;
use vortex::array::IntoArray;
use vortex::array::arrays::Struct;
use vortex::array::arrays::StructArray;
use vortex::array::arrays::struct_::StructDataParts;
use vortex::array::buffer::BufferHandle;
use vortex::dtype::PType;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_err;
use crate::CudaSession;
use crate::ExportDeviceArray;
use crate::hybrid_dispatch;
use crate::kernel::DefaultLaunchStrategy;
use crate::kernel::LaunchStrategy;
use crate::kernel::LaunchStrategyExt;
use crate::kernel::launch_cuda_kernel_impl;
use crate::kernel::launch_cuda_kernel_with_config;
use crate::session::CudaSessionExt;
use crate::stream::VortexCudaStream;
/// CUDA kernel events recorded before and after kernel launch.
#[derive(Debug)]
pub struct CudaKernelEvents {
/// Event recorded before kernel launch.
pub before_launch: CudaEvent,
/// Event recorded after kernel launch.
pub after_launch: CudaEvent,
}
impl CudaKernelEvents {
pub fn duration(&self) -> VortexResult<Duration> {
self.before_launch
.elapsed_ms(&self.after_launch) // synchronizes
.map_err(|e| vortex_err!("failed to get elapsed time: {}", e))
.map(|f| Duration::from_secs_f32(f / 1000.0))
}
}
/// Controls which GPU dispatch strategy is used when executing arrays.
///
/// By default, `execute_cuda` tries fused dynamic dispatch first,
/// then falls back to standalone per-encoding kernels. Benchmarks and tests
/// can force a specific strategy to get stable, isolated measurements.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CudaDispatchMode {
/// Automatically choose the best strategy (current default behavior).
/// Tries fused → partially-fused → standalone → CPU fallback.
#[default]
Auto,
/// Only use standalone per-encoding `CudaExecute` kernels.
/// Skips the fused/partially-fused dynamic dispatch planner entirely.
StandaloneOnly,
/// Only use fused or partially-fused dynamic dispatch.
/// Returns an error if the array is not dyn-dispatch-compatible.
DynDispatchOnly,
}
/// CUDA execution context.
///
/// Provides access to the CUDA context and stream for kernel execution.
/// Handles memory allocation and data transfers between host and device.
pub struct CudaExecutionCtx {
stream: VortexCudaStream,
ctx: ExecutionCtx,
cuda_session: CudaSession,
strategy: Arc<dyn LaunchStrategy>,
dispatch_mode: CudaDispatchMode,
}
impl CudaExecutionCtx {
/// Creates a new CUDA execution context.
pub(crate) fn new(stream: VortexCudaStream, ctx: ExecutionCtx) -> Self {
let cuda_session = ctx.session().cuda_session().clone();
Self {
stream,
ctx,
cuda_session,
strategy: Arc::new(DefaultLaunchStrategy),
dispatch_mode: CudaDispatchMode::Auto,
}
}
/// Get a mutable handle to the CPU execution context.
pub fn execution_ctx(&mut self) -> &mut ExecutionCtx {
&mut self.ctx
}
/// Set the launch strategy for the execution context.
///
/// This can only be set on setup (an "owned" context) and not from within
/// a kernel execution.
pub fn with_launch_strategy(mut self, launch_strategy: Arc<dyn LaunchStrategy>) -> Self {
self.strategy = launch_strategy;
self
}
/// Set the dispatch mode for the execution context.
///
/// This controls whether `execute_cuda` uses fused dynamic dispatch,
/// standalone per-encoding kernels, or the automatic (default) strategy.
pub fn with_dispatch_mode(mut self, dispatch_mode: CudaDispatchMode) -> Self {
self.dispatch_mode = dispatch_mode;
self
}
/// Perform an external kernel launch, with events created and logged via the configured
/// [`LaunchStrategy`].
///
/// We use CUB and NVCOMP routines, and those don't match the normal `cudarc` entrypoints, so
/// to inject the configured launch strategy we need to bracket it ourselves.
pub fn launch_external<F: FnMut() -> VortexResult<()>>(
&self,
len: usize,
function: F,
) -> VortexResult<()> {
self.strategy
.as_ref()
.with_strategy(&self.stream, len, function)
}
/// Launch a Kernel function with args setup done by the provided `build_args` closure.
///
/// Kernels launched this way will use the default launch configuration, which provides no
/// shared memory bytes, and uses grid parameters based on the ideal thread block size for
/// the given `len`.
pub fn launch_kernel<'a, F>(
&'a mut self,
function: &'a CudaFunction,
len: usize,
build_args: F,
) -> VortexResult<()>
where
F: FnOnce(&mut LaunchArgs<'a>),
{
let mut launcher = self.launch_builder(function);
build_args(&mut launcher);
let events = launch_cuda_kernel_impl(&mut launcher, self.strategy.event_flags(), len)?;
self.strategy.on_complete(&events, len)?;
Ok(())
}
/// Launch a function with args provided by the `build_args` closure, with an explicit
/// [`LaunchConfig`], for kernels which need specific grid and shared memory configuration.
pub fn launch_kernel_config<'a, F>(
&'a mut self,
function: &'a CudaFunction,
cfg: LaunchConfig,
len: usize,
build_args: F,
) -> VortexResult<()>
where
F: FnOnce(&mut LaunchArgs<'a>),
{
let mut launcher = self.launch_builder(function);
build_args(&mut launcher);
let events =
launch_cuda_kernel_with_config(&mut launcher, cfg, self.strategy.event_flags())?;
self.strategy.on_complete(&events, len)?;
Ok(())
}
/// Loads a CUDA kernel function by module name and ptype(s).
///
/// # Arguments
///
/// * `module_name` - Name of the module (`kernels/{module_name}.ptx`)
/// * `ptypes` - List of ptype strings for the kernel name
///
/// # Errors
///
/// Returns an error if kernel loading fails.
pub fn load_function(&self, module_name: &str, ptypes: &[PType]) -> VortexResult<CudaFunction> {
let type_suffixes: Vec<String> = ptypes.iter().map(|ptype| ptype.to_string()).collect();
self.load_function_with_suffixes(
module_name,
type_suffixes
.iter()
.map(|t| t.as_str())
.collect::<Vec<_>>()
.as_slice(),
)
}
/// Loads a CUDA kernel function by module name and type suffixes.
///
/// This is a lower-level version of [`load_function`][Self::load_function] that accepts
/// string suffixes directly, useful for types that don't have a `PType` (e.g., i128, i256).
///
/// # Arguments
///
/// * `module_name` - Name of the module (`kernels/{module_name}.ptx`)
/// * `type_suffixes` - List of type suffix strings for the kernel name
///
/// # Errors
///
/// Returns an error if kernel loading fails.
pub(crate) fn load_function_with_suffixes(
&self,
module_name: &str,
type_suffixes: &[&str],
) -> VortexResult<CudaFunction> {
self.cuda_session
.load_function_with_suffixes(module_name, type_suffixes)
}
/// Returns a launch builder for a CUDA kernel function.
///
/// Arguments can be added to the kernel launch with `.arg(buffer)`.
///
/// # Arguments
///
/// * `func` - CUDA kernel function to launch
pub fn launch_builder<'a>(&'a self, func: &'a CudaFunction) -> LaunchArgs<'a> {
self.stream.launch_builder(func)
}
/// Allocates a typed buffer on the GPU.
pub fn device_alloc<T: DeviceRepr + Send + Sync + 'static>(
&self,
len: usize,
) -> VortexResult<CudaSlice<T>> {
self.stream.device_alloc(len)
}
/// Copies host data to the device.
///
/// For **pageable** host memory the source is staged synchronously; for
/// **pinned** memory the transfer is async. In both cases `data` is
/// kept alive by the returned future until the copy completes.
pub fn copy_to_device<T, D>(
&self,
data: D,
) -> VortexResult<BoxFuture<'static, VortexResult<BufferHandle>>>
where
T: DeviceRepr + Debug + Send + Sync + 'static,
D: AsRef<[T]> + Send + 'static,
{
self.stream.copy_to_device(data)
}
/// Ensures a buffer is resident on the device, copying from host if necessary.
///
/// If the buffer is already on the device it is returned as-is. Otherwise
/// copies from host to device.
pub async fn ensure_on_device(&self, handle: BufferHandle) -> VortexResult<BufferHandle> {
if handle.is_on_device() {
return Ok(handle);
}
let host_buffer = handle
.as_host_opt()
.ok_or_else(|| vortex_err!("Buffer is not on host"))?
.clone();
self.stream.copy_to_device(host_buffer)?.await
}
/// Synchronous variant of [`ensure_on_device`](Self::ensure_on_device).
///
/// Safe to call from within an async executor (no nested `block_on`).
/// The copy is enqueued on the stream and completes before any subsequent
/// work on the same stream.
pub fn ensure_on_device_sync(&self, handle: BufferHandle) -> VortexResult<BufferHandle> {
if handle.is_on_device() {
return Ok(handle);
}
let host_buffer = handle
.as_host_opt()
.ok_or_else(|| vortex_err!("Buffer is not on host"))?
.clone();
self.stream.copy_to_device_sync(host_buffer.as_ref())
}
/// Returns a reference to the underlying [`VortexCudaStream`].
///
/// Through [`Deref`][std::ops::Deref], this also provides access to the
/// inner [`Arc<CudaStream>`] and all of cudarc's stream methods.
pub fn stream(&self) -> &VortexCudaStream {
&self.stream
}
/// Returns the Vortex session backing this CUDA execution context.
#[cfg(feature = "unstable_encodings")]
pub(crate) fn session(&self) -> &vortex::session::VortexSession {
self.ctx.session()
}
/// Returns the current dispatch mode.
pub fn dispatch_mode(&self) -> CudaDispatchMode {
self.dispatch_mode
}
/// Returns a reference to the CUDA session.
pub(crate) fn cuda_session(&self) -> &CudaSession {
&self.cuda_session
}
/// Get a handle to the exporter that can convert arrays into `ArrowDeviceArray`.
pub fn exporter(&self) -> &Arc<dyn ExportDeviceArray> {
self.cuda_session.export_device_array()
}
pub fn synchronize_stream(&self) -> VortexResult<()> {
self.stream
.synchronize()
.map_err(|e| vortex_err!("cuda error: {e}"))
}
}
/// Support trait for CUDA-accelerated decompression of arrays.
///
/// # Execution model
///
/// Work is enqueued onto a single CUDA stream and executes in FIFO order.
/// Kernel launches are synchronous fire-and-forget: They enqueue work and
/// return immediately. The returned [`Canonical`] may reference device buffers
/// with in-flight writes.
///
/// ## Pageable vs. page-locked (pinned) host memory
///
/// Whether the H2D transfer is asynchronous depends on whether the source
/// memory is page-locked:
///
/// - **Page-locked memory** (allocated via `cuMemAllocHost` / `cudaMallocHost`):
/// the GPU's DMA engine holds a stable physical address for the allocation and
/// can transfer directly without CPU involvement. The call returns immediately
/// and the copy proceeds in parallel with subsequent CPU work.
///
/// - **Pageable memory** (ordinary `malloc` / Rust allocator): CUDA must first
/// stage the data through an internal page-locked bounce buffer, performing a
/// CPU `memcpy` into that buffer before the DMA can begin. The `memcpy_htod_async`
/// call blocks until the staging copy finishes, making the transfer effectively
/// synchronous from the caller's perspective, though the DMA itself still runs
/// on the stream.
///
///
/// ## Synchronisation
///
/// To insert an explicit sync point, use `await_stream_callback`, which completes
/// when all preceding stream work — including in-flight kernels — has finished.
#[async_trait]
pub trait CudaExecute: 'static + Send + Sync + Debug {
/// Executes the array on CUDA, returning a canonical array.
///
/// # Errors
///
/// Returns an error if execution fails on the GPU.
async fn execute(&self, array: ArrayRef, ctx: &mut CudaExecutionCtx)
-> VortexResult<Canonical>;
}
/// Extension trait for executing arrays on CUDA.
#[async_trait]
pub trait CudaArrayExt {
/// Recursively walks the encoding tree, dispatching each layer to its
/// registered [`CudaExecute`] implementation and returning a canonical array
/// on the device.
///
/// See [`CudaExecute`] for details on the execution model.
///
/// Falls back to CPU execution if no CUDA support is registered for the
/// encoding.
async fn execute_cuda(self, ctx: &mut CudaExecutionCtx) -> VortexResult<Canonical>;
}
#[async_trait]
impl CudaArrayExt for ArrayRef {
#[expect(clippy::unwrap_used)]
async fn execute_cuda(self, ctx: &mut CudaExecutionCtx) -> VortexResult<Canonical> {
if self.encoding_id() == Struct.id() {
let len = self.len();
let StructDataParts {
fields,
struct_fields,
validity,
..
} = self.try_downcast::<Struct>().unwrap().into_data_parts();
let mut cuda_fields = Vec::with_capacity(fields.len());
for field in fields.iter() {
cuda_fields.push(field.clone().execute_cuda(ctx).await?.into_array());
}
return Ok(Canonical::Struct(StructArray::new(
struct_fields.names().clone(),
cuda_fields,
len,
validity,
)));
}
if self.is_canonical() || self.is_empty() {
trace!(encoding = ?self.encoding_id(), "skipping canonical");
return self.execute(&mut ctx.ctx);
}
// Try all GPU execution strategies: fused dynamic dispatch, partial
// fusion with subtree fallbacks, and single-kernel fallback.
// If none succeed, fall back to CPU execution only when all buffers
// remain host-resident.
let gpu_error = match hybrid_dispatch::try_gpu_dispatch(&self, ctx).await {
Ok(canonical) => return Ok(canonical),
Err(e) => {
debug!(
encoding = %self.encoding_id(),
error = %e,
"No GPU execution path available, falling back to CPU"
);
e
}
};
if !self.is_host() {
vortex_bail!(
"GPU execution for encoding {} failed ({gpu_error}); CPU fallback with device-resident buffers is not supported",
self.encoding_id()
);
}
self.execute(&mut ctx.ctx)
}
}