Skip to content

Commit e13182b

Browse files
committed
Initial implementation of IPC-enabled events. Changes the type of max_size memory resource attribute to size_t from int32. Various updates and additions to test helpers.
1 parent ebd5453 commit e13182b

11 files changed

Lines changed: 629 additions & 93 deletions

File tree

cuda_core/cuda/core/experimental/_device.pyx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ from cuda.core.experimental._utils.cuda_utils import (
2828
from cuda.core.experimental._stream cimport default_stream
2929

3030

31+
3132
# TODO: I prefer to type these as "cdef object" and avoid accessing them from within Python,
3233
# but it seems it is very convenient to expose them for testing purposes...
3334
_tls = threading.local()
@@ -1273,7 +1274,7 @@ class Device:
12731274
"""
12741275
self._check_context_initialized()
12751276
ctx = self._get_current_context()
1276-
return Event._init(self._id, ctx, options)
1277+
return Event._init(self._id, ctx, options, True)
12771278

12781279
def allocate(self, size, stream: Optional[Stream] = None) -> Buffer:
12791280
"""Allocate device memory from a specified stream.

cuda_core/cuda/core/experimental/_event.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ cdef class Event:
1111
cydriver.CUevent _handle
1212
bint _timing_disabled
1313
bint _busy_waited
14+
bint _ipc_enabled
15+
object _ipc_descriptor
1416
int _device_id
1517
object _ctx_handle
1618

cuda_core/cuda/core/experimental/_event.pyx

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
from __future__ import annotations
66

7+
cimport cpython
78
from libc.stdint cimport uintptr_t
9+
from libc.string cimport memcpy
810

911
from cuda.bindings cimport cydriver
1012

@@ -14,12 +16,14 @@ from cuda.core.experimental._utils.cuda_utils cimport (
1416
)
1517

1618
from dataclasses import dataclass
19+
import multiprocessing
1720
from typing import TYPE_CHECKING, Optional
1821

1922
from cuda.core.experimental._context import Context
2023
from cuda.core.experimental._utils.cuda_utils import (
2124
CUDAError,
2225
driver,
26+
handle_return,
2327
)
2428
if TYPE_CHECKING:
2529
import cuda.bindings
@@ -40,15 +44,15 @@ cdef class EventOptions:
4044
has actually been completed.
4145
Otherwise, the CPU thread will busy-wait until the event has
4246
been completed. (Default to False)
43-
support_ipc : bool, optional
47+
ipc_enabled : bool, optional
4448
Event will be suitable for interprocess use.
4549
Note that enable_timing must be False. (Default to False)
4650
4751
"""
4852

4953
enable_timing: Optional[bool] = False
5054
busy_waited_sync: Optional[bool] = False
51-
support_ipc: Optional[bool] = False
55+
ipc_enabled: Optional[bool] = False
5256

5357

5458
cdef class Event:
@@ -86,24 +90,35 @@ cdef class Event:
8690
raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")
8791

8892
@classmethod
89-
def _init(cls, device_id: int, ctx_handle: Context, options=None):
93+
def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
9094
cdef Event self = Event.__new__(cls)
9195
cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
9296
cdef unsigned int flags = 0x0
9397
self._timing_disabled = False
9498
self._busy_waited = False
99+
self._ipc_enabled = False
100+
self._ipc_descriptor = None
95101
if not opts.enable_timing:
96102
flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
97103
self._timing_disabled = True
98104
if opts.busy_waited_sync:
99105
flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
100106
self._busy_waited = True
101-
if opts.support_ipc:
102-
raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
107+
if opts.ipc_enabled:
108+
if is_free:
109+
raise TypeError(
110+
"IPC-enabled events must be bound; use Stream.record for creation."
111+
)
112+
flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
113+
self._ipc_enabled = True
114+
if not self._timing_disabled:
115+
raise TypeError("IPC-enabled events cannot use timing.")
103116
with nogil:
104117
HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
105118
self._device_id = device_id
106119
self._ctx_handle = ctx_handle
120+
if opts.ipc_enabled:
121+
self.get_ipc_descriptor()
107122
return self
108123

109124
cpdef close(self):
@@ -151,6 +166,40 @@ cdef class Event:
151166
raise CUDAError(err)
152167
raise RuntimeError(explanation)
153168

169+
def get_ipc_descriptor(self) -> IPCEventDescriptor:
    """Return the descriptor used to share this event with another process.

    The descriptor is built on first use and cached on the event, so
    repeated calls return the same object.

    Returns
    -------
    IPCEventDescriptor
        Descriptor wrapping the driver's IPC event handle bytes together
        with this event's busy-wait setting.

    Raises
    ------
    RuntimeError
        If no descriptor is cached and the event is not IPC-enabled.
    """
    cdef cydriver.CUipcEventHandle ipc_handle
    cdef bytes payload

    cached = self._ipc_descriptor
    if cached is not None:
        return cached
    if not self.is_ipc_enabled:
        raise RuntimeError("Event is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&ipc_handle, <cydriver.CUevent>(self._handle)))

    # Copy the opaque handle bytes into a Python ``bytes`` object; the
    # descriptor stores those bytes rather than the C struct.
    payload = cpython.PyBytes_FromStringAndSize(<char*>(ipc_handle.reserved), sizeof(ipc_handle.reserved))
    self._ipc_descriptor = IPCEventDescriptor._init(payload, self._busy_waited)
    return self._ipc_descriptor
181+
182+
@classmethod
def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
    """Import an event that was exported from another process.

    Parameters
    ----------
    ipc_descriptor : IPCEventDescriptor
        Descriptor produced by :meth:`get_ipc_descriptor` in the exporting
        process.

    Returns
    -------
    Event
        An IPC-enabled event with timing disabled. Its busy-wait setting
        is taken from the descriptor.

    Raises
    ------
    ValueError
        If the descriptor payload is missing or does not have exactly the
        size of the driver's IPC event handle.
    """
    cdef cydriver.CUipcEventHandle data
    cdef Event self
    cdef Py_ssize_t expected = sizeof(data.reserved)

    # Descriptors can be rebuilt from arbitrary bytes (e.g. when unpickled),
    # so check the length before memcpy reads ``expected`` bytes from it.
    reserved = ipc_descriptor._reserved
    if reserved is None or len(reserved) != expected:
        raise ValueError(
            f"Invalid IPC event descriptor: expected a payload of {expected} bytes."
        )

    memcpy(data.reserved, <const void*><const char*>(ipc_descriptor._reserved), sizeof(data.reserved))
    self = Event.__new__(cls)
    with nogil:
        HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
    self._timing_disabled = True
    self._busy_waited = ipc_descriptor._busy_waited
    self._ipc_enabled = True
    self._ipc_descriptor = ipc_descriptor
    # The descriptor carries no device or context information, so these
    # are set to sentinels: -1 for the device id, None for the context.
    self._device_id = -1
    self._ctx_handle = None
    return self
197+
198+
@property
def is_ipc_enabled(self) -> bool:
    """Whether this event can be shared across process boundaries.

    Returns
    -------
    bool
        The value of the event's internal ``_ipc_enabled`` flag.
    """
    return self._ipc_enabled
202+
154203
@property
155204
def is_timing_disabled(self) -> bool:
156205
"""Return True if the event does not record timing data, otherwise False."""
@@ -161,11 +210,6 @@ cdef class Event:
161210
"""Return True if the event synchronization would keep the CPU busy-waiting, otherwise False."""
162211
return self._busy_waited
163212

164-
@property
165-
def is_ipc_supported(self) -> bool:
166-
"""Return True if this event can be used as an interprocess event, otherwise False."""
167-
raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
168-
169213
def sync(self):
170214
"""Synchronize until the event completes.
171215
@@ -212,12 +256,43 @@ cdef class Event:
212256
context is set current after an event is created.
213257

214258
"""
215-
216-
from cuda.core.experimental._device import Device # avoid circular import
217-
218-
return Device(self._device_id)
259+
if self._device_id >= 0:
260+
from ._device import Device # avoid circular import
261+
return Device(self._device_id)
219262

220263
@property
221264
def context(self) -> Context:
222265
"""Return the :obj:`~_context.Context` associated with this event."""
223-
return Context._from_ctx(self._ctx_handle, self._device_id)
266+
if self._ctx_handle is not None and self._device_id >= 0:
267+
return Context._from_ctx(self._ctx_handle, self._device_id)
268+
269+
270+
cdef class IPCEventDescriptor:
    """Serializable object describing an event that can be shared between processes.

    Instances are created through :meth:`_init` (used by the Event APIs and
    by pickling); calling the constructor directly raises ``RuntimeError``.
    """

    cdef:
        # Opaque bytes of the driver's IPC event handle.
        bytes _reserved
        # Busy-wait setting of the exporting event.
        bint _busy_waited

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

    @classmethod
    def _init(cls, reserved: bytes, busy_waited: bint):
        """Build a descriptor from raw handle bytes and a busy-wait flag."""
        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
        self._reserved = reserved
        self._busy_waited = busy_waited
        return self

    def __eq__(self, other):
        # Returning NotImplemented lets comparisons with unrelated objects
        # (including None) fall back to Python's default handling.
        if not isinstance(other, IPCEventDescriptor):
            return NotImplemented
        # No need to check self._busy_waited.
        return self._reserved == (<IPCEventDescriptor>other)._reserved

    def __hash__(self):
        # Hash on the same field __eq__ compares, so equal descriptors
        # hash equally.
        return hash(self._reserved)

    def __reduce__(self):
        # Pickle as a call to _init, since __init__ refuses direct construction.
        return self._init, (self._reserved, self._busy_waited)
293+
294+
295+
def _reduce_event(event):
    """Reducer for ``multiprocessing``: rebuild an event from its IPC descriptor.

    Returns the ``(callable, args)`` pair expected by the pickling protocol:
    the event's ``from_ipc_descriptor`` and a one-element tuple holding the
    result of ``event.get_ipc_descriptor()``.
    """
    rebuild = event.from_ipc_descriptor
    descriptor = event.get_ipc_descriptor()
    return rebuild, (descriptor,)
297+
298+
multiprocessing.reduction.register(Event, _reduce_event)

cuda_core/cuda/core/experimental/_memory.pyx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes):
231231
if stream is None:
232232
# Note: match this behavior to DeviceMemoryResource.allocate()
233233
stream = default_stream()
234-
cdef cydriver.CUmemPoolPtrExportData share_data
235-
memcpy(share_data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(share_data.reserved))
234+
cdef cydriver.CUmemPoolPtrExportData data
235+
memcpy(data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(data.reserved))
236236
cdef cydriver.CUdeviceptr ptr
237237
with nogil:
238-
HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data))
238+
HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data))
239239
return Buffer._init(<intptr_t>ptr, ipc_buffer.size, mr, stream)
240240

241241
def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
@@ -516,7 +516,7 @@ cdef class DeviceMemoryResourceOptions:
516516
(Default to 0)
517517
"""
518518
ipc_enabled : cython.bint = False
519-
max_size : cython.int = 0
519+
max_size : cython.size_t = 0
520520

521521

522522
# TODO: cythonize this?

cuda_core/cuda/core/experimental/_stream.pyx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,13 @@ cdef class Stream:
262262
# and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions.
263263
if event is None:
264264
self._get_device_and_context()
265-
event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options)
265+
event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options, False)
266+
elif event.is_ipc_enabled:
267+
raise TypeError(
268+
"IPC-enabled events should not be re-recorded, instead create a "
269+
"new event by supplying options."
270+
)
271+
266272
cdef cydriver.CUevent e = (<cyEvent?>(event))._handle
267273
with nogil:
268274
HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle))
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import cuda_python_test_helpers
2323
except ImportError:
2424
# Import shared platform helpers for tests across repos
25-
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers"))
25+
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers"))
2626
import cuda_python_test_helpers
2727

2828

cuda_core/tests/helpers/latch.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from cuda.core.experimental import (
5+
LaunchConfig,
6+
LegacyPinnedMemoryResource,
7+
Program,
8+
ProgramOptions,
9+
launch,
10+
)
11+
import helpers
12+
import ctypes
13+
14+
class LatchKernel:
    """
    Manages a kernel that blocks progress until released.

    The kernel spins until a 32-bit flag becomes non-zero. The flag lives in
    a 4-byte buffer allocated from ``LegacyPinnedMemoryResource`` and is set
    from the host by :meth:`release`.
    """

    def __init__(self, device):
        """Compile the latch kernel for ``device`` and allocate the flag.

        ``device`` must expose ``compute_capability``; its components are
        concatenated to form the ``sm_XY`` architecture string.
        """
        code = """
            #include <cuda/atomic>

            extern "C"
            __global__ void latch(int* val) {
                cuda::atomic_ref<int, cuda::thread_scope_system> signal{*val};
                while (true) {
                    if (signal.load(cuda::memory_order_relaxed)) {
                        break;
                    }
                }
            }
            """
        capability = "".join(f"{part}" for part in device.compute_capability)
        program_options = ProgramOptions(
            std="c++17",
            arch=f"sm_{capability}",
            include_path=helpers.CCCL_INCLUDE_PATHS,
        )
        prog = Program(code, code_type="c++", options=program_options)
        mod = prog.compile(target_type="cubin")
        self.kernel = mod.get_kernel("latch")

        mr = LegacyPinnedMemoryResource()
        self.buffer = mr.allocate(4)
        # Start with the latch closed.
        self.busy_wait_flag[0] = 0

    def launch(self, stream):
        """Launch the latch kernel (one block of one thread) on ``stream``."""
        config = LaunchConfig(grid=1, block=1)
        # Inside a method, ``launch`` resolves to the module-level function.
        launch(stream, config, self.kernel, self.busy_wait_flag_address)

    def release(self):
        """Set the flag to 1 so a launched latch kernel can exit."""
        self.busy_wait_flag[0] = 1

    @property
    def busy_wait_flag_address(self):
        """Integer address of the flag buffer.

        Raises ``RuntimeError`` if the latch has already been closed.
        """
        buffer = getattr(self, "buffer", None)
        if buffer is None:
            raise RuntimeError("LatchKernel has been closed")
        return int(buffer.handle)

    @property
    def busy_wait_flag(self):
        """``ctypes`` pointer to the flag, viewed as a 32-bit integer."""
        return ctypes.cast(self.busy_wait_flag_address, ctypes.POINTER(ctypes.c_int32))

    def close(self):
        """Release the flag buffer. Safe to call more than once."""
        buffer = getattr(self, "buffer", None)
        if buffer is not None:
            # Drop the reference first so a later close() or __del__ does
            # not close the same buffer again.
            self.buffer = None
            buffer.close()

    def __del__(self):
        self.close()
68+
69+

0 commit comments

Comments
 (0)