Skip to content

Commit 53fe4d9

Browse files
committed
streaming support attempt v2
executor/executor.py - Imported sandboxed_path from executor_util - After create_sandbox(), pre-creates two FIFOs (.r, .w) in {sandbox_dir}/upperdir/{trace_file}.{r,w} — visible inside the sandbox at the same /tmp/... path because try's overlayfs upperdir preserves pre-created files scheduler/node.py - Added import threading - Replaced read_stream_fd/write_stream_fd/read_stream_buf/write_stream_buf fields with _reader_r/_reader_w thread fields - _reset_stream_state() — now joins any live threads with 0.5s timeout before clearing - _run_stream_reader(path, is_write) — opens FIFO blocking (waits for trace_v3), reads line-by-line, updates rwset.read_set or write_set directly (GIL ensures atomicity) - _start_reader_threads() — spawns two daemon threads immediately after start_command() - _join_reader_threads() — joins with 1.0s timeout; called before dep_util.read_missed() - Removed _open_stream_fd, _drain_stream, gather_fs_actions - commit_frontier_execution and finish_spec_execution now call _join_reader_threads() instead of the old drain+close pattern scheduler/partial_program_order.py - Removed fetch_fs_actions() method entirely - has_fs_deps() now calls _has_fs_deps() directly — rwsets are always current via reader threads - eager_fs_killing() no longer needs to poll before checking
1 parent 35836df commit 53fe4d9

6 files changed

Lines changed: 96 additions & 88 deletions

File tree

executor/executor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66

77
from dataclasses import dataclass
8-
from executor_util import ptempfile, ptempdir, create_sandbox, copy, PASH_SPEC_TOP
8+
from executor_util import ptempfile, ptempdir, create_sandbox, copy, PASH_SPEC_TOP, sandboxed_path
99

1010

1111
@dataclass
@@ -56,6 +56,10 @@ def run_trace_sandboxed(args: ExecArgs):
5656
logging.debug(f'Scheduler: Stderr file for: {args.concrete_node_id} is: {stderr_file}')
5757

5858
sandbox_dir, tmp_dir = create_sandbox()
59+
for suffix in ('.r', '.w'):
60+
fifo_path = sandboxed_path(sandbox_dir, trace_file + suffix)
61+
os.makedirs(os.path.dirname(fifo_path), exist_ok=True)
62+
os.mkfifo(fifo_path)
5963
post_execution_env_file = ptempfile(prefix='hs_post_env')
6064
lower_dirs_str = ':'.join(args.lower_sandboxes)
6165
speculate_mode = "speculate" if args.speculate_mode else "standard"

executor/executor_util.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,16 @@ def create_sandbox():
3535
return sdir, tdir
3636

3737

38+
def sandboxed_path(sandbox_dir: str, path: str) -> str:
39+
"""Return the host-side path to a file written by a process inside the sandbox.
40+
41+
try(1) mounts an overlayfs for each root-level directory; writes to /foo/bar
42+
inside the sandbox land in {sandbox_dir}/upperdir/foo/bar on the host.
43+
"""
44+
if sandbox_dir:
45+
return f"{sandbox_dir}/upperdir/{path}"
46+
return path
47+
48+
3849
def copy(path_from, path_to):
3950
shutil.copy(path_from, path_to)

scheduler/node.py

Lines changed: 49 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
from enum import Enum, auto
2-
import logging
32
import os
43
import re
54
import executor
65
from executor import ExecCtxt, ExecResult, ExecArgs
76
import dep_util
87
import util
98
import signal
9+
import threading
1010
from dataclasses import dataclass
1111
from typing import Tuple
12-
from enum import Enum, auto
1312
from pathlib import Path
14-
import util
1513
import analysis
1614

1715
STATE_LOG = '[STATE_LOG] '
@@ -292,12 +290,9 @@ class ConcreteNode:
292290
# Exists when node is in READY
293291
assignments: "list[NodeId]"
294292

295-
# Open file descriptors for the streaming dep files (opened lazily in gather_fs_actions)
296-
read_stream_fd: object
297-
write_stream_fd: object
298-
# Leftover partial line from each stream file during incremental reads
299-
read_stream_buf: str
300-
write_stream_buf: str
293+
# Daemon reader threads — one per FIFO; started after exec_ctxt is set
294+
_reader_r: "threading.Thread"
295+
_reader_w: "threading.Thread"
301296

302297
def __init__(self, cnid: ConcreteNodeId, node: Node, loop_list_context: HSLoopListContext,
303298
spec_pre_env=None):
@@ -454,45 +449,44 @@ def kill(self):
454449
self.exec_ctxt.process.kill()
455450

456451
def _reset_stream_state(self):
457-
self.read_stream_fd = None
458-
self.write_stream_fd = None
459-
self.read_stream_buf = ''
460-
self.write_stream_buf = ''
461-
462-
def _open_stream_fd(self, suffix):
463-
path = util.sandboxed_path(self.exec_ctxt.sandbox_dir,
464-
self.exec_ctxt.trace_file + suffix)
452+
for t in (getattr(self, '_reader_r', None), getattr(self, '_reader_w', None)):
453+
if t is not None and t.is_alive():
454+
t.join(timeout=0.5)
455+
self._reader_r = None
456+
self._reader_w = None
457+
458+
def _run_stream_reader(self, path: str, is_write: bool):
465459
try:
466-
return open(path, 'r')
467-
except FileNotFoundError:
468-
return None
469-
470-
def _drain_stream(self, fd, buf: str) -> tuple[set, str]:
471-
"""Read new content from fd; return (complete paths, leftover partial line)."""
472-
chunk = fd.read()
473-
if not chunk:
474-
return set(), buf
475-
buf += chunk
476-
*complete, buf = buf.split('\n')
477-
paths = {p for line in complete if (p := line.strip()) and not dep_util.should_filter(p)}
478-
return paths, buf
479-
480-
def gather_fs_actions(self):
481-
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]
482-
if self.read_stream_fd is None:
483-
self.read_stream_fd = self._open_stream_fd('.r')
484-
if self.write_stream_fd is None:
485-
self.write_stream_fd = self._open_stream_fd('.w')
486-
read_paths, write_paths = set(), set()
487-
if self.read_stream_fd:
488-
read_paths, self.read_stream_buf = self._drain_stream(self.read_stream_fd, self.read_stream_buf)
489-
if self.write_stream_fd:
490-
write_paths, self.write_stream_buf = self._drain_stream(self.write_stream_fd, self.write_stream_buf)
491-
self.update_rw_set(read_paths, write_paths)
460+
fd = open(path, 'r')
461+
except OSError:
462+
return
463+
try:
464+
for line in fd:
465+
p = line.rstrip('\n')
466+
if p and not dep_util.should_filter(p):
467+
if is_write:
468+
self.rwset.write_set.add(p)
469+
else:
470+
self.rwset.read_set.add(p)
471+
finally:
472+
fd.close()
473+
474+
def _start_reader_threads(self):
475+
sandbox_dir = self.exec_ctxt.sandbox_dir
476+
trace_file = self.exec_ctxt.trace_file
477+
for suffix, is_write, attr in (('.r', False, '_reader_r'), ('.w', True, '_reader_w')):
478+
path = util.sandboxed_path(sandbox_dir, trace_file + suffix)
479+
t = threading.Thread(target=self._run_stream_reader, args=(path, is_write),
480+
daemon=True)
481+
t.start()
482+
setattr(self, attr, t)
483+
484+
def _join_reader_threads(self):
485+
for t in (getattr(self, '_reader_r', None), getattr(self, '_reader_w', None)):
486+
if t is not None:
487+
t.join(timeout=1.0)
492488

493489
def get_rw_set(self):
494-
# if self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]:
495-
# self.gather_fs_actions()
496490
return self.rwset
497491

498492
fd_line = re.compile(r'(\d+) ([rwd]) (\d+) (.+)')
@@ -550,7 +544,7 @@ def parse_env(content):
550544
function_body_lines.append(line)
551545
if line == '}':
552546
inside_function = False
553-
if not current_function in ignore_vars:
547+
if current_function not in ignore_vars:
554548
env_vars[current_function] = '\n'.join(function_body_lines)
555549
function_body_lines = []
556550
return env_vars
@@ -683,17 +677,16 @@ def start_executing(self, env_file):
683677
self.start_command(env_file)
684678
self.state = NodeState.EXECUTING
685679
self._reset_stream_state()
686-
687680
self.rwset = RWSet(set(), set())
681+
self._start_reader_threads()
688682
self.trace_state()
689683

690684
def start_spec_executing(self, env_file, speculated_nodes):
691-
# raise NotImplementedError
692685
assert self.state == NodeState.READY
693686
self.start_command(env_file, speculate=True, speculated_nodes=speculated_nodes)
694687
self._reset_stream_state()
695-
696688
self.rwset = RWSet(set(), set())
689+
self._start_reader_threads()
697690
self.state = NodeState.SPEC_EXECUTING
698691
self.trace_state()
699692

@@ -703,13 +696,11 @@ def collect_result(self):
703696
self.exec_result = ExecResult(self.exec_ctxt.process.returncode, self.exec_ctxt.process.pid)
704697
return self.exec_result.exit_code == 137, self.runtime_finished()
705698

706-
def commit_frontier_execution(self):
699+
def commit_frontier_execution(self) -> int:
700+
"""Commit the frontier node and return the missed-event count from trace_v3."""
707701
assert self.state == NodeState.EXECUTING
708-
self.gather_fs_actions()
709-
if self.read_stream_fd:
710-
self.read_stream_fd.close()
711-
if self.write_stream_fd:
712-
self.write_stream_fd.close()
702+
self._join_reader_threads()
703+
missed = dep_util.read_missed(self.exec_ctxt.sandbox_dir, self.exec_ctxt.trace_file)
713704
self.update_loop_list_context()
714705
util.overhead_log(f"COMMIT|{self.cnid}")
715706
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
@@ -719,16 +710,13 @@ def commit_frontier_execution(self):
719710
self.fixup_fds()
720711
self.state = NodeState.COMMITTED
721712
self.trace_state()
713+
return missed
722714

723715
def finish_spec_execution(self) -> bool:
724-
"""Drain stream files, close them, and return True if trace_v3 missed events."""
716+
"""Join reader threads and return True if trace_v3 missed events."""
725717
assert self.state == NodeState.SPEC_EXECUTING
726718
self.update_loop_list_context()
727-
self.gather_fs_actions()
728-
if self.read_stream_fd:
729-
self.read_stream_fd.close()
730-
if self.write_stream_fd:
731-
self.write_stream_fd.close()
719+
self._join_reader_threads()
732720
missed = dep_util.read_missed(self.exec_ctxt.sandbox_dir, self.exec_ctxt.trace_file)
733721
self.fixup_fds()
734722
self.state = NodeState.SPECULATED

scheduler/partial_program_order.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
from enum import Enum
21
from node import NodeId, Node, CFGEdgeType, ConcreteNodeId, ConcreteNode, HSProg, HSBasicBlock, HSLoopListContext, loop_iters_do_action, get_loop_list_from_env
32
import logging
43
import util
5-
from pathlib import Path
6-
from collections import deque
74
from executor import run_assignment_and_return_env_file
85

96
PROG_LOG = '[PROG_LOG] '
@@ -403,12 +400,6 @@ def adjust_to_be_resolved_dict(self):
403400
def valid(self):
404401
return True
405402

406-
def fetch_fs_actions(self):
407-
for node in self.get_executing_normal_and_spec_nodes():
408-
util.overhead_log(f"TRACE_FETCHING|{node.cnid}")
409-
node.gather_fs_actions()
410-
util.overhead_log(f"TRACE_FETCHING_END|{node.cnid}")
411-
412403
def _has_fs_deps(self, concrete_node_id: ConcreteNodeId):
413404
node_of_interest : ConcreteNode = self.get_concrete_node(concrete_node_id)
414405
for dep_entry in self.to_be_resolved[concrete_node_id]:
@@ -423,10 +414,7 @@ def _has_fs_deps(self, concrete_node_id: ConcreteNodeId):
423414
return True
424415
return False
425416

426-
# TODO: It's currently designed this way to avoid reading trace file all the time
427-
# When we have complex caching code for this we can make this go away
428417
def has_fs_deps(self, concrete_node_id: ConcreteNodeId):
429-
self.fetch_fs_actions()
430418
return self._has_fs_deps(concrete_node_id)
431419

432420
def schedule_spec_work(self, concrete_node_id: ConcreteNodeId):
@@ -444,7 +432,7 @@ def simulate_var_assignments(self, env, assignments: "list[NodeId]"):
444432
return env
445433

446434
def reset_speculation(self):
447-
event_log(f"reset speculation")
435+
event_log("reset speculation")
448436
for cnid in self.spec_exec_order:
449437
self.concrete_nodes[cnid].try_reset_to_ready()
450438
self.spec_exec_order = []
@@ -471,9 +459,12 @@ def handle_complete(self, concrete_node_id: ConcreteNodeId, has_pending_wait: bo
471459
node.transition_from_ready_to_unsafe()
472460
return
473461
if node.is_executing():
474-
node.commit_frontier_execution()
462+
missed = node.commit_frontier_execution()
475463
self.current_loop_list = node.loop_list_context
476464
self.adjust_to_be_resolved_dict()
465+
if missed > 0:
466+
util.debug_log(f"{concrete_node_id} frontier missed {missed} trace events — resetting speculation")
467+
self.reset_speculation()
477468
elif node.is_spec_executing():
478469
if self.has_fs_deps(concrete_node_id):
479470
node.reset_to_ready()
@@ -483,7 +474,8 @@ def handle_complete(self, concrete_node_id: ConcreteNodeId, has_pending_wait: bo
483474
else:
484475
had_missed = node.finish_spec_execution()
485476
if had_missed:
486-
util.debug_log(f"{concrete_node_id} missed trace events — moving to frontier")
477+
util.debug_log(f"{concrete_node_id} spec missed trace events — resetting speculation and moving to frontier")
478+
self.reset_speculation()
487479
node.reset_to_ready(loop_list_context=self.current_loop_list)
488480
node.start_executing(current_env)
489481
elif has_pending_wait:
@@ -529,7 +521,7 @@ def handle_wait(self, concrete_node_id: ConcreteNodeId, env_file: str):
529521
else:
530522
self.reset_speculation()
531523

532-
if not concrete_node_id in self.concrete_nodes:
524+
if concrete_node_id not in self.concrete_nodes:
533525
abstract_node = self.hsprog.find_node(concrete_node_id.node_id)
534526
new_concrete_node = ConcreteNode(concrete_node_id, abstract_node, self.current_loop_list)
535527
new_concrete_node.transition_from_init_to_ready(env_file)
@@ -593,7 +585,6 @@ def handle_wait(self, concrete_node_id: ConcreteNodeId, env_file: str):
593585
def eager_fs_killing(self):
594586
event_log("try to eagerly kill conflicted speculation")
595587
to_be_killed: "list[ConcreteNode]" = []
596-
self.fetch_fs_actions()
597588
for node in self.get_all_nodes():
598589
if ((node.is_speculated() or node.is_spec_executing())
599590
and self._has_fs_deps(node.cnid)):

scheduler/scheduler_server.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
import util
1111
import config
12-
from partial_program_order import PartialProgramOrder, NodeId
13-
from node import LoopStack, ConcreteNodeId
12+
from partial_program_order import NodeId
13+
from node import ConcreteNodeId
1414

1515
##
1616
## A scheduler server
@@ -110,14 +110,20 @@ def handle_wait(self, input_cmd: str, connection):
110110
util.debug_log(f'ignoring var assignment {concrete_node_id}')
111111
self.respond_to_wait_on_unsafe(concrete_node_id)
112112

113-
def process_next_cmd(self):
114-
connection, input_cmd = util.socket_get_next_cmd(self.socket)
113+
def process_next_cmd(self, timeout=None):
114+
if timeout is not None:
115+
result = util.socket_try_get_next_cmd(self.socket, timeout)
116+
if result is None:
117+
return
118+
connection, input_cmd = result
119+
else:
120+
connection, input_cmd = util.socket_get_next_cmd(self.socket)
115121

116122
if(input_cmd.startswith("Init")):
117123
connection.close()
118124
self.handle_init(input_cmd)
119125
elif (input_cmd.startswith("Daemon Start") or input_cmd == ""):
120-
util.debug_log(f'Scheduler: Received daemon start message.')
126+
util.debug_log('Scheduler: Received daemon start message.')
121127
connection.close()
122128
elif (input_cmd.startswith("CommandExecComplete:")):
123129
node_id, exec_id, sandbox_dir, trace_file = self.__parse_command_exec_x(input_cmd)
@@ -195,6 +201,8 @@ def __parse_command_exec_x(self, input_cmd: str) -> "tuple[int, int]":
195201
def schedule_work(self):
196202
self.partial_program_order.try_schedule_spec_nodes(self.window)
197203

204+
_POLL_INTERVAL = 0.010 # 10 ms — drain trace streams even when no message arrives
205+
198206
def run(self):
199207
## The first command should be the daemon start
200208
self.process_next_cmd()
@@ -204,7 +212,7 @@ def run(self):
204212

205213
self.partial_program_order.log_state()
206214
while not self.done:
207-
self.process_next_cmd()
215+
self.process_next_cmd(timeout=self._POLL_INTERVAL)
208216
self.partial_program_order.log_state()
209217
self.schedule_work()
210218
self.partial_program_order.log_state()

scheduler/util.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
import config
22
import logging
33
import os
4+
import select
45
import socket
5-
import subprocess
66
import tempfile
77
import time
88
import re
99
import psutil
1010
import signal
1111
import analysis
1212
import shutil
13-
from node import Node, NodeId, LoopStack, HSProg, HSBasicBlock
13+
from node import Node, NodeId, HSProg
1414
from partial_program_order import PartialProgramOrder
15-
from config import PASH_SPEC_TMP_PREFIX
1615

1716
DEBUG_LOG = '[DEBUG_LOG] '
1817
ENV_LOG = '[ENV_LOG] '
@@ -110,6 +109,13 @@ def socket_get_next_cmd(sock: socket.socket) -> "tuple[socket.socket, str]" :
110109

111110
return (connection, str_data)
112111

112+
def socket_try_get_next_cmd(sock: socket.socket, timeout: float) -> "tuple[socket.socket, str] | None":
113+
"""Like socket_get_next_cmd but returns None if no connection arrives within timeout seconds."""
114+
ready, _, _ = select.select([sock], [], [], timeout)
115+
if not ready:
116+
return None
117+
return socket_get_next_cmd(sock)
118+
113119
def socket_respond(connection: socket.socket, message: str):
114120
bytes_message = message.encode('utf-8')
115121
connection.sendall(bytes_message)

0 commit comments

Comments
 (0)