Skip to content

Commit e5b8542

Browse files
committed
Remove call to set_current in Device reconstruction. Add device set-up to tests.
1 parent fbcf3b3 commit e5b8542

File tree

8 files changed

+55
-85
lines changed

8 files changed

+55
-85
lines changed

cuda_core/cuda/core/experimental/_device.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
import multiprocessing
65
import threading
76
from typing import Optional, Union
87

@@ -1161,6 +1160,9 @@ def __int__(self):
11611160
def __repr__(self):
11621161
return f"<Device {self._id} ({self.name})>"
11631162

1163+
def __reduce__(self):
1164+
return Device, (self.device_id,)
1165+
11641166
def set_current(self, ctx: Context = None) -> Union[Context, None]:
11651167
"""Set device to be used for GPU executions.
11661168
@@ -1335,17 +1337,3 @@ def create_graph_builder(self) -> GraphBuilder:
13351337
"""
13361338
self._check_context_initialized()
13371339
return GraphBuilder._init(stream=self.create_stream(), is_stream_owner=True)
1338-
1339-
1340-
def _reduce_device(device):
1341-
return _reconstruct_device, (device.device_id,)
1342-
1343-
1344-
def _reconstruct_device(device_id):
1345-
device = Device(device_id)
1346-
if not device._has_inited:
1347-
device.set_current()
1348-
return device
1349-
1350-
1351-
multiprocessing.reduction.register(Device, _reduce_device)

cuda_core/cuda/core/experimental/_memory.pyx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ class DeviceMemoryResource(MemoryResource):
923923

924924

925925
def _deep_reduce_device_memory_resource(mr):
926-
from ._device import Device
926+
from . import Device
927927
device = Device(mr.device_id)
928928
alloc_handle = mr.get_allocation_handle()
929929
return mr.from_allocation_handle, (device, alloc_handle)

cuda_core/tests/memory_ipc/test_errors.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def test_main(self, ipc_device, ipc_memory_resource):
4343
def child_main(self, pipe, device, mr):
4444
"""Child process that pushes IPC errors to a shared pipe for testing."""
4545
self.device = device
46+
self.device.set_current()
4647
self.mr = mr
4748
try:
4849
self.CHILD_ACTION(pipe[0])
@@ -129,13 +130,13 @@ def PARENT_ACTION(self, queue):
129130
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
130131
mr2 = DeviceMemoryResource(self.device, options=options)
131132
self.buffer = mr2.allocate(NBYTES)
132-
buffer_s = pickle.dumps(self.buffer)
133+
buffer_s = pickle.dumps(self.buffer) # noqa: S301
133134
queue.put(buffer_s) # Note: mr2 not sent
134135

135136
def CHILD_ACTION(self, queue):
136137
Device().set_current()
137138
buffer_s = queue.get(timeout=CHILD_TIMEOUT_SEC)
138-
pickle.loads(buffer_s)
139+
pickle.loads(buffer_s) # noqa: S301
139140

140141
def ASSERT(self, exc_type, exc_msg):
141142
assert exc_type is RuntimeError

cuda_core/tests/memory_ipc/test_leaks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def test_alloc_handle(ipc_memory_resource):
3333
[mr.get_allocation_handle() for _ in range(10)]
3434

3535

36-
def exec_with_object(obj, number=1):
36+
def exec_success(obj, number=1):
3737
"""Succesfully run a child process."""
3838
for _ in range(number):
3939
process = mp.Process(target=child_main, args=(obj,))
@@ -92,7 +92,7 @@ def __reduce__(self):
9292
],
9393
ids=["alloc_handle", "mr", "buffer", "buffer_desc"],
9494
)
95-
@pytest.mark.parametrize("launcher", [exec_with_object, exec_launch_failure, exec_reduce_failure])
95+
@pytest.mark.parametrize("launcher", [exec_success, exec_launch_failure, exec_reduce_failure])
9696
def test_pass_object(ipc_memory_resource, launcher, getobject):
9797
"""Check for fd leaks when an object is sent as a subprocess argument."""
9898
mr = ipc_memory_resource

cuda_core/tests/memory_ipc/test_memory_ipc.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import multiprocessing as mp
55

6-
from cuda.core.experimental import Buffer, Device, DeviceMemoryResource
6+
from cuda.core.experimental import Buffer, DeviceMemoryResource
77
from utility import IPCBufferTestHelper
88

99
CHILD_TIMEOUT_SEC = 20
@@ -21,7 +21,7 @@ def test_main(self, ipc_device, ipc_memory_resource):
2121

2222
# Start the child process.
2323
queue = mp.Queue()
24-
process = mp.Process(target=self.child_main, args=(mr, queue))
24+
process = mp.Process(target=self.child_main, args=(device, mr, queue))
2525
process.start()
2626

2727
# Allocate and fill memory.
@@ -39,8 +39,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
3939
# Verify that the buffer was modified.
4040
helper.verify_buffer(flipped=True)
4141

42-
def child_main(self, mr, queue):
43-
device = Device()
42+
def child_main(self, device, mr, queue):
43+
device.set_current()
4444
buffer = queue.get(timeout=CHILD_TIMEOUT_SEC)
4545
helper = IPCBufferTestHelper(device, buffer)
4646
helper.verify_buffer(flipped=False)
@@ -64,8 +64,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
6464
q2.put(buffer2)
6565

6666
# Start the child processes.
67-
p1 = mp.Process(target=self.child_main, args=(mr, 1, q1))
68-
p2 = mp.Process(target=self.child_main, args=(mr, 2, q2))
67+
p1 = mp.Process(target=self.child_main, args=(device, mr, 1, q1))
68+
p2 = mp.Process(target=self.child_main, args=(device, mr, 2, q2))
6969
p1.start()
7070
p2.start()
7171

@@ -79,10 +79,10 @@ def test_main(self, ipc_device, ipc_memory_resource):
7979
IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=False)
8080
IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True)
8181

82-
def child_main(self, mr, idx, queue):
82+
def child_main(self, device, mr, idx, queue):
8383
# Note: passing the mr registers it so that buffers can be passed
8484
# directly.
85-
device = Device()
85+
device.set_current()
8686
buffer1 = queue.get(timeout=CHILD_TIMEOUT_SEC)
8787
buffer2 = queue.get(timeout=CHILD_TIMEOUT_SEC)
8888
if idx == 1:
@@ -104,8 +104,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
104104

105105
# Start children.
106106
q1, q2 = (mp.Queue() for _ in range(2))
107-
p1 = mp.Process(target=self.child_main, args=(alloc_handle, 1, q1))
108-
p2 = mp.Process(target=self.child_main, args=(alloc_handle, 2, q2))
107+
p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1))
108+
p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2))
109109
p1.start()
110110
p2.start()
111111

@@ -125,11 +125,10 @@ def test_main(self, ipc_device, ipc_memory_resource):
125125
IPCBufferTestHelper(device, buf1).verify_buffer(starting_from=1)
126126
IPCBufferTestHelper(device, buf2).verify_buffer(starting_from=2)
127127

128-
def child_main(self, alloc_handle, idx, queue):
128+
def child_main(self, device, alloc_handle, idx, queue):
129129
"""Fills a shared memory buffer."""
130130
# In this case, the device needs to be set up (passing the mr does it
131131
# implicitly in other tests).
132-
device = Device()
133132
device.set_current()
134133
mr = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
135134
buffer_descriptor = queue.get(timeout=CHILD_TIMEOUT_SEC)
@@ -149,8 +148,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
149148

150149
# Start children.
151150
q1, q2 = (mp.Queue() for _ in range(2))
152-
p1 = mp.Process(target=self.child_main, args=(alloc_handle, 1, q1))
153-
p2 = mp.Process(target=self.child_main, args=(alloc_handle, 2, q2))
151+
p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1))
152+
p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2))
154153
p1.start()
155154
p2.start()
156155

@@ -170,9 +169,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
170169
IPCBufferTestHelper(device, buf1).verify_buffer(starting_from=1)
171170
IPCBufferTestHelper(device, buf2).verify_buffer(starting_from=2)
172171

173-
def child_main(self, alloc_handle, idx, queue):
172+
def child_main(self, device, alloc_handle, idx, queue):
174173
"""Fills a shared memory buffer."""
175-
device = Device()
176174
device.set_current()
177175

178176
# Register the memory resource.

cuda_core/tests/memory_ipc/test_send_buffers.py

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
import multiprocessing
4+
import multiprocessing as mp
55
from itertools import cycle
66

7-
from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions
7+
import pytest
8+
from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions
89
from utility import IPCBufferTestHelper
910

1011
CHILD_TIMEOUT_SEC = 20
@@ -14,37 +15,13 @@
1415
POOL_SIZE = 2097152
1516

1617

17-
def test_ipc_send_buffers(ipc_device, ipc_memory_resource):
18-
"""Test passing buffers directly to a child separately from a memory resource."""
19-
device = ipc_device
20-
mr = ipc_memory_resource
21-
22-
# Allocate and fill memory.
23-
buffers = [mr.allocate(NBYTES) for _ in range(NTASKS)]
24-
for buffer in buffers:
25-
helper = IPCBufferTestHelper(device, buffer)
26-
helper.fill_buffer(flipped=False)
27-
28-
# Start the child process. Send the buffer directly.
29-
process = multiprocessing.Process(target=child_main, args=(buffers,))
30-
process.start()
31-
32-
# Wait for the child process.
33-
process.join(timeout=CHILD_TIMEOUT_SEC)
34-
assert process.exitcode == 0
35-
36-
# Verify that the buffers were modified.
37-
for buffer in buffers:
38-
helper = IPCBufferTestHelper(device, buffer)
39-
helper.verify_buffer(flipped=True)
40-
41-
42-
def test_ipc_send_buffers_multi(ipc_device, ipc_memory_resource):
18+
@pytest.mark.parametrize("nmrs", (1, NMRS))
19+
def test_ipc_send_buffers(ipc_device, nmrs):
4320
"""Test passing buffers sourced from multiple memory resources."""
4421
# Set up several IPC-enabled memory pools.
4522
device = ipc_device
4623
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
47-
mrs = [ipc_memory_resource] + [DeviceMemoryResource(device, options=options) for _ in range(NMRS - 1)]
24+
mrs = [DeviceMemoryResource(device, options=options) for _ in range(NMRS)]
4825

4926
# Allocate and fill memory.
5027
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
@@ -53,7 +30,13 @@ def test_ipc_send_buffers_multi(ipc_device, ipc_memory_resource):
5330
helper.fill_buffer(flipped=False)
5431

5532
# Start the child process.
56-
process = multiprocessing.Process(target=child_main, args=(buffers,))
33+
process = mp.Process(
34+
target=child_main,
35+
args=(
36+
device,
37+
buffers,
38+
),
39+
)
5740
process.start()
5841

5942
# Wait for the child process.
@@ -66,8 +49,8 @@ def test_ipc_send_buffers_multi(ipc_device, ipc_memory_resource):
6649
helper.verify_buffer(flipped=True)
6750

6851

69-
def child_main(buffers):
70-
device = Device()
52+
def child_main(device, buffers):
53+
device.set_current()
7154
for buffer in buffers:
7255
helper = IPCBufferTestHelper(device, buffer)
7356
helper.verify_buffer(flipped=False)

cuda_core/tests/memory_ipc/test_serialize.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ def test_main(self, ipc_device, ipc_memory_resource):
3333
# Send a memory resource by allocation handle.
3434
alloc_handle = mr.get_allocation_handle()
3535
mp.reduction.send_handle(parent_conn, alloc_handle.handle, process.pid)
36-
parent_conn.send(mr.uuid)
3736

3837
# Send a buffer.
3938
buffer1 = mr.allocate(NBYTES)
@@ -57,9 +56,7 @@ def child_main(self, conn):
5756

5857
# Receive the memory resource.
5958
handle = mp.reduction.recv_handle(conn)
60-
uuid = conn.recv()
6159
mr = DeviceMemoryResource.from_allocation_handle(device, handle)
62-
mr.register(uuid)
6360
os.close(handle)
6461

6562
# Receive the buffers.
@@ -135,15 +132,17 @@ def test_object_passing(ipc_device, ipc_memory_resource):
135132
helper.fill_buffer(flipped=False)
136133

137134
# Start the child process.
138-
process = mp.Process(target=child_main, args=(device, alloc_handle, mr, buffer_desc, buffer))
135+
process = mp.Process(target=child_main, args=(alloc_handle, mr, buffer_desc, buffer))
139136
process.start()
140137
process.join(timeout=CHILD_TIMEOUT_SEC)
141138
assert process.exitcode == 0
142139

143140
helper.verify_buffer(flipped=True)
144141

145142

146-
def child_main(device, alloc_handle, mr1, buffer_desc, buffer1):
143+
def child_main(alloc_handle, mr1, buffer_desc, buffer1):
144+
device = Device()
145+
device.set_current()
147146
mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
148147

149148
# OK to build the buffer from either mr and the descriptor.

cuda_core/tests/memory_ipc/test_workerpool.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from itertools import cycle
54
import multiprocessing as mp
65
import pickle
7-
import pytest
6+
from itertools import cycle
87

8+
import pytest
99
from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions
1010
from utility import IPCBufferTestHelper
1111

@@ -40,7 +40,8 @@ def test_main(self, ipc_device, nmrs):
4040
IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
4141

4242
def process_buffer(self, buffer):
43-
device = Device()
43+
device = Device(buffer.memory_resource.device_id)
44+
device.set_current()
4445
IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
4546

4647

@@ -67,15 +68,17 @@ def test_main(self, ipc_device, nmrs):
6768
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
6869
pool.starmap(
6970
self.process_buffer,
70-
[(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers]
71+
[(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers],
7172
)
7273

7374
for buffer in buffers:
7475
IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
7576

7677
def process_buffer(self, mr_idx, buffer_desc):
77-
device = Device()
78-
buffer = Buffer.from_ipc_descriptor(self.mrs[mr_idx], buffer_desc)
78+
mr = self.mrs[mr_idx]
79+
device = Device(mr.device_id)
80+
device.set_current()
81+
buffer = Buffer.from_ipc_descriptor(mr, buffer_desc)
7982
IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
8083

8184

@@ -103,14 +106,12 @@ def test_main(self, ipc_device, nmrs):
103106
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
104107

105108
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
106-
pool.map(self.process_buffer, [pickle.dumps(buffer) for buffer in buffers]
107-
)
109+
pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers])
108110

109111
for buffer in buffers:
110112
IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
111113

112-
def process_buffer(self, buffer_s):
113-
device = Device()
114-
buffer = pickle.loads(buffer_s)
114+
def process_buffer(self, device, buffer_s):
115+
device.set_current()
116+
buffer = pickle.loads(buffer_s) # noqa: S301
115117
IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
116-

0 commit comments

Comments
 (0)