Skip to content

Commit 6303fe1

Browse files
Kasper JungeRalphify
authored andcommitted
fix: guard callbacks in _read_agent_stream against exceptions
_pump_stream (blocking path) already wraps on_output_line in try/except so a raising callback cannot kill the drain loop. _read_agent_stream (streaming path) did not — a transient error in the console emitter or any other callback would crash the stream reader and kill the entire run. Guard both on_output_line and on_activity with per-call try/except, matching _pump_stream's resilience contract. Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent 0abb36b commit 6303fe1

2 files changed

Lines changed: 56 additions & 2 deletions

File tree

src/ralphify/_agent.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,11 @@ def _read_agent_stream(
237237
for line in stdout:
238238
stdout_lines.append(line)
239239
if on_output_line is not None:
240-
on_output_line(line.rstrip("\r\n"), _STDOUT)
240+
try:
241+
on_output_line(line.rstrip("\r\n"), _STDOUT)
242+
except Exception:
243+
# Callback is best-effort; draining must not stop.
244+
pass
241245

242246
stripped = line.strip()
243247
if stripped:
@@ -251,7 +255,11 @@ def _read_agent_stream(
251255
):
252256
result_text = parsed[_RESULT_FIELD]
253257
if on_activity is not None:
254-
on_activity(parsed)
258+
try:
259+
on_activity(parsed)
260+
except Exception:
261+
# Callback is best-effort; draining must not stop.
262+
pass
255263

256264
if deadline is not None and time.monotonic() > deadline:
257265
return _StreamResult(

tests/test_agent.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,52 @@ def test_last_result_wins(self):
204204

205205
assert result.result_text == "second"
206206

207+
def test_continues_when_on_output_line_raises(self):
208+
"""A raising on_output_line callback must not crash the stream reader.
209+
210+
This mirrors _pump_stream's behavior where callback exceptions are
211+
caught per-line so that draining continues. Without this guard, a
212+
transient rendering error in the console emitter would kill the
213+
entire streaming run."""
214+
call_count = 0
215+
216+
def raising_callback(line, stream):
217+
nonlocal call_count
218+
call_count += 1
219+
if call_count == 1:
220+
raise RuntimeError("boom")
221+
222+
stream = io.StringIO("line1\nline2\nline3\n")
223+
result = _read_agent_stream(
224+
stream, deadline=None, on_activity=None, on_output_line=raising_callback
225+
)
226+
227+
assert len(result.stdout_lines) == 3
228+
assert call_count == 3
229+
assert result.timed_out is False
230+
231+
def test_continues_when_on_activity_raises(self):
232+
"""A raising on_activity callback must not crash the stream reader."""
233+
call_count = 0
234+
235+
def raising_activity(data):
236+
nonlocal call_count
237+
call_count += 1
238+
if call_count == 1:
239+
raise RuntimeError("boom")
240+
241+
stream = io.StringIO(
242+
'{"type": "status", "msg": "a"}\n'
243+
'{"type": "status", "msg": "b"}\n'
244+
)
245+
result = _read_agent_stream(
246+
stream, deadline=None, on_activity=raising_activity
247+
)
248+
249+
assert len(result.stdout_lines) == 2
250+
assert call_count == 2
251+
assert result.timed_out is False
252+
207253

208254
class TestExecuteAgentBlocking:
209255
@patch(MOCK_SUBPROCESS, side_effect=ok_proc)

0 commit comments

Comments
 (0)