Skip to content

Commit fbcf3b3

Browse files
committed
Add docstrings. Change is_imported to is_mapped. Register DeviceMemoryResource reduction with multiprocessing. Add a quick exit to from_allocation_handle. Simplify the worker pool tests based on the new reduction method.
1 parent e0d0bf4 commit fbcf3b3

File tree

5 files changed

+192
-87
lines changed

5 files changed

+192
-87
lines changed

cuda_core/cuda/core/experimental/_memory.pyx

Lines changed: 120 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -532,14 +532,16 @@ class DeviceMemoryResourceAttributes:
532532

533533
del mempool_property
534534

535+
535536
# Holds DeviceMemoryResource objects imported by this process.
536537
# This enables buffer serialization, as buffers can reduce to a pair
537538
# of comprising the memory resource UUID (the key into this registry)
538539
# and the serialized buffer descriptor.
539540
_ipc_registry = {}
540541

541542
class DeviceMemoryResource(MemoryResource):
542-
"""Create a device memory resource managing a stream-ordered memory pool.
543+
"""
544+
Create a device memory resource managing a stream-ordered memory pool.
543545
544546
Parameters
545547
----------
@@ -560,9 +562,63 @@ class DeviceMemoryResource(MemoryResource):
560562
When using an existing (current or default) memory pool, the returned
561563
device memory resource does not own the pool (`is_handle_owned` is
562564
`False`), and closing the resource has no effect.
565+
566+
IPC-Enabled Memory Resources
567+
----------------------------
568+
If ``ipc_enabled=True`` is specified as an initializer option, the memory
569+
resource constructed will be capable of sharing allocations between
570+
processes. Sharing an allocation is a two-step procedure that involves
571+
mapping a memory resource and then mapping buffers owned by that resource.
572+
These steps can be accomplished in several ways.
573+
574+
An IPC-enabled memory resource (MR) can allocate memory buffers but cannot
575+
receive shared buffers. Mapping an MR to another process creates a "mapped
576+
memory resource" (MMR). An MMR cannot allocate memory buffers and can only
577+
receive shared buffers. MRs and MMRs are both of type
578+
:class:`DeviceMemoryResource` and can be distinguished via
579+
:attr:`DeviceMemoryResource.is_mapped`.
580+
581+
An MR is shared via an allocation handle obtained by calling
582+
:meth:`DeviceMemoryResource.get_allocation_handle`. The allocation handle
583+
has a platform-specific interpretation; however, memory IPC is currently
584+
only supported for Linux, and in that case allocation handles are file
585+
descriptors. After sending an allocation handle to another process, it can
586+
be used to create an MMR by invoking
587+
:meth:`DeviceMemoryResource.from_allocation_handle`.
588+
589+
Buffers can be shared as serializable descriptors obtained by calling
590+
:meth:`Buffer.get_ipc_descriptor`. In a receiving process, a shared buffer is
591+
created by invoking :meth:`Buffer.from_ipc_descriptor` with an MMR and
592+
buffer descriptor, where the MMR corresponds to the MR that created the
593+
described buffer.
594+
595+
To help manage the association between memory resources and buffers, a
596+
registry is provided. Every MR has a unique identifier (UUID). MMRs can be
597+
registered by calling :meth:`DeviceMemoryResource.register` with the UUID
598+
of the corresponding MR. Registered MMRs can be looked up via
599+
:meth:`DeviceMemoryResource.from_registry`. When registering MMRs in this
600+
way, the use of buffer descriptors can be avoided. Instead, buffer objects
601+
can themselves be serialized and transferred directly. Serialization embeds
602+
the UUID, which is used to locate the correct MMR during reconstruction.
603+
604+
IPC-enabled memory resources interoperate with the :mod:`multiprocessing`
605+
module to provide a simplified interface. This approach can avoid direct
606+
use of allocation handles, buffer descriptors, MMRs, and the registry. When
607+
using :mod:`multiprocessing` to spawn processes or send objects through
608+
communication channels such as :class:`multiprocessing.Queue`,
609+
:class:`multiprocessing.Pipe`, or :class:`multiprocessing.Connection`,
610+
:class:`Buffer` objects may be sent directly, and in such cases the process
611+
for creating MMRs and mapping buffers will be handled automatically.
612+
613+
For greater efficiency when transferring many buffers, one may also send
614+
MRs and buffers separately. When an MR is sent via :mod:`multiprocessing`,
615+
an MMR is created and registered in the receiving process. Subsequently,
616+
buffers may be serialized and transferred using ordinary :mod:`pickle`
617+
methods. The reconstruction procedure uses the registry to find the
618+
associated MMR.
563619
"""
564620
__slots__ = ("_dev_id", "_mempool_handle", "_attributes", "_ipc_handle_type",
565-
"_mempool_owned", "_is_imported", "_uuid", "_alloc_handle")
621+
"_mempool_owned", "_is_mapped", "_uuid", "_alloc_handle")
566622

567623
def __init__(self, device_id: int | Device, options=None):
568624
device_id = getattr(device_id, 'device_id', device_id)
@@ -577,7 +633,7 @@ class DeviceMemoryResource(MemoryResource):
577633
self._attributes = None
578634
self._ipc_handle_type = _NOIPC_HANDLE_TYPE
579635
self._mempool_owned = False
580-
self._is_imported = False
636+
self._is_mapped = False
581637
self._uuid = None
582638
self._alloc_handle = None
583639

@@ -620,7 +676,7 @@ class DeviceMemoryResource(MemoryResource):
620676
self._attributes = None
621677
self._ipc_handle_type = properties.handleTypes
622678
self._mempool_owned = True
623-
self._is_imported = False
679+
self._is_mapped = False
624680
self._uuid = None
625681
self._alloc_handle = None
626682

@@ -641,38 +697,46 @@ class DeviceMemoryResource(MemoryResource):
641697
err, = driver.cuMemPoolDestroy(self._mempool_handle)
642698
raise_if_driver_error(err)
643699
finally:
644-
self.unregister()
700+
if self.is_mapped:
701+
self.unregister()
645702
self._dev_id = None
646703
self._mempool_handle = None
647704
self._attributes = None
648705
self._ipc_handle_type = _NOIPC_HANDLE_TYPE
649706
self._mempool_owned = False
650-
self._is_imported = False
707+
self._is_mapped = False
651708
self._uuid = None
652709
self._alloc_handle = None
653710

654711

655712
def __reduce__(self):
656-
# If spawning a new process, serialize the resources; otherwise, just
657-
# send the UUID, using the registry on the receiving end.
658-
import multiprocessing
659-
is_spawning = multiprocessing.context.get_spawning_popen() is not None
660-
if is_spawning:
661-
from ._device import Device
662-
device = Device(self.device_id)
663-
alloc_handle = self.get_allocation_handle()
664-
return DeviceMemoryResource.from_allocation_handle, (device, alloc_handle)
665-
else:
666-
return DeviceMemoryResource.from_registry, (self.uuid,)
713+
return DeviceMemoryResource.from_registry, (self.uuid,)
667714

668715
@staticmethod
669-
def from_registry(uuid: uuid.UUID):
716+
def from_registry(uuid: uuid.UUID) -> DeviceMemoryResource:
717+
"""
718+
Obtain a registered mapped memory resource.
719+
720+
Raises
721+
------
722+
RuntimeError
723+
If no mapped memory resource is found in the registry.
724+
"""
725+
670726
try:
671727
return _ipc_registry[uuid]
672728
except KeyError:
673729
raise RuntimeError(f"Memory resource {uuid} was not found") from None
674730

675-
def register(self, uuid: uuid.UUID):
731+
def register(self, uuid: uuid.UUID) -> DeviceMemoryResource:
732+
"""
733+
Register a mapped memory resource.
734+
735+
Returns
736+
-------
737+
The registered mapped memory resource. If one was previously registered
738+
with the given key, it is returned.
739+
"""
676740
existing = _ipc_registry.get(uuid)
677741
if existing is not None:
678742
return existing
@@ -682,12 +746,18 @@ class DeviceMemoryResource(MemoryResource):
682746
return self
683747

684748
def unregister(self):
685-
if _ipc_registry is not None:
749+
"""Unregister this mapped memory resource."""
750+
assert self.is_mapped
751+
if _ipc_registry is not None: # can occur during shutdown catastrophe
686752
with contextlib.suppress(KeyError):
687753
del _ipc_registry[self.uuid]
688754

689755
@property
690-
def uuid(self):
756+
def uuid(self) -> Optional[uuid.UUID]:
757+
"""
758+
A universally unique identifier for this memory resource. Meaningful
759+
only for IPC-enabled memory resources.
760+
"""
691761
return self._uuid
692762

693763
@classmethod
@@ -711,6 +781,12 @@ class DeviceMemoryResource(MemoryResource):
711781
-------
712782
A new device memory resource instance with the imported handle.
713783
"""
784+
# Quick exit for registry hits.
785+
uuid = getattr(alloc_handle, 'uuid', None)
786+
self = _ipc_registry.get(uuid)
787+
if self is not None:
788+
return self
789+
714790
device_id = getattr(device_id, 'device_id', device_id)
715791

716792
self = cls.__new__(cls)
@@ -719,15 +795,15 @@ class DeviceMemoryResource(MemoryResource):
719795
self._attributes = None
720796
self._ipc_handle_type = _IPC_HANDLE_TYPE
721797
self._mempool_owned = True
722-
self._is_imported = True
798+
self._is_mapped = True
723799
self._uuid = None
724800
self._alloc_handle = None # only used for non-imported
725801

726802
err, self._mempool_handle = driver.cuMemPoolImportFromShareableHandle(int(alloc_handle), _IPC_HANDLE_TYPE, 0)
727803
raise_if_driver_error(err)
728-
uuid = getattr(alloc_handle, 'uuid', None)
729804
if uuid is not None:
730-
self = self.register(uuid)
805+
registered = self.register(uuid)
806+
assert registered is self
731807
return self
732808

733809
def get_allocation_handle(self) -> IPCAllocationHandle:
@@ -743,13 +819,13 @@ class DeviceMemoryResource(MemoryResource):
743819
if self._alloc_handle is None:
744820
if not self.is_ipc_enabled:
745821
raise RuntimeError("Memory resource is not IPC-enabled")
746-
if self._is_imported:
822+
if self._is_mapped:
747823
raise RuntimeError("Imported memory resource cannot be exported")
748824
err, alloc_handle = driver.cuMemPoolExportToShareableHandle(self._mempool_handle, _IPC_HANDLE_TYPE, 0)
749825
raise_if_driver_error(err)
750826
try:
751827
assert self._uuid is None
752-
import uuid as uuid
828+
import uuid
753829
self._uuid = uuid.uuid4()
754830
self._alloc_handle = IPCAllocationHandle._init(alloc_handle, self._uuid)
755831
except:
@@ -774,8 +850,8 @@ class DeviceMemoryResource(MemoryResource):
774850
The allocated buffer object, which is accessible on the device that this memory
775851
resource was created for.
776852
"""
777-
if self._is_imported:
778-
raise TypeError("Cannot allocate from shared memory pool imported via IPC")
853+
if self._is_mapped:
854+
raise TypeError("Cannot allocate from a mapped IPC-enabled memory resource")
779855
if stream is None:
780856
stream = default_stream()
781857
err, ptr = driver.cuMemAllocFromPoolAsync(size, self._mempool_handle, stream.handle)
@@ -823,9 +899,12 @@ class DeviceMemoryResource(MemoryResource):
823899
return self._mempool_owned
824900

825901
@property
826-
def is_imported(self) -> bool:
827-
"""Whether the memory resource was imported from another process. If True, allocation is not permitted."""
828-
return self._is_imported
902+
def is_mapped(self) -> bool:
903+
"""
904+
Whether this is a mapping of an IPC-enabled memory resource from
905+
another process. If True, allocation is not permitted.
906+
"""
907+
return self._is_mapped
829908

830909
@property
831910
def is_device_accessible(self) -> bool:
@@ -843,6 +922,16 @@ class DeviceMemoryResource(MemoryResource):
843922
return self._ipc_handle_type != _NOIPC_HANDLE_TYPE
844923

845924

925+
def _deep_reduce_device_memory_resource(mr):
926+
from ._device import Device
927+
device = Device(mr.device_id)
928+
alloc_handle = mr.get_allocation_handle()
929+
return mr.from_allocation_handle, (device, alloc_handle)
930+
931+
932+
multiprocessing.reduction.register(DeviceMemoryResource, _deep_reduce_device_memory_resource)
933+
934+
846935
class LegacyPinnedMemoryResource(MemoryResource):
847936
"""Create a pinned memory resource that uses legacy cuMemAllocHost/cudaMallocHost
848937
APIs.

cuda_core/docs/source/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ CUDA runtime
3030

3131
:template: dataclass.rst
3232

33+
DeviceMemoryResourceOptions
3334
EventOptions
3435
GraphCompleteOptions
3536
GraphDebugPrintOptions

cuda_core/docs/source/api_private.rst

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
:orphan:
55

66
.. This page is to generate documentation for private classes exposed to users,
7-
i.e., users cannot instantiate it by themselves but may use it's properties
8-
or methods via returned values from public APIs. These classes must be referred
9-
in public APIs returning their instances.
7+
i.e., users cannot instantiate them but may use their properties or methods
8+
via returned values from public APIs. These classes must be referred in
9+
public APIs returning their instances.
1010
1111
.. currentmodule:: cuda.core.experimental
1212

@@ -18,8 +18,9 @@ CUDA runtime
1818

1919
_memory.PyCapsule
2020
_memory.DevicePointerT
21-
_memory.IPCBufferDescriptor
2221
_device.DeviceProperties
22+
_memory.IPCAllocationHandle
23+
_memory.IPCBufferDescriptor
2324
_module.KernelAttributes
2425
_module.KernelOccupancy
2526
_module.ParamInfo

cuda_core/tests/memory_ipc/test_errors.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
import multiprocessing
5+
import pickle
56
import re
67

78
from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions
@@ -64,7 +65,7 @@ def CHILD_ACTION(self, queue):
6465

6566
def ASSERT(self, exc_type, exc_msg):
6667
assert exc_type is TypeError
67-
assert exc_msg == "Cannot allocate from shared memory pool imported via IPC"
68+
assert exc_msg == "Cannot allocate from a mapped IPC-enabled memory resource"
6869

6970

7071
class TestImportWrongMR(ChildErrorHarness):
@@ -128,11 +129,13 @@ def PARENT_ACTION(self, queue):
128129
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
129130
mr2 = DeviceMemoryResource(self.device, options=options)
130131
self.buffer = mr2.allocate(NBYTES)
131-
queue.put(self.buffer) # Note: mr2 not sent
132+
buffer_s = pickle.dumps(self.buffer)
133+
queue.put(buffer_s) # Note: mr2 not sent
132134

133135
def CHILD_ACTION(self, queue):
134136
Device().set_current()
135-
queue.get(timeout=CHILD_TIMEOUT_SEC)
137+
buffer_s = queue.get(timeout=CHILD_TIMEOUT_SEC)
138+
pickle.loads(buffer_s)
136139

137140
def ASSERT(self, exc_type, exc_msg):
138141
assert exc_type is RuntimeError

0 commit comments

Comments
 (0)