Skip to content

Commit 1e7bd2e

Browse files
committed
fifo doesn't work
1 parent 6d0c57e commit 1e7bd2e

2 files changed

Lines changed: 49 additions & 39 deletions

File tree

executor/executor.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44
import subprocess
55
import os
6-
import io
76

87
from dataclasses import dataclass
98
from executor_util import ptempfile, ptempdir, create_sandbox, copy, PASH_SPEC_TOP
@@ -12,9 +11,7 @@
1211
@dataclass
1312
class ExecCtxt:
1413
process: subprocess.Popen
15-
trace_file: str # base path; .r and .w are FIFOs, .missed is written at exit
16-
read_fifo: io.IOBase # open read-end of trace_file + ".r"
17-
write_fifo: io.IOBase # open read-end of trace_file + ".w"
14+
trace_file: str # base path; .r/.w are streaming files, .missed written at exit
1815
outfds: str
1916
stderr: str
2017
pre_env_file: str
@@ -58,17 +55,6 @@ def run_trace_sandboxed(args: ExecArgs):
5855
logging.debug(f'Scheduler: Stdout file for: {args.concrete_node_id} is: {outfiles_dir}')
5956
logging.debug(f'Scheduler: Stderr file for: {args.concrete_node_id} is: {stderr_file}')
6057

61-
# Create FIFOs on the host before the sandbox starts so they land in the
62-
# OverlayFS lower layer and trace_v3 (running inside) can open them for writing.
63-
# Open the read ends now with O_NONBLOCK so trace_v3's open() doesn't block.
64-
read_fifo_path = trace_file + '.r'
65-
write_fifo_path = trace_file + '.w'
66-
os.mkfifo(read_fifo_path)
67-
os.mkfifo(write_fifo_path)
68-
_nonblock_open = lambda path, flags: os.open(path, flags | os.O_NONBLOCK)
69-
read_fifo = open(read_fifo_path, 'r', opener=_nonblock_open)
70-
write_fifo = open(write_fifo_path, 'r', opener=_nonblock_open)
71-
7258
sandbox_dir, tmp_dir = create_sandbox()
7359
post_execution_env_file = ptempfile(prefix='hs_post_env')
7460
lower_dirs_str = ':'.join(args.lower_sandboxes)
@@ -78,7 +64,7 @@ def run_trace_sandboxed(args: ExecArgs):
7864
logging.debug(cmd)
7965
process = subprocess.Popen(cmd, stdout=None, stderr=None, preexec_fn=set_pgid)
8066

81-
return ExecCtxt(process, trace_file, read_fifo, write_fifo, outfiles_dir, stderr_file, args.pre_execution_env_file, post_execution_env_file, sandbox_dir)
67+
return ExecCtxt(process, trace_file, outfiles_dir, stderr_file, args.pre_execution_env_file, post_execution_env_file, sandbox_dir)
8268

8369
def commit_workspace(workspace_path):
8470
run_script = f'{PASH_SPEC_TOP}/deps/try/try'

scheduler/node.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -292,9 +292,12 @@ class ConcreteNode:
292292
# Exists when node is in READY
293293
assignments: "list[NodeId]"
294294

295-
# Leftover partial line from each FIFO during incremental reads
296-
read_fifo_buf: str
297-
write_fifo_buf: str
295+
# Open file descriptors for the streaming dep files (opened lazily in gather_fs_actions)
296+
read_stream_fd: object
297+
write_stream_fd: object
298+
# Leftover partial line from each stream file during incremental reads
299+
read_stream_buf: str
300+
write_stream_buf: str
298301

299302
def __init__(self, cnid: ConcreteNodeId, node: Node, loop_list_context: HSLoopListContext,
300303
spec_pre_env=None):
@@ -309,8 +312,7 @@ def __init__(self, cnid: ConcreteNodeId, node: Node, loop_list_context: HSLoopLi
309312
self.spec_pre_env = spec_pre_env
310313
self.loop_list_context = loop_list_context
311314
self.init_loop_list_context = loop_list_context
312-
self.read_fifo_buf = ''
313-
self.write_fifo_buf = ''
315+
self._reset_stream_state()
314316

315317
def __str__(self):
316318
return f'Node(id:{self.id_}, cmd:{self.cmd}, state:{self.state}, wait_env_file:{self.wait_env_file}, exec_ctxt:{self.exec_ctxt})'
@@ -451,12 +453,23 @@ def kill(self):
451453
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]
452454
self.exec_ctxt.process.kill()
453455

454-
def _drain_fifo(self, fifo, buf: str) -> tuple[set, str]:
455-
"""Non-blocking read from a FIFO; return (complete paths, leftover partial line)."""
456+
def _reset_stream_state(self):
457+
self.read_stream_fd = None
458+
self.write_stream_fd = None
459+
self.read_stream_buf = ''
460+
self.write_stream_buf = ''
461+
462+
def _open_stream_fd(self, suffix):
463+
path = util.sandboxed_path(self.exec_ctxt.sandbox_dir,
464+
self.exec_ctxt.trace_file + suffix)
456465
try:
457-
chunk = fifo.read()
458-
except BlockingIOError:
459-
return set(), buf
466+
return open(path, 'r')
467+
except FileNotFoundError:
468+
return None
469+
470+
def _drain_stream(self, fd, buf: str) -> tuple[set, str]:
471+
"""Read new content from fd; return (complete paths, leftover partial line)."""
472+
chunk = fd.read()
460473
if not chunk:
461474
return set(), buf
462475
buf += chunk
@@ -466,8 +479,15 @@ def _drain_fifo(self, fifo, buf: str) -> tuple[set, str]:
466479

467480
def gather_fs_actions(self):
468481
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]
469-
read_paths, self.read_fifo_buf = self._drain_fifo(self.exec_ctxt.read_fifo, self.read_fifo_buf)
470-
write_paths, self.write_fifo_buf = self._drain_fifo(self.exec_ctxt.write_fifo, self.write_fifo_buf)
482+
if self.read_stream_fd is None:
483+
self.read_stream_fd = self._open_stream_fd('.r')
484+
if self.write_stream_fd is None:
485+
self.write_stream_fd = self._open_stream_fd('.w')
486+
read_paths, write_paths = set(), set()
487+
if self.read_stream_fd:
488+
read_paths, self.read_stream_buf = self._drain_stream(self.read_stream_fd, self.read_stream_buf)
489+
if self.write_stream_fd:
490+
write_paths, self.write_stream_buf = self._drain_stream(self.write_stream_fd, self.write_stream_buf)
471491
self.update_rw_set(read_paths, write_paths)
472492

473493
def get_rw_set(self):
@@ -652,8 +672,8 @@ def reset_to_ready(self, spec_pre_env: str = None, loop_list_context: HSLoopList
652672
self.loop_list_context = self.init_loop_list_context
653673
if spec_pre_env is not None:
654674
self.spec_pre_env = spec_pre_env
655-
self.read_fifo_buf = ''
656-
self.write_fifo_buf = ''
675+
self._reset_stream_state()
676+
657677
self.state = NodeState.READY
658678
self.trace_state()
659679

@@ -662,17 +682,17 @@ def start_executing(self, env_file):
662682

663683
self.start_command(env_file)
664684
self.state = NodeState.EXECUTING
665-
self.read_fifo_buf = ''
666-
self.write_fifo_buf = ''
685+
self._reset_stream_state()
686+
667687
self.rwset = RWSet(set(), set())
668688
self.trace_state()
669689

670690
def start_spec_executing(self, env_file, speculated_nodes):
671691
# raise NotImplementedError
672692
assert self.state == NodeState.READY
673693
self.start_command(env_file, speculate=True, speculated_nodes=speculated_nodes)
674-
self.read_fifo_buf = ''
675-
self.write_fifo_buf = ''
694+
self._reset_stream_state()
695+
676696
self.rwset = RWSet(set(), set())
677697
self.state = NodeState.SPEC_EXECUTING
678698
self.trace_state()
@@ -686,8 +706,10 @@ def collect_result(self):
686706
def commit_frontier_execution(self):
687707
assert self.state == NodeState.EXECUTING
688708
self.gather_fs_actions()
689-
self.exec_ctxt.read_fifo.close()
690-
self.exec_ctxt.write_fifo.close()
709+
if self.read_stream_fd:
710+
self.read_stream_fd.close()
711+
if self.write_stream_fd:
712+
self.write_stream_fd.close()
691713
self.update_loop_list_context()
692714
util.overhead_log(f"COMMIT|{self.cnid}")
693715
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
@@ -699,12 +721,14 @@ def commit_frontier_execution(self):
699721
self.trace_state()
700722

701723
def finish_spec_execution(self) -> bool:
702-
"""Drain FIFOs, close them, and return True if trace_v3 missed events."""
724+
"""Drain stream files, close them, and return True if trace_v3 missed events."""
703725
assert self.state == NodeState.SPEC_EXECUTING
704726
self.update_loop_list_context()
705727
self.gather_fs_actions()
706-
self.exec_ctxt.read_fifo.close()
707-
self.exec_ctxt.write_fifo.close()
728+
if self.read_stream_fd:
729+
self.read_stream_fd.close()
730+
if self.write_stream_fd:
731+
self.write_stream_fd.close()
708732
missed = dep_util.read_missed(self.exec_ctxt.sandbox_dir, self.exec_ctxt.trace_file)
709733
self.fixup_fds()
710734
self.state = NodeState.SPECULATED

0 commit comments

Comments
 (0)