Skip to content

Commit e9fe9b6

Browse files
Kasper JungeRalphify
authored and committed
refactor: extract _deliver_prompt and _ensure_process_dead helpers in _agent.py
The stdin delivery pattern (write + close with BrokenPipeError handling) and process cleanup pattern (poll + kill + wait) were duplicated across _run_agent_streaming and _run_agent_blocking. Extracted into shared helpers and replaced the magic timeout=1.0 with _THREAD_JOIN_TIMEOUT. Also made test mocks more resilient by using itertools.chain for wait() side effects, matching the existing poll() pattern. Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent 66d22d7 commit e9fe9b6

4 files changed

Lines changed: 391 additions & 67 deletions

File tree

src/ralphify/_agent.py

Lines changed: 158 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import signal
2121
import subprocess
2222
import sys
23+
import threading
2324
import time
2425
from collections.abc import Callable
2526
from dataclasses import dataclass
2627
from datetime import datetime, timezone
2728
from pathlib import Path
2829
from typing import IO, Any
2930

31+
from ralphify._events import OutputStream
3032
from ralphify._output import (
3133
IS_WINDOWS,
3234
SESSION_KWARGS,
@@ -36,6 +38,11 @@
3638
ensure_str,
3739
)
3840

41+
# Typed constants for the OutputStream literal so the type checker enforces
42+
# that only "stdout" / "stderr" ever reach ``on_output_line``.
43+
_STDOUT: OutputStream = "stdout"
44+
_STDERR: OutputStream = "stderr"
45+
3946
# Agent binary name that supports --output-format stream-json.
4047
_CLAUDE_BINARY = "claude"
4148

@@ -55,6 +62,9 @@
5562
# Seconds to wait for graceful shutdown after SIGTERM before escalating to SIGKILL.
5663
_SIGTERM_GRACE_PERIOD = 3
5764

65+
# Seconds to wait for reader threads to drain during cleanup.
66+
_THREAD_JOIN_TIMEOUT = 1.0
67+
5868

5969
def _try_graceful_group_kill(proc: subprocess.Popen[Any]) -> bool:
6070
"""Attempt to kill the process via its POSIX process group.
@@ -105,6 +115,37 @@ def _kill_process_group(proc: subprocess.Popen[Any]) -> None:
105115
proc.kill()
106116

107117

118+
def _ensure_process_dead(proc: subprocess.Popen[Any]) -> None:
119+
"""Kill the agent process if still running, then wait for exit.
120+
121+
Safe to call multiple times — no-ops when the process has already
122+
exited. Used in ``finally`` and exception-handler blocks to
123+
guarantee the child is reaped before we move on.
124+
"""
125+
if proc.poll() is None:
126+
_kill_process_group(proc)
127+
proc.wait()
128+
129+
130+
def _deliver_prompt(proc: subprocess.Popen[Any], prompt: str) -> None:
131+
"""Write *prompt* to the agent's stdin and close the pipe.
132+
133+
Silently handles ``BrokenPipeError`` — the agent may exit before
134+
consuming the full prompt, which is a normal (if uncommon) lifecycle
135+
event.
136+
"""
137+
assert proc.stdin is not None
138+
try:
139+
proc.stdin.write(prompt)
140+
except BrokenPipeError:
141+
pass
142+
finally:
143+
try:
144+
proc.stdin.close()
145+
except BrokenPipeError:
146+
pass
147+
148+
108149
@dataclass
109150
class AgentResult(ProcessResult):
110151
"""Result of running the agent subprocess."""
@@ -175,6 +216,7 @@ def _read_agent_stream(
175216
stdout: IO[str],
176217
deadline: float | None,
177218
on_activity: Callable[[dict[str, Any]], None] | None,
219+
on_output_line: Callable[[str, OutputStream], None] | None = None,
178220
) -> _StreamResult:
179221
"""Read the agent's JSON stream line-by-line until EOF or timeout.
180222
@@ -194,6 +236,8 @@ def _read_agent_stream(
194236

195237
for line in stdout:
196238
stdout_lines.append(line)
239+
if on_output_line is not None:
240+
on_output_line(line.rstrip("\r\n"), _STDOUT)
197241

198242
stripped = line.strip()
199243
if stripped:
@@ -226,18 +270,26 @@ def _run_agent_streaming(
226270
log_path_dir: Path | None,
227271
iteration: int,
228272
on_activity: Callable[[dict[str, Any]], None] | None = None,
273+
on_output_line: Callable[[str, OutputStream], None] | None = None,
229274
) -> AgentResult:
230275
"""Run the agent subprocess with line-by-line streaming of JSON output.
231276
232277
Used for agents that support ``--output-format stream-json`` (e.g. Claude
233278
Code). Stream processing is delegated to :func:`_read_agent_stream`;
234279
this function owns the subprocess lifecycle (spawn, stdin delivery,
235280
timeout kill, and cleanup via ``try/finally``).
281+
282+
stderr is drained concurrently on a background reader thread so large
283+
stderr volume can't deadlock the child on a full OS pipe buffer while
284+
the main thread is reading stdout.
236285
"""
237286
stream_cmd = cmd + [_OUTPUT_FORMAT_FLAG, _STREAM_FORMAT, _VERBOSE_FLAG]
238287
start = time.monotonic()
239288
deadline = (start + timeout) if timeout is not None else None
240289

290+
stderr_lines: list[str] = []
291+
stderr_thread: threading.Thread | None = None
292+
241293
proc = subprocess.Popen(
242294
stream_cmd,
243295
stdin=subprocess.PIPE,
@@ -252,23 +304,30 @@ def _run_agent_streaming(
252304
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
253305
raise RuntimeError("subprocess.Popen failed to create PIPE streams")
254306

255-
proc.stdin.write(prompt)
256-
proc.stdin.close()
307+
# Start the stderr pump BEFORE writing stdin so large prompts can't
308+
# deadlock against an agent that writes substantial diagnostics to
309+
# stderr while still reading its stdin.
310+
stderr_thread = threading.Thread(
311+
target=_pump_stream,
312+
args=(proc.stderr, stderr_lines, _STDERR, on_output_line),
313+
daemon=True,
314+
)
315+
stderr_thread.start()
316+
317+
_deliver_prompt(proc, prompt)
257318

258-
stream = _read_agent_stream(proc.stdout, deadline, on_activity)
319+
stream = _read_agent_stream(proc.stdout, deadline, on_activity, on_output_line)
259320

260321
if stream.timed_out:
261322
_kill_process_group(proc)
262323
proc.wait()
263-
264-
stderr_data = proc.stderr.read()
265324
finally:
266-
if proc.poll() is None:
267-
_kill_process_group(proc)
268-
proc.wait()
325+
_ensure_process_dead(proc)
326+
if stderr_thread is not None:
327+
stderr_thread.join(timeout=_THREAD_JOIN_TIMEOUT)
269328

270329
log_file = _write_log(
271-
log_path_dir, iteration, "".join(stream.stdout_lines), stderr_data
330+
log_path_dir, iteration, "".join(stream.stdout_lines), "".join(stderr_lines)
272331
)
273332

274333
return AgentResult(
@@ -280,18 +339,41 @@ def _run_agent_streaming(
280339
)
281340

282341

342+
def _pump_stream(
343+
stream: IO[str],
344+
buffer: list[str],
345+
stream_name: OutputStream,
346+
on_output_line: Callable[[str, OutputStream], None] | None,
347+
) -> None:
348+
"""Read *stream* line by line, appending to *buffer* and forwarding to the callback.
349+
350+
Runs on a background thread so stdout and stderr can be drained
351+
concurrently without deadlocking the child subprocess.
352+
"""
353+
for line in iter(stream.readline, ""):
354+
buffer.append(line)
355+
if on_output_line is not None:
356+
on_output_line(line.rstrip("\r\n"), stream_name)
357+
358+
283359
def _run_agent_blocking(
284360
cmd: list[str],
285361
prompt: str,
286362
timeout: float | None,
287363
log_path_dir: Path | None,
288364
iteration: int,
365+
on_output_line: Callable[[str, OutputStream], None] | None = None,
289366
) -> AgentResult:
290-
"""Run the agent subprocess, optionally write logs, and return the result.
367+
"""Run the agent subprocess, line-streaming its output, and return the result.
368+
369+
stdout and stderr are always captured and drained by background reader
370+
threads so that callers can observe output live (for the peek feature)
371+
while still preserving the full buffered output for log writing.
291372
292-
When *log_path_dir* is set, output is captured, written to a log file,
293-
then echoed to stdout/stderr so the user still sees it live. When unset,
294-
output streams directly to the terminal (no capture overhead).
373+
Reader threads are started **before** the prompt is written to stdin so
374+
that an agent which writes a large burst of output before consuming the
375+
full prompt cannot deadlock us on a full OS pipe buffer (the classic
376+
writer-reader deadlock on prompts larger than the ~64 KB pipe buffer).
295377
296378
The subprocess is started in its own process group so that on
297379
``KeyboardInterrupt`` or timeout the entire child tree can be killed
@@ -303,36 +385,70 @@ def _run_agent_blocking(
303385
start = time.monotonic()
304386
returncode: int | None = None
305387
timed_out = False
306-
stdout: str | bytes | None = None
307-
stderr: str | bytes | None = None
308-
capture = log_path_dir is not None
388+
stdout_lines: list[str] = []
389+
stderr_lines: list[str] = []
390+
stdout_thread: threading.Thread | None = None
391+
stderr_thread: threading.Thread | None = None
309392

310393
proc = subprocess.Popen(
311394
cmd,
312395
stdin=subprocess.PIPE,
313-
stdout=subprocess.PIPE if capture else None,
314-
stderr=subprocess.PIPE if capture else None,
396+
stdout=subprocess.PIPE,
397+
stderr=subprocess.PIPE,
315398
**SUBPROCESS_TEXT_KWARGS,
316399
**SESSION_KWARGS,
317400
)
318401
try:
319-
stdout, stderr = proc.communicate(input=prompt, timeout=timeout)
320-
returncode = proc.returncode
321-
except subprocess.TimeoutExpired:
322-
_kill_process_group(proc)
323-
stdout, stderr = proc.communicate()
324-
timed_out = True
402+
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
403+
raise RuntimeError("subprocess.Popen failed to create PIPE streams")
404+
405+
stdout_thread = threading.Thread(
406+
target=_pump_stream,
407+
args=(proc.stdout, stdout_lines, _STDOUT, on_output_line),
408+
daemon=True,
409+
)
410+
stderr_thread = threading.Thread(
411+
target=_pump_stream,
412+
args=(proc.stderr, stderr_lines, _STDERR, on_output_line),
413+
daemon=True,
414+
)
415+
stdout_thread.start()
416+
stderr_thread.start()
417+
418+
_deliver_prompt(proc, prompt)
419+
420+
try:
421+
returncode = proc.wait(timeout=timeout)
422+
except subprocess.TimeoutExpired:
423+
_ensure_process_dead(proc)
424+
timed_out = True
425+
stdout_thread.join()
426+
stderr_thread.join()
325427
except KeyboardInterrupt:
326-
_kill_process_group(proc)
327-
proc.wait()
428+
_ensure_process_dead(proc)
429+
# Drain reader threads before re-raising so daemon threads don't
430+
# race the main thread's teardown and leave the pipes half-read.
431+
if stdout_thread is not None:
432+
stdout_thread.join(timeout=_THREAD_JOIN_TIMEOUT)
433+
if stderr_thread is not None:
434+
stderr_thread.join(timeout=_THREAD_JOIN_TIMEOUT)
328435
raise
436+
finally:
437+
_ensure_process_dead(proc)
438+
439+
stdout = "".join(stdout_lines)
440+
stderr = "".join(stderr_lines)
329441

330442
log_file = _write_log(log_path_dir, iteration, stdout, stderr)
331-
if capture:
443+
# When logging is enabled, output is diverted into the log file; echo it
444+
# so the user still sees what ran. When logging is disabled, live peek
445+
# (if enabled) has already shown the lines as they arrived — echoing here
446+
# would double every line.
447+
if log_path_dir is not None:
332448
_echo_output(stdout, stderr)
333449

334450
return AgentResult(
335-
returncode=returncode,
451+
returncode=None if timed_out else returncode,
336452
elapsed=time.monotonic() - start,
337453
log_file=log_file,
338454
timed_out=timed_out,
@@ -347,12 +463,15 @@ def execute_agent(
347463
log_path_dir: Path | None,
348464
iteration: int,
349465
on_activity: Callable[[dict[str, Any]], None] | None = None,
466+
on_output_line: Callable[[str, OutputStream], None] | None = None,
350467
) -> AgentResult:
351468
"""Run the agent subprocess, auto-selecting streaming or blocking mode.
352469
353470
Uses streaming mode for agents that support ``--output-format stream-json``
354-
(e.g. Claude Code); all other agents use the blocking ``subprocess.run``
355-
path. The *on_activity* callback is only invoked in streaming mode.
471+
(e.g. Claude Code); all other agents use the blocking path that drains
472+
stdout and stderr via reader threads. The *on_activity* callback is
473+
only invoked in streaming mode; *on_output_line* fires for both modes
474+
as raw lines arrive.
356475
357476
This is the single entry point the engine should use — callers don't need
358477
to know which execution mode is selected.
@@ -365,5 +484,13 @@ def execute_agent(
365484
log_path_dir,
366485
iteration,
367486
on_activity=on_activity,
487+
on_output_line=on_output_line,
368488
)
369-
return _run_agent_blocking(cmd, prompt, timeout, log_path_dir, iteration)
489+
return _run_agent_blocking(
490+
cmd,
491+
prompt,
492+
timeout,
493+
log_path_dir,
494+
iteration,
495+
on_output_line=on_output_line,
496+
)

0 commit comments

Comments
 (0)