Skip to content

Commit 7b82962

Browse files
cpcloud and claude committed
Wrap IPC memory resource cleanup in try/finally
Ensure MR.close() runs even when tests fail (timeout, assertion, queue error) so pools don't leak on the unhappy path either.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6fb39c1 commit 7b82962

File tree

3 files changed

+81
-69
lines changed

3 files changed

+81
-69
lines changed

cuda_core/tests/memory_ipc/test_errors.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,25 @@ def test_main(self, ipc_device, ipc_memory_resource):
2828
self.mr = ipc_memory_resource
2929
self._extra_mrs = []
3030

31-
# Start a child process to generate error info.
32-
pipe = [multiprocessing.Queue() for _ in range(2)]
33-
process = multiprocessing.Process(target=self.child_main, args=(pipe, self.device, self.mr))
34-
process.start()
35-
36-
# Interact.
37-
self.PARENT_ACTION(pipe[0])
38-
39-
# Check the error.
40-
exc_type, exc_msg = pipe[1].get(timeout=CHILD_TIMEOUT_SEC)
41-
self.ASSERT(exc_type, exc_msg)
42-
43-
# Wait for the child process.
44-
process.join(timeout=CHILD_TIMEOUT_SEC)
45-
assert process.exitcode == 0
46-
47-
for mr in self._extra_mrs:
48-
mr.close()
31+
try:
32+
# Start a child process to generate error info.
33+
pipe = [multiprocessing.Queue() for _ in range(2)]
34+
process = multiprocessing.Process(target=self.child_main, args=(pipe, self.device, self.mr))
35+
process.start()
36+
37+
# Interact.
38+
self.PARENT_ACTION(pipe[0])
39+
40+
# Check the error.
41+
exc_type, exc_msg = pipe[1].get(timeout=CHILD_TIMEOUT_SEC)
42+
self.ASSERT(exc_type, exc_msg)
43+
44+
# Wait for the child process.
45+
process.join(timeout=CHILD_TIMEOUT_SEC)
46+
assert process.exitcode == 0
47+
finally:
48+
for mr in self._extra_mrs:
49+
mr.close()
4950

5051
def child_main(self, pipe, device, mr):
5152
"""Child process that pushes IPC errors to a shared pipe for testing."""

cuda_core/tests/memory_ipc/test_send_buffers.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,29 @@ def test_main(self, ipc_device, nmrs):
2626
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
2727
mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
2828

29-
# Allocate and fill memory.
30-
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
31-
pgen = PatternGen(device, NBYTES)
32-
for buffer in buffers:
33-
pgen.fill_buffer(buffer, seed=False)
34-
35-
# Start the child process.
36-
process = mp.Process(target=self.child_main, args=(device, buffers))
37-
process.start()
38-
39-
# Wait for the child process.
40-
process.join(timeout=CHILD_TIMEOUT_SEC)
41-
assert process.exitcode == 0
42-
43-
# Verify that the buffers were modified.
44-
pgen = PatternGen(device, NBYTES)
45-
for buffer in buffers:
46-
pgen.verify_buffer(buffer, seed=True)
47-
buffer.close()
48-
for mr in mrs:
49-
mr.close()
29+
try:
30+
# Allocate and fill memory.
31+
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
32+
pgen = PatternGen(device, NBYTES)
33+
for buffer in buffers:
34+
pgen.fill_buffer(buffer, seed=False)
35+
36+
# Start the child process.
37+
process = mp.Process(target=self.child_main, args=(device, buffers))
38+
process.start()
39+
40+
# Wait for the child process.
41+
process.join(timeout=CHILD_TIMEOUT_SEC)
42+
assert process.exitcode == 0
43+
44+
# Verify that the buffers were modified.
45+
pgen = PatternGen(device, NBYTES)
46+
for buffer in buffers:
47+
pgen.verify_buffer(buffer, seed=True)
48+
buffer.close()
49+
finally:
50+
for mr in mrs:
51+
mr.close()
5052

5153
def child_main(self, device, buffers):
5254
device.set_current()

cuda_core/tests/memory_ipc/test_workerpool.py

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,20 @@ def test_main(self, ipc_device, nmrs):
3333
device = ipc_device
3434
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
3535
mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
36-
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
3736

38-
with mp.Pool(NWORKERS) as pool:
39-
pool.map(self.process_buffer, buffers)
37+
try:
38+
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
4039

41-
pgen = PatternGen(device, NBYTES)
42-
for buffer in buffers:
43-
pgen.verify_buffer(buffer, seed=True)
44-
buffer.close()
45-
for mr in mrs:
46-
mr.close()
40+
with mp.Pool(NWORKERS) as pool:
41+
pool.map(self.process_buffer, buffers)
42+
43+
pgen = PatternGen(device, NBYTES)
44+
for buffer in buffers:
45+
pgen.verify_buffer(buffer, seed=True)
46+
buffer.close()
47+
finally:
48+
for mr in mrs:
49+
mr.close()
4750

4851
def process_buffer(self, buffer):
4952
device = Device(buffer.memory_resource.device_id)
@@ -72,20 +75,23 @@ def test_main(self, ipc_device, nmrs):
7275
device = ipc_device
7376
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
7477
mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
75-
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
7678

77-
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
78-
pool.starmap(
79-
self.process_buffer,
80-
[(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers],
81-
)
79+
try:
80+
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
8281

83-
pgen = PatternGen(device, NBYTES)
84-
for buffer in buffers:
85-
pgen.verify_buffer(buffer, seed=True)
86-
buffer.close()
87-
for mr in mrs:
88-
mr.close()
82+
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
83+
pool.starmap(
84+
self.process_buffer,
85+
[(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers],
86+
)
87+
88+
pgen = PatternGen(device, NBYTES)
89+
for buffer in buffers:
90+
pgen.verify_buffer(buffer, seed=True)
91+
buffer.close()
92+
finally:
93+
for mr in mrs:
94+
mr.close()
8995

9096
def process_buffer(self, mr_idx, buffer_desc):
9197
mr = self.mrs[mr_idx]
@@ -119,17 +125,20 @@ def test_main(self, ipc_device, nmrs):
119125
device = ipc_device
120126
options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
121127
mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
122-
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
123128

124-
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
125-
pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers])
129+
try:
130+
buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
126131

127-
pgen = PatternGen(device, NBYTES)
128-
for buffer in buffers:
129-
pgen.verify_buffer(buffer, seed=True)
130-
buffer.close()
131-
for mr in mrs:
132-
mr.close()
132+
with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
133+
pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers])
134+
135+
pgen = PatternGen(device, NBYTES)
136+
for buffer in buffers:
137+
pgen.verify_buffer(buffer, seed=True)
138+
buffer.close()
139+
finally:
140+
for mr in mrs:
141+
mr.close()
133142

134143
def process_buffer(self, device, buffer_s):
135144
device.set_current()

0 commit comments

Comments
 (0)