Skip to content

Commit f932017

Browse files
Kasper JungeRalphify
authored andcommitted
fix: contain exceptions in _pump_stream to prevent thread death
Wrap the readline loop in _pump_stream with two layers of exception handling: callback exceptions (on_output_line) are caught per-line so draining never stops, and ValueError/OSError from readline (concurrent pipe close) cause a clean exit instead of an unhandled thread crash. Without this, a single raising callback would kill the daemon thread silently, causing the subprocess to block on a full pipe buffer and either hang forever or spuriously time out. Task: critical-02-pump-stream-exception-handling.md Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent 91d8e4a commit f932017

3 files changed

Lines changed: 236 additions & 5 deletions

File tree

src/ralphify/_agent.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,27 @@ def _pump_stream(
349349
350350
Runs on a background thread so stdout and stderr can be drained
351351
concurrently without deadlocking the child subprocess.
352+
353+
Exception handling:
354+
355+
- **Callback exceptions** are caught per-line so that a raising
356+
callback never kills the drain loop. The line is still buffered.
357+
- **``ValueError`` / ``OSError``** from ``readline`` (e.g. the pipe
358+
was closed concurrently) cause a clean exit so ``join()`` returns.
352359
"""
353-
for line in iter(stream.readline, ""):
354-
if buffer is not None:
355-
buffer.append(line)
356-
if on_output_line is not None:
357-
on_output_line(line.rstrip("\r\n"), stream_name)
360+
try:
361+
for line in iter(stream.readline, ""):
362+
if buffer is not None:
363+
buffer.append(line)
364+
if on_output_line is not None:
365+
try:
366+
on_output_line(line.rstrip("\r\n"), stream_name)
367+
except Exception:
368+
# Callback is best-effort; draining must not stop.
369+
pass
370+
except (ValueError, OSError):
371+
# Pipe closed concurrently — exit cleanly so join() returns.
372+
pass
358373

359374

360375
def _start_pump_thread(
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Critical 02 — `_pump_stream` exception containment
2+
3+
**Original findings:** C2 (raising callback wedges subprocess) + M9 (`ValueError` on closed file)
4+
**Severity:** Critical — a single raising callback hangs or spuriously times out the agent
5+
**Files:** `src/ralphify/_agent.py`
6+
7+
## Problem
8+
9+
`_pump_stream` is the reader-thread body used by both the blocking and streaming paths to drain stdout/stderr line-by-line. Its entire body is:
10+
11+
```python
12+
# src/ralphify/_agent.py:320-334
13+
def _pump_stream(
14+
stream: IO[str],
15+
buffer: list[str],
16+
stream_name: OutputStream,
17+
on_output_line: Callable[[str, OutputStream], None] | None,
18+
) -> None:
19+
for line in iter(stream.readline, ""):
20+
buffer.append(line)
21+
if on_output_line is not None:
22+
on_output_line(line.rstrip("\r\n"), stream_name)
23+
```
24+
25+
Zero exception handling. Two failure modes:
26+
27+
1. **Callback raises** (`on_output_line` is a lambda that goes through emitter → event construction → Rich print → tty write). Any raise anywhere in that chain — queue-full in a `QueueEmitter`, Rich markup bug, `UnicodeEncodeError` on a weird line, buggy user handler in the Python API — propagates out of the daemon thread. The thread dies silently (daemon thread exceptions are not surfaced), the pipe stops draining, the child fills its OS pipe buffer, and blocks.
28+
29+
2. **`readline` raises** `ValueError("I/O operation on closed file")` if the main thread closes the pipe concurrently (GC, explicit cleanup, `with` block exit). Thread dies, log gets truncated, no warning.
30+
31+
## Why it matters
32+
33+
After the thread dies:
34+
35+
- `proc.wait(timeout=timeout)` either hangs forever (when `timeout is None`) or spuriously times out (the user is told the agent timed out when it actually finished fine, but the output never reached Python).
36+
- The logged output is silently truncated — tail of the iteration is lost, the user has no way to know.
37+
- On CI where the agent produces JSON events consumed by downstream tooling, a single malformed line permanently breaks that run.
38+
39+
Because the feature is on by default in any TTY, this is a live footgun for anyone with a Python-API integration that raises from a handler.
40+
41+
## Fix direction
42+
43+
Wrap the loop so (a) callback exceptions never kill draining, (b) `readline` exceptions let the thread exit cleanly:
44+
45+
```python
46+
def _pump_stream(stream, buffer, stream_name, on_output_line):
47+
try:
48+
for line in iter(stream.readline, ""):
49+
buffer.append(line)
50+
if on_output_line is not None:
51+
try:
52+
on_output_line(line.rstrip("\r\n"), stream_name)
53+
except Exception:
54+
# Callback is best-effort; draining must not stop.
55+
# Consider sys.excepthook for visibility in dev.
56+
pass
57+
except (ValueError, OSError):
58+
# Pipe closed concurrently — exit cleanly so join() returns.
59+
pass
60+
```
61+
62+
Two design choices worth thinking through:
63+
64+
- **Should callback exceptions be logged?** Silent `pass` hides real bugs. `sys.excepthook` on a dedicated `sys.exc_info()` call surfaces to stderr in dev but pollutes CI logs. A conservative middle ground: log to a module-level counter and emit one warning at the end of the run if any occurred.
65+
- **Should buffering continue after `readline` fails?** No — if the pipe is closed the fd is gone; there's nothing to read. Exit the loop.
66+
67+
## Done when
68+
69+
- [ ] Both try/except layers are in place.
70+
- [ ] New test `tests/test_agent.py::test_pump_stream_continues_when_callback_raises` — run a real subprocess, pass a callback that raises on the first line, assert that all subsequent lines are still captured in `buffer` and the subprocess exits cleanly.
71+
- [ ] New test `tests/test_agent.py::test_pump_stream_exits_cleanly_on_closed_stream` — start `_pump_stream` on a pipe, close the read end, assert the thread exits within a short bounded join.
72+
- [ ] No regression in existing `_pump_stream` tests.
73+
- [ ] `uv run pytest`, `uv run ruff check .`, `uv run ruff format --check .`, `uv run ty check` all pass.
74+
75+
## Context
76+
77+
- `_pump_stream` is called from both `_run_agent_blocking` (two threads) and `_run_agent_streaming` (one thread, stderr only). All three call sites benefit from this fix.
78+
- Daemon threads (`daemon=True`) do not surface exceptions; see `threading.excepthook` (Python 3.8+) if you want a module-level install instead of per-call try/except. Per-call is more localized and preferred here.
79+
- `buffer.append(line)` is GIL-atomic so no lock needed for the append itself.
80+
- Related but **out of scope** for this task: reader-thread join timeouts (that's `critical-04`) and whether the event is even emitted when peek is off (`medium-01`). This task is purely about survivability of the pump loop.
81+
- The fix is small (a few lines) but the test coverage is the real deliverable — these are exactly the flaky-looking "agent timed out" bugs that are impossible to diagnose later without a regression test locking in the behavior.

tests/test_agent.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ralphify._agent import (
1414
AgentResult,
1515
_kill_process_group,
16+
_pump_stream,
1617
_read_agent_stream,
1718
_run_agent_blocking,
1819
_run_agent_streaming,
@@ -936,3 +937,137 @@ def test_callback_only_does_not_buffer(self, tmp_path):
936937
assert result.returncode == 0
937938
assert result.log_file is None
938939
assert received == ["line1", "line2"]
940+
941+
942+
class TestPumpStreamExceptionHandling:
943+
"""Tests for _pump_stream resilience against callback and I/O errors."""
944+
945+
def test_continues_when_callback_raises(self):
946+
"""A callback that raises on the first line must not prevent
947+
subsequent lines from being buffered."""
948+
script = (
949+
"import sys; "
950+
"print('line1'); print('line2'); print('line3'); "
951+
"sys.stdout.flush()"
952+
)
953+
call_count = 0
954+
955+
def raising_callback(line, stream):
956+
nonlocal call_count
957+
call_count += 1
958+
if call_count == 1:
959+
raise RuntimeError("boom")
960+
961+
result = _run_agent_blocking(
962+
[sys.executable, "-u", "-c", script],
963+
prompt="",
964+
timeout=10,
965+
log_path_dir=None,
966+
iteration=1,
967+
on_output_line=raising_callback,
968+
)
969+
970+
assert result.returncode == 0
971+
# The callback was called for all three lines, even though
972+
# the first invocation raised.
973+
assert call_count == 3
974+
975+
def test_buffers_all_lines_when_callback_raises(self, tmp_path):
976+
"""When logging is enabled, all lines must be captured in the log
977+
even if the callback raises on every single line."""
978+
script = "import sys; print('a'); print('b'); print('c'); sys.stdout.flush()"
979+
980+
def always_raises(line, stream):
981+
raise ValueError("always fails")
982+
983+
result = _run_agent_blocking(
984+
[sys.executable, "-u", "-c", script],
985+
prompt="",
986+
timeout=10,
987+
log_path_dir=tmp_path,
988+
iteration=1,
989+
on_output_line=always_raises,
990+
)
991+
992+
assert result.returncode == 0
993+
assert result.log_file is not None
994+
log_text = result.log_file.read_text()
995+
assert "a" in log_text
996+
assert "b" in log_text
997+
assert "c" in log_text
998+
999+
def test_exits_cleanly_on_closed_stream(self):
1000+
"""When the read end of a pipe is closed, the pump thread must
1001+
exit within a short bounded join — not hang forever."""
1002+
import os
1003+
import threading
1004+
1005+
read_fd, write_fd = os.pipe()
1006+
read_file = os.fdopen(read_fd, "r")
1007+
write_file = os.fdopen(write_fd, "w")
1008+
1009+
buffer: list[str] = []
1010+
thread = threading.Thread(
1011+
target=_pump_stream,
1012+
args=(read_file, buffer, "stdout", None),
1013+
daemon=True,
1014+
)
1015+
thread.start()
1016+
1017+
# Write a line so the thread is actively reading, then close.
1018+
write_file.write("hello\n")
1019+
write_file.flush()
1020+
write_file.close()
1021+
1022+
# The thread must exit promptly — EOF from the closed write end.
1023+
thread.join(timeout=5)
1024+
assert not thread.is_alive(), (
1025+
"_pump_stream thread did not exit after pipe closed"
1026+
)
1027+
assert buffer == ["hello\n"]
1028+
1029+
read_file.close()
1030+
1031+
def test_exits_cleanly_on_valueerror(self):
1032+
"""A stream whose readline raises ValueError (e.g. closed file)
1033+
must not crash the thread — it should exit cleanly."""
1034+
import threading
1035+
1036+
class ClosedStream:
1037+
"""Fake stream that raises ValueError on readline, simulating
1038+
a concurrent close of the underlying file descriptor."""
1039+
1040+
def readline(self):
1041+
raise ValueError("I/O operation on closed file")
1042+
1043+
buffer: list[str] = []
1044+
thread = threading.Thread(
1045+
target=_pump_stream,
1046+
args=(ClosedStream(), buffer, "stdout", None),
1047+
daemon=True,
1048+
)
1049+
thread.start()
1050+
thread.join(timeout=5)
1051+
assert not thread.is_alive(), (
1052+
"_pump_stream thread did not exit after ValueError"
1053+
)
1054+
assert buffer == []
1055+
1056+
def test_exits_cleanly_on_oserror(self):
1057+
"""A stream whose readline raises OSError must not crash the thread."""
1058+
import threading
1059+
1060+
class BrokenStream:
1061+
def readline(self):
1062+
raise OSError("stream error")
1063+
1064+
buffer: list[str] = []
1065+
thread = threading.Thread(
1066+
target=_pump_stream,
1067+
args=(BrokenStream(), buffer, "stdout", None),
1068+
daemon=True,
1069+
)
1070+
thread.start()
1071+
thread.join(timeout=5)
1072+
assert not thread.is_alive(), "_pump_stream thread did not exit after OSError"
1073+
assert buffer == []

0 commit comments

Comments
 (0)