-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathtest_spiceThreadSafety.py
More file actions
391 lines (325 loc) · 11.9 KB
/
test_spiceThreadSafety.py
File metadata and controls
391 lines (325 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
#
# ISC License
#
# Copyright (c) 2025, Autonomous Vehicle
# Systems Lab, University of Colorado at Boulder
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import contextlib
import multiprocessing as mp
import os
import sys
import time
import traceback

import pytest

from Basilisk import __path__
from Basilisk.simulation import spiceInterface
r"""
Unit Test for SPICE Interface Thread Safety
===========================================
This script stress-tests the SPICE interface in parallel, reproducing
the conditions of GitHub issue #220 where parallel simulations using
SPICE could deadlock or corrupt data.
Multiple worker processes repeatedly create and destroy SpiceInterface
instances, forcing concurrent kernel load/unload operations. The test
passes if all workers complete without hangs or unhandled exceptions.
"""
bskPath = __path__[0]
def createLoadDestroySpice(workerId, iterations, dataPath):
    """
    Cycle through creating, resetting, and destroying SpiceInterface objects.

    Executed in parallel by multiple processes. Each of the ``iterations``
    cycles builds a SpiceInterface, points it at the given SPICE kernel
    directory, resets it (which loads kernels), sleeps briefly to maximize
    overlap with sibling workers, and then drops the object so kernels can
    be released.

    Parameters
    ----------
    workerId : int
        Identifier for this worker process.
    iterations : int
        Number of create/reset/destroy cycles to perform.
    dataPath : str
        Directory containing SPICE kernel data.

    Returns
    -------
    dict
        Per-worker summary: success/failure counts plus a list of captured
        exception details.
    """
    print(f"Worker {workerId} starting with {iterations} iterations")
    okCount = 0
    badCount = 0
    caught = []

    def _record(idx, exc):
        # Capture enough context to diagnose a failure after the run.
        return {
            "workerId": workerId,
            "iteration": idx,
            "error": str(exc),
            "traceback": traceback.format_exc(),
        }

    try:
        for idx in range(iterations):
            try:
                # Fresh interface every cycle forces kernel load/unload churn
                spice = spiceInterface.SpiceInterface()
                # Fixed planet set keeps every worker's workload identical
                spice.addPlanetNames(["earth", "sun"])
                spice.SPICEDataPath = dataPath
                spice.Reset(0)  # triggers the kernel loads
                time.sleep(0.001)  # widen the contention window
                del spice  # drop the reference so kernels can be released
                okCount += 1
                print(
                    f"Worker {workerId} completed iteration "
                    f"{idx + 1}/{iterations}"
                )
            except Exception as exc:
                # Record and keep going; one bad cycle should not stop the
                # stress test.
                badCount += 1
                caught.append(_record(idx, exc))
                print(
                    f"Worker {workerId} failed at iteration {idx} "
                    f"with error: {exc}"
                )
    except Exception as exc:
        # Failure outside the per-iteration handler (e.g. loop setup)
        badCount += 1
        caught.append(_record(-1, exc))
        print(f"Worker {workerId} failed with error outside iteration loop: {exc}")
    return {
        "workerId": workerId,
        "successCount": okCount,
        "failureCount": badCount,
        "exceptions": caught,
    }
def runThreadSafetyTest(numWorkers=2, iterationsPerWorker=5):
    """
    Run the SPICE thread-safety stress test.

    Launches ``numWorkers`` processes that each perform
    ``iterationsPerWorker`` SpiceInterface create/reset/destroy cycles in
    parallel, then aggregates and prints a report.

    Parameters
    ----------
    numWorkers : int
        Number of parallel worker processes to launch.
    iterationsPerWorker : int
        Number of create/reset/destroy cycles per worker.

    Returns
    -------
    results : dict
        Aggregate statistics over all workers.
    success : bool
        True if all iterations completed without failure, False otherwise.
    """

    def _printFirstException(exceptions):
        # Shared by both failure paths so the diagnostic format cannot drift
        # (the original duplicated this block in two branches).
        if len(exceptions) > 0:
            print("\nFirst exception details:")
            print(exceptions[0]["traceback"])

    print(f"Starting SPICE Thread Safety Test with {numWorkers} workers")
    print(f"Each worker will perform {iterationsPerWorker} iterations")
    dataPath = bskPath + "/supportData/EphemerisData/"
    startTime = time.time()
    workerArgs = [
        (workerId, iterationsPerWorker, dataPath) for workerId in range(numWorkers)
    ]
    with mp.Pool(processes=numWorkers) as pool:
        # Pool.starmap already returns a list; no extra copy needed
        workerResults = pool.starmap(createLoadDestroySpice, workerArgs)
    executionTime = time.time() - startTime
    totalSuccess = sum(r["successCount"] for r in workerResults)
    totalFailure = sum(r["failureCount"] for r in workerResults)
    allExceptions = [e for r in workerResults for e in r["exceptions"]]
    results = {
        "executionTime": executionTime,
        "totalIterations": numWorkers * iterationsPerWorker,
        "successfulIterations": totalSuccess,
        "failedIterations": totalFailure,
        "exceptions": allExceptions,
    }
    print("\n--- SPICE Thread Safety Test Report ---")
    print(f"Total execution time: {executionTime:.2f} seconds")
    print(f"Total iterations: {numWorkers * iterationsPerWorker}")
    print(f"Successful iterations: {totalSuccess}")
    print(f"Failed iterations: {totalFailure}")
    print(f"Exceptions encountered: {len(allExceptions)}")
    print("--------------------------------------\n")
    if totalSuccess == 0:
        # No worker completed even one cycle: treat as a hard failure.
        print("TEST FAILED: No successful iterations completed")
        _printFirstException(allExceptions)
        success = False
    else:
        success = totalFailure == 0
        if success:
            print("TEST PASSED: SPICE interface thread safety looks robust")
        else:
            print("TEST FAILED: Issues detected with SPICE interface thread safety")
            _printFirstException(allExceptions)
    return results, success
def _runTestWithTimeout(resultQueue, numWorkers, iterationsPerWorker):
    """
    Process entry point: run the stress test and ship the outcome over a queue.

    Lives at module level so multiprocessing can pickle it under any start
    method. If the test itself raises, a small error payload is queued
    instead so the parent always receives something to inspect.
    """
    try:
        # runThreadSafetyTest returns a (results, success) tuple
        resultQueue.put(runThreadSafetyTest(numWorkers, iterationsPerWorker))
    except Exception as exc:
        failurePayload = {
            "error": str(exc),
            "traceback": traceback.format_exc(),
        }
        resultQueue.put((failurePayload, False))
    finally:
        # Always flush/close the queue so the child can exit cleanly
        _closeQueue(resultQueue)
def _closeQueue(resultQueue):
    """
    Best-effort shutdown of a multiprocessing queue.

    Closes the queue and then waits for its feeder thread. Errors raised
    when either step has already happened, or the queue is in an unusable
    state, are deliberately suppressed — this is final cleanup only.

    Parameters
    ----------
    resultQueue : multiprocessing.Queue
        Queue to close after all expected data has been read or written.
    """
    # contextlib.suppress is the idiomatic form of try/except/pass
    with contextlib.suppress(OSError, ValueError):
        resultQueue.close()
    # join_thread can assert if called before close or from the wrong process
    with contextlib.suppress(AssertionError, OSError, ValueError):
        resultQueue.join_thread()
def _closeProcess(testProcess):
    """
    Best-effort release of a multiprocessing Process's resources.

    ``Process.close`` raises if the process is still running or otherwise
    not closeable; those errors are deliberately suppressed since this is
    final cleanup only.

    Parameters
    ----------
    testProcess : multiprocessing.Process
        Process object to close after it has been joined.
    """
    # contextlib.suppress is the idiomatic form of try/except/pass
    with contextlib.suppress(OSError, ValueError):
        testProcess.close()
def _terminateProcess(testProcess):
    """
    Stop a multiprocessing process, escalating to a hard kill if needed.

    Sends SIGTERM via ``terminate()`` and waits briefly; if the process is
    still alive it is force-killed. ``Process.kill()`` is used instead of
    ``os.kill(pid, 9)`` so the escalation is portable (signal number 9 is
    a POSIX SIGKILL; it is not meaningful on Windows).

    Parameters
    ----------
    testProcess : multiprocessing.Process
        Process object to terminate.
    """
    testProcess.terminate()
    shutdownTimeout = 1  # [s] grace period before escalating
    testProcess.join(shutdownTimeout)
    if testProcess.is_alive():
        # Graceful stop failed; force-kill (cross-platform SIGKILL equivalent)
        testProcess.kill()
        testProcess.join(shutdownTimeout)
@pytest.mark.flaky(reruns=3)
@pytest.mark.parametrize(
    "numWorkers, iterationsPerWorker",
    [
        (10, 3),
    ],
)
def testSpiceThreadSafety(numWorkers, iterationsPerWorker):
    """
    Pytest entry point for the SPICE thread-safety test.

    The stress test runs inside a dedicated child process so that a SPICE
    deadlock cannot hang the whole pytest session; a hard timeout converts
    a hung child into a test failure.

    Parameters
    ----------
    numWorkers : int
        Number of parallel worker processes.
    iterationsPerWorker : int
        Number of load/unload cycles per worker.
    """
    import queue
    from multiprocessing import Process, Queue

    timeoutSeconds = 60  # [s] hard cap on the whole stress test
    resultQueue = Queue()
    testProcess = Process(
        target=_runTestWithTimeout,
        args=(resultQueue, numWorkers, iterationsPerWorker),
    )
    try:
        testProcess.start()
        testProcess.join(timeoutSeconds)
        if testProcess.is_alive():
            # Hard timeout: kill the worker process and fail the test
            _terminateProcess(testProcess)
            pytest.fail(
                f"Thread safety test timed out after {timeoutSeconds} seconds"
            )
        try:
            results, success = resultQueue.get(block=False)
        except queue.Empty:
            pytest.fail("Thread safety test completed but did not return any results")
        # A dict with an "error" key means the child crashed before the
        # worker pool even produced aggregate statistics.
        if isinstance(results, dict) and "error" in results:
            pytest.fail(
                "Thread safety test failed with error: "
                f"{results['error']}\n{results.get('traceback')}"
            )
        assert success, "Thread safety test reported thread-safety issues"
        assert results["failedIterations"] == 0, (
            "Some iterations failed in the thread-safety test"
        )
    finally:
        # Release IPC resources even when the test fails or times out
        _closeQueue(resultQueue)
        _closeProcess(testProcess)
if __name__ == "__main__":
import queue
from multiprocessing import Process, Queue
numWorkers = 50
iterationsPerWorker = 3
if len(sys.argv) > 1:
numWorkers = int(sys.argv[1])
if len(sys.argv) > 2:
iterationsPerWorker = int(sys.argv[2])
resultQueue = Queue()
testProcess = Process(
target=_runTestWithTimeout,
args=(resultQueue, numWorkers, iterationsPerWorker),
)
try:
testProcess.start()
timeoutSeconds = 60 # [s]
testProcess.join(timeoutSeconds)
if testProcess.is_alive():
_terminateProcess(testProcess)
print(f"ERROR: Thread safety test timed out after {timeoutSeconds} seconds")
sys.exit(2)
try:
results, success = resultQueue.get(block=False)
if isinstance(results, dict) and "error" in results:
print(f"ERROR: Thread safety test failed with error: {results['error']}")
print(results.get("traceback"))
sys.exit(1)
sys.exit(0 if success else 1)
except queue.Empty:
print("ERROR: Thread safety test completed but did not return results")
sys.exit(1)
finally:
_closeQueue(resultQueue)
_closeProcess(testProcess)