Skip to content

Commit 0abb36b

Browse files
Kasper JungeRalphify
authored andcommitted
fix: enforce --timeout across stdin delivery via writer thread
Move proc.stdin.write(prompt) to a background daemon thread in all three agent execution paths (streaming, blocking-inherit, and blocking-capture). Previously, a large prompt combined with an agent that never reads stdin would block the main thread in write() forever, silently preventing proc.wait(timeout=...) from ever being reached. Now the main thread proceeds directly to proc.wait(timeout=...). When timeout fires, _kill_process_group terminates the child, which closes the pipe and unblocks the writer thread with BrokenPipeError (already swallowed by _deliver_prompt). Task: critical-03-stdin-write-timeout.md Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent 43fea10 commit 0abb36b

3 files changed

Lines changed: 161 additions & 5 deletions

File tree

src/ralphify/_agent.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def _run_agent_streaming(
287287
start = time.monotonic()
288288
deadline = (start + timeout) if timeout is not None else None
289289

290+
writer_thread: threading.Thread | None = None
290291
stderr_lines: list[str] = []
291292
stderr_thread: threading.Thread | None = None
292293

@@ -311,7 +312,15 @@ def _run_agent_streaming(
311312
proc.stderr, stderr_lines, _STDERR, on_output_line
312313
)
313314

314-
_deliver_prompt(proc, prompt)
315+
# Deliver the prompt on a background thread so that a blocked write
316+
# (child not reading stdin, pipe buffer full) cannot prevent
317+
# proc.wait / deadline checks from firing. Killing the process
318+
# group unblocks the write with BrokenPipeError, which
319+
# _deliver_prompt already swallows.
320+
writer_thread = threading.Thread(
321+
target=_deliver_prompt, args=(proc, prompt), daemon=True
322+
)
323+
writer_thread.start()
315324

316325
stream = _read_agent_stream(proc.stdout, deadline, on_activity, on_output_line)
317326

@@ -320,7 +329,7 @@ def _run_agent_streaming(
320329
proc.wait()
321330
finally:
322331
_ensure_process_dead(proc)
323-
_drain_readers(stderr_thread)
332+
_drain_readers(stderr_thread, writer_thread)
324333

325334
log_file = _write_log(
326335
log_path_dir, iteration, "".join(stream.stdout_lines), "".join(stderr_lines)
@@ -449,6 +458,7 @@ def _run_agent_blocking(
449458
# pipes ralph's output (e.g. ``ralph run | cat``).
450459
returncode: int | None = None
451460
timed_out = False
461+
writer_thread: threading.Thread | None = None
452462

453463
proc = subprocess.Popen(
454464
cmd,
@@ -460,7 +470,10 @@ def _run_agent_blocking(
460470
if proc.stdin is None:
461471
raise RuntimeError("subprocess.Popen failed to create PIPE stdin")
462472

463-
_deliver_prompt(proc, prompt)
473+
writer_thread = threading.Thread(
474+
target=_deliver_prompt, args=(proc, prompt), daemon=True
475+
)
476+
writer_thread.start()
464477

465478
try:
466479
returncode = proc.wait(timeout=timeout)
@@ -472,6 +485,7 @@ def _run_agent_blocking(
472485
raise
473486
finally:
474487
_ensure_process_dead(proc)
488+
_drain_readers(writer_thread)
475489

476490
return AgentResult(
477491
returncode=None if timed_out else returncode,
@@ -486,6 +500,7 @@ def _run_agent_blocking(
486500
# the callback alone observes them, avoiding unbounded memory growth.
487501
returncode = None
488502
timed_out = False
503+
writer_thread: threading.Thread | None = None
489504
stdout_lines: list[str] | None = [] if log_path_dir is not None else None
490505
stderr_lines: list[str] | None = [] if log_path_dir is not None else None
491506
stdout_thread: threading.Thread | None = None
@@ -510,7 +525,10 @@ def _run_agent_blocking(
510525
proc.stderr, stderr_lines, _STDERR, on_output_line
511526
)
512527

513-
_deliver_prompt(proc, prompt)
528+
writer_thread = threading.Thread(
529+
target=_deliver_prompt, args=(proc, prompt), daemon=True
530+
)
531+
writer_thread.start()
514532

515533
try:
516534
returncode = proc.wait(timeout=timeout)
@@ -522,7 +540,7 @@ def _run_agent_blocking(
522540
raise
523541
finally:
524542
_ensure_process_dead(proc)
525-
_drain_readers(stdout_thread, stderr_thread)
543+
_drain_readers(stdout_thread, stderr_thread, writer_thread)
526544

527545
stdout = "".join(stdout_lines) if stdout_lines is not None else None
528546
stderr = "".join(stderr_lines) if stderr_lines is not None else None
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Critical 03 — `stdin.write` timeout enforcement
2+
3+
**Original finding:** C3
4+
**Severity:** Critical — `--timeout` silently stops enforcing in a realistic case
5+
**Files:** `src/ralphify/_agent.py`
6+
7+
## Problem
8+
9+
The blocking path is now:
10+
11+
```python
12+
# src/ralphify/_agent.py ~396
13+
try:
14+
proc.stdin.write(prompt)
15+
except BrokenPipeError:
16+
pass
17+
finally:
18+
try:
19+
proc.stdin.close()
20+
except BrokenPipeError:
21+
pass
22+
23+
try:
24+
returncode = proc.wait(timeout=timeout)
25+
except subprocess.TimeoutExpired:
26+
...
27+
```
28+
29+
The streaming path at `:283-288` has the same shape. **`proc.stdin.write(prompt)` has no timeout.** The pre-refactor code used `proc.communicate(input=prompt, timeout=timeout)`, which enforced the user's `--timeout` across both stdin delivery AND wait. The new split design only enforces it on `wait`.
30+
31+
## Why it matters
32+
33+
Concrete failure case: an agent that starts up, blocks on a network call before reading stdin, eventually decides to exit. The sequence:
34+
35+
1. Parent spawns child, starts reader threads for stdout/stderr.
36+
2. Parent calls `proc.stdin.write(prompt)`. Prompt is, say, 120 KB.
37+
3. Pipe buffer (~64 KB on Linux) fills. `write` blocks waiting for the child to drain its stdin.
38+
4. Child is stuck in a `requests.get(...)` call with no outer timeout. It never reads stdin.
39+
5. Parent is now blocked in `write` forever. `proc.wait(timeout=…)` is never reached. The user's `--timeout 60` does nothing.
40+
41+
This is exactly the hang `ralph run --timeout` is supposed to prevent. The feature flag is a lie in this code path.
42+
43+
Secondary concern: even a well-behaved agent that is momentarily slow to read stdin (busy importing modules, JIT warm-up, etc.) briefly blocks the main thread in `write`. Normally fine, but combined with `critical-04`'s unbounded joins, any extra blocking on the main thread compounds the hang surface.
44+
45+
## Fix direction
46+
47+
You have three reasonable options. Pick one based on complexity vs fidelity tradeoff.
48+
49+
### Option A — Write on a background thread with a deadline (recommended)
50+
51+
Spawn a short-lived writer thread alongside the reader threads:
52+
53+
```python
54+
write_error: list[BaseException] = []
55+
def _write():
56+
try:
57+
proc.stdin.write(prompt)
58+
except BaseException as exc:
59+
write_error.append(exc)
60+
finally:
61+
try:
62+
proc.stdin.close()
63+
except BrokenPipeError:
64+
pass
65+
66+
writer = threading.Thread(target=_write, daemon=True)
67+
writer.start()
68+
69+
try:
70+
returncode = proc.wait(timeout=timeout)
71+
except subprocess.TimeoutExpired:
72+
_kill_process_group(proc)
73+
...
74+
writer.join(timeout=1.0)
75+
```
76+
77+
Pros: minimal change, same timeout applies to the whole stdin-delivery-plus-wait pipeline because killing the process group makes the writer's blocked `write` return with `BrokenPipeError`, which is already swallowed.
78+
79+
Cons: one extra thread per agent invocation (cheap).
80+
81+
### Option B — Revert to `proc.communicate`
82+
83+
`communicate` enforces the timeout across everything but spawns its own reader threads that conflict with ours. You'd have to drop the pre-spawned readers and lose the "start readers before writing stdin" property that fixed the original deadlock. **Do not use this option** — it reintroduces the bug the refactor was supposed to fix.
84+
85+
### Option C — Non-blocking write + `select`
86+
87+
Put stdin in non-blocking mode, loop on `select.select([], [stdin], [], remaining_deadline)`, write chunks. Maximally correct, fairly invasive, and Windows doesn't support `select` on pipes.
88+
89+
**Use Option A.**
90+
91+
## Done when
92+
93+
- [ ] Both `_run_agent_blocking` and `_run_agent_streaming` use the writer-thread pattern (or equivalent that enforces `timeout` across stdin delivery).
94+
- [ ] The writer thread is joined with a bounded timeout in `finally`.
95+
- [ ] New test `tests/test_agent.py::test_timeout_enforced_when_agent_does_not_read_stdin` — launch a real Python subprocess that reads zero bytes from stdin and sleeps for 30 seconds, pass a `timeout=1.0`, assert the call returns within ~3 seconds with `timed_out=True`. Use a prompt large enough (>64 KB on Linux, >8 KB on macOS) to fill the pipe buffer.
96+
- [ ] Existing `test_large_prompt_with_concurrent_stderr_does_not_deadlock` still passes (large prompt + chatty child is still fine).
97+
- [ ] `uv run pytest`, lint, format, ty check all pass.
98+
99+
## Context
100+
101+
- `SUBPROCESS_TEXT_KWARGS` in `src/ralphify/_output.py` opens stdin in text mode with a text encoder, so chunk writes mid-string are fine but partial-surrogate edges can corrupt encoding — just write the whole prompt in one `write` call.
102+
- `_kill_process_group(proc)` on POSIX sends SIGTERM to the group, waits briefly, then SIGKILL. The child's death closes the pipe on its end, which unblocks the parent's blocked `write` with `BrokenPipeError`. That's why Option A's error handling is mostly "swallow `BrokenPipeError`."
103+
- On Windows, `_kill_process_group` falls back to `proc.kill()` (just the direct child), and `start_new_session` is not set. Grandchild processes can still hold the pipe — but that's `critical-04`'s problem, not this one.
104+
- The writer thread must be daemon=True so it cannot block interpreter shutdown, but it must be joined on the main thread's exit path so the caller sees any write errors it should care about (practically: `BrokenPipeError` is expected and swallowed; other errors you might want to surface).
105+
- The existing `BrokenPipeError` swallow is still needed after the move — the writer thread will hit it when the child exits early.

tests/test_agent.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import signal
66
import subprocess
77
import sys
8+
import time
89
from unittest.mock import MagicMock, patch
910

1011
import pytest
@@ -835,6 +836,38 @@ def test_streaming_large_stderr_drained_concurrently(self, tmp_path):
835836
assert result.timed_out is False
836837
assert result.result_text == "ok"
837838

839+
def test_timeout_enforced_when_agent_does_not_read_stdin(self, tmp_path):
840+
"""If the agent never reads stdin, --timeout must still fire.
841+
842+
Before the writer-thread fix, proc.stdin.write(prompt) blocked on
843+
the main thread when the OS pipe buffer was full, and
844+
proc.wait(timeout=...) was never reached — so --timeout silently
845+
did nothing.
846+
847+
Uses a prompt large enough to fill the pipe buffer (64 KB on
848+
Linux, ~8 KB on macOS) so the write would block if it were on
849+
the main thread.
850+
"""
851+
# Agent that never reads stdin and sleeps for 30 seconds.
852+
script = "import time; time.sleep(30)"
853+
# Prompt larger than OS pipe buffer on any platform.
854+
large_prompt = "x" * 200_000
855+
856+
start = time.monotonic()
857+
result = _run_agent_blocking(
858+
[sys.executable, "-c", script],
859+
prompt=large_prompt,
860+
timeout=2.0,
861+
log_path_dir=tmp_path,
862+
iteration=1,
863+
)
864+
elapsed = time.monotonic() - start
865+
866+
assert result.timed_out is True
867+
assert result.returncode is None
868+
# Must complete well before the child's 30-second sleep finishes.
869+
assert elapsed < 15.0
870+
838871

839872
class TestBlockingInheritPath:
840873
"""Tests for the fd-inheritance path in _run_agent_blocking.

0 commit comments

Comments
 (0)