Skip to content

Commit 68b3da1

Browse files
Non-thread-safe fix
1 parent ab5c80d commit 68b3da1

1 file changed

Lines changed: 107 additions & 42 deletions

File tree

hwcomponents_neurosim/neurointerface.py

Lines changed: 107 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
includes integration of cell files from NVSim and NVMExplorer.
44
"""
55

6+
import fcntl
7+
import glob
8+
import hashlib
69
import logging
710
from statistics import mean
811
import threading
@@ -13,11 +16,53 @@
1316
import re
1417
import os
1518

16-
MY_PID = os.getpid()
1719
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
1820
DEFAULT_CONFIG = os.path.join(SCRIPT_DIR, "default_config.cfg")
1921
NEUROSIM_PATH = os.path.join(SCRIPT_DIR, "NeuroSim/main")
20-
CFG_WRITE_PATH = os.path.join(SCRIPT_DIR, f"./neurosim_input_{MY_PID}.cfg")
22+
23+
24+
def _clean_tmp_dir():
25+
temp_dir = os.path.join(SCRIPT_DIR, "neurosim_inputs_outputs")
26+
os.makedirs(temp_dir, exist_ok=True)
27+
28+
# Skip .lock (persistent, one per hash) and .tmp (in-flight atomic writes).
29+
# getctime swallows OSError so a peer's concurrent eviction can't crash us.
30+
def ctime(path):
31+
try:
32+
return os.path.getctime(path)
33+
except OSError:
34+
return 0
35+
36+
files = sorted(
37+
[
38+
f
39+
for f in glob.glob(os.path.join(temp_dir, "*"))
40+
if not f.endswith((".lock", ".tmp"))
41+
],
42+
key=ctime,
43+
reverse=True,
44+
)
45+
if len(files) > 200:
46+
for file in files[200:]:
47+
# Take the per-hash .lock non-blocking before deleting. If a concurrent
48+
# worker holds it, the file is in active use — skip and let the next
49+
# call evict it.
50+
lock_path = os.path.splitext(file)[0] + ".lock"
51+
try:
52+
with open(lock_path, "a") as lock_file:
53+
try:
54+
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
55+
except BlockingIOError:
56+
continue
57+
try:
58+
os.remove(file)
59+
except OSError:
60+
pass
61+
fcntl.flock(lock_file, fcntl.LOCK_UN)
62+
except OSError:
63+
pass
64+
return temp_dir
65+
2166

2267
# ==================================================================================================
2368
# NVSIM/NVMEXPLORER -> NEUROSIM TRANSLATIONS
@@ -399,43 +444,66 @@ def run_neurosim(
399444
if "cycle_period" not in to_set[0]:
400445
cfg = replace_cfg(to_set[0], to_set[1], cfg, cfgfile, logger)
401446

402-
# Write config
403-
inputpath = os.path.realpath(CFG_WRITE_PATH)
404-
with open(inputpath, "w") as f:
405-
f.write(cfg)
406-
os.chmod(inputpath, 0o777)
407-
408-
# Run
409-
logger.info("Running %s %s", NEUROSIM_PATH, inputpath)
410-
proc = subprocess.Popen(
411-
[NEUROSIM_PATH, inputpath],
412-
stdout=subprocess.PIPE,
413-
stderr=subprocess.PIPE,
414-
env=os.environ.copy(),
415-
)
416-
417-
def read_pipe_thread(pipe, write_to: list):
418-
while proc.poll() is None:
419-
write_to.append(pipe.read().decode("utf-8"))
420-
write_to.append(pipe.read().decode("utf-8"))
421-
422-
stdout, stderr = [], []
423-
stdout_thread = threading.Thread(
424-
target=read_pipe_thread, args=(proc.stdout, stdout)
425-
)
426-
stderr_thread = threading.Thread(
427-
target=read_pipe_thread, args=(proc.stderr, stderr)
428-
)
429-
stdout_thread.start()
430-
stderr_thread.start()
431-
stdout_thread.join()
432-
stderr_thread.join()
433-
stdout, stderr = "".join(stdout), "".join(stderr)
434-
if proc.returncode != 0:
435-
logger.error("NeuroSIM returned error code %s", proc.returncode)
436-
logger.error(stderr)
437-
raise ValueError("NeuroSIM returned error code %s", proc.returncode)
438-
results = stdout
447+
# Name files by a sha256 of the config. Concurrent callers with the same
448+
# hash serialize on the .lock and share the cached .out; different hashes
449+
# never collide on the same path.
450+
temp_dir = _clean_tmp_dir()
451+
input_name = hashlib.sha256(cfg.encode()).hexdigest()
452+
input_path = os.path.join(temp_dir, input_name + ".cfg")
453+
output_path = os.path.join(temp_dir, input_name + ".out")
454+
lock_path = os.path.join(temp_dir, input_name + ".lock")
455+
456+
with open(lock_path, "w") as lock_file:
457+
fcntl.flock(lock_file, fcntl.LOCK_EX)
458+
try:
459+
if os.path.exists(output_path):
460+
with open(output_path, "r") as f:
461+
results = f.read()
462+
else:
463+
with open(input_path, "w") as f:
464+
f.write(cfg)
465+
os.chmod(input_path, 0o777)
466+
467+
logger.info("Running %s %s", NEUROSIM_PATH, input_path)
468+
proc = subprocess.Popen(
469+
[NEUROSIM_PATH, input_path],
470+
stdout=subprocess.PIPE,
471+
stderr=subprocess.PIPE,
472+
env=os.environ.copy(),
473+
)
474+
475+
def read_pipe_thread(pipe, write_to: list):
476+
while proc.poll() is None:
477+
write_to.append(pipe.read().decode("utf-8"))
478+
write_to.append(pipe.read().decode("utf-8"))
479+
480+
stdout, stderr = [], []
481+
stdout_thread = threading.Thread(
482+
target=read_pipe_thread, args=(proc.stdout, stdout)
483+
)
484+
stderr_thread = threading.Thread(
485+
target=read_pipe_thread, args=(proc.stderr, stderr)
486+
)
487+
stdout_thread.start()
488+
stderr_thread.start()
489+
stdout_thread.join()
490+
stderr_thread.join()
491+
stdout, stderr = "".join(stdout), "".join(stderr)
492+
if proc.returncode != 0:
493+
logger.error("NeuroSIM returned error code %s", proc.returncode)
494+
logger.error(stderr)
495+
raise ValueError(
496+
"NeuroSIM returned error code %s", proc.returncode
497+
)
498+
results = stdout
499+
# Write-then-rename so a crash mid-write can't leave a partial
500+
# .out that future callers would treat as a cache hit.
501+
tmp_path = output_path + ".tmp"
502+
with open(tmp_path, "w") as f:
503+
f.write(results)
504+
os.replace(tmp_path, output_path)
505+
finally:
506+
fcntl.flock(lock_file, fcntl.LOCK_UN)
439507
logger.debug("NeuroSIM output:\n" + results)
440508
self.comps = [
441509
Component(line) for line in results.split("\n") if "<COMPONENT>" in line
@@ -476,9 +544,6 @@ def read_pipe_thread(pipe, write_to: list):
476544
" config and make sure all values are populated."
477545
)
478546

479-
# Remove the config file
480-
os.remove(inputpath)
481-
482547
def get_components(self, read: bool, hi: bool) -> List[Component]:
483548
"""Returns a list of components matching the criteria"""
484549
comps = [c for c in self.comps if c.read == read]

0 commit comments

Comments
 (0)