Skip to content

Commit 7c42a4d

Browse files
committed
[#1354] Close SPICE thread-safety test multiprocessing resources
1 parent 821564c commit 7c42a4d

1 file changed

Lines changed: 98 additions & 41 deletions

File tree

src/simulation/environment/spiceInterface/_UnitTest/test_spiceThreadSafety.py

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,59 @@ def _runTestWithTimeout(resultQueue, numWorkers, iterationsPerWorker):
232232
False,
233233
)
234234
)
235+
finally:
236+
_closeQueue(resultQueue)
237+
238+
239+
def _closeQueue(resultQueue):
240+
"""
241+
Close a multiprocessing queue and wait for its feeder thread.
242+
243+
Parameters
244+
----------
245+
resultQueue : multiprocessing.Queue
246+
Queue to close after all expected data has been read or written.
247+
"""
248+
try:
249+
resultQueue.close()
250+
except (OSError, ValueError):
251+
pass
252+
try:
253+
resultQueue.join_thread()
254+
except (AssertionError, OSError, ValueError):
255+
pass
256+
257+
258+
def _closeProcess(testProcess):
259+
"""
260+
Release multiprocessing process resources after the process exits.
261+
262+
Parameters
263+
----------
264+
testProcess : multiprocessing.Process
265+
Process object to close after it has been joined.
266+
"""
267+
try:
268+
testProcess.close()
269+
except (OSError, ValueError):
270+
pass
271+
272+
273+
def _terminateProcess(testProcess):
274+
"""
275+
Stop a multiprocessing process and wait for shutdown.
276+
277+
Parameters
278+
----------
279+
testProcess : multiprocessing.Process
280+
Process object to terminate.
281+
"""
282+
testProcess.terminate()
283+
shutdownTimeout = 1 # [s]
284+
testProcess.join(shutdownTimeout)
285+
if testProcess.is_alive():
286+
os.kill(testProcess.pid, 9)
287+
testProcess.join(shutdownTimeout)
235288

236289

237290
@pytest.mark.flaky(reruns=3)
@@ -260,34 +313,37 @@ def testSpiceThreadSafety(numWorkers, iterationsPerWorker):
260313
target=_runTestWithTimeout,
261314
args=(resultQueue, numWorkers, iterationsPerWorker),
262315
)
263-
testProcess.start()
316+
try:
317+
testProcess.start()
264318

265-
timeoutSeconds = 60
266-
testProcess.join(timeoutSeconds)
319+
timeoutSeconds = 60 # [s]
320+
testProcess.join(timeoutSeconds)
267321

268-
if testProcess.is_alive():
269-
# Hard timeout: kill the worker process and fail the test
270-
testProcess.terminate()
271-
testProcess.join(1)
272322
if testProcess.is_alive():
273-
os.kill(testProcess.pid, 9)
274-
pytest.fail(f"Thread safety test timed out after {timeoutSeconds} seconds")
275-
276-
try:
277-
results, success = resultQueue.get(block=False)
278-
279-
if isinstance(results, dict) and "error" in results:
323+
# Hard timeout: kill the worker process and fail the test
324+
_terminateProcess(testProcess)
280325
pytest.fail(
281-
"Thread safety test failed with error: "
282-
f"{results['error']}\n{results.get('traceback')}"
326+
f"Thread safety test timed out after {timeoutSeconds} seconds"
283327
)
284328

285-
assert success, "Thread safety test reported thread-safety issues"
286-
assert results["failedIterations"] == 0, (
287-
"Some iterations failed in the thread-safety test"
288-
)
289-
except queue.Empty:
290-
pytest.fail("Thread safety test completed but did not return any results")
329+
try:
330+
results, success = resultQueue.get(block=False)
331+
332+
if isinstance(results, dict) and "error" in results:
333+
pytest.fail(
334+
"Thread safety test failed with error: "
335+
f"{results['error']}\n{results.get('traceback')}"
336+
)
337+
338+
assert success, "Thread safety test reported thread-safety issues"
339+
assert results["failedIterations"] == 0, (
340+
"Some iterations failed in the thread-safety test"
341+
)
342+
except queue.Empty:
343+
pytest.fail("Thread safety test completed but did not return any results")
344+
finally:
345+
_closeQueue(resultQueue)
346+
_closeProcess(testProcess)
291347

292348

293349
if __name__ == "__main__":
@@ -307,28 +363,29 @@ def testSpiceThreadSafety(numWorkers, iterationsPerWorker):
307363
target=_runTestWithTimeout,
308364
args=(resultQueue, numWorkers, iterationsPerWorker),
309365
)
310-
testProcess.start()
366+
try:
367+
testProcess.start()
311368

312-
timeoutSeconds = 60
313-
testProcess.join(timeoutSeconds)
369+
timeoutSeconds = 60 # [s]
370+
testProcess.join(timeoutSeconds)
314371

315-
if testProcess.is_alive():
316-
testProcess.terminate()
317-
testProcess.join(1)
318372
if testProcess.is_alive():
319-
os.kill(testProcess.pid, 9)
320-
print(f"ERROR: Thread safety test timed out after {timeoutSeconds} seconds")
321-
sys.exit(2)
373+
_terminateProcess(testProcess)
374+
print(f"ERROR: Thread safety test timed out after {timeoutSeconds} seconds")
375+
sys.exit(2)
322376

323-
try:
324-
results, success = resultQueue.get(block=False)
377+
try:
378+
results, success = resultQueue.get(block=False)
325379

326-
if isinstance(results, dict) and "error" in results:
327-
print(f"ERROR: Thread safety test failed with error: {results['error']}")
328-
print(results.get("traceback"))
329-
sys.exit(1)
380+
if isinstance(results, dict) and "error" in results:
381+
print(f"ERROR: Thread safety test failed with error: {results['error']}")
382+
print(results.get("traceback"))
383+
sys.exit(1)
330384

331-
sys.exit(0 if success else 1)
332-
except queue.Empty:
333-
print("ERROR: Thread safety test completed but did not return results")
334-
sys.exit(1)
385+
sys.exit(0 if success else 1)
386+
except queue.Empty:
387+
print("ERROR: Thread safety test completed but did not return results")
388+
sys.exit(1)
389+
finally:
390+
_closeQueue(resultQueue)
391+
_closeProcess(testProcess)

0 commit comments

Comments
 (0)