Skip to content

Commit fdda67e

Browse files
Kasper Jungeclaude
authored andcommitted
refactor: extract _read_agent_stream to separate stream parsing from process lifecycle
The _run_agent_streaming function mixed two concerns in a single loop: subprocess lifecycle management (kill, wait, cleanup) and JSON stream content processing (parse, extract result, dispatch callbacks). This made the function harder to reason about — you had to mentally track 5 mutable state variables across nested control flow. Now _read_agent_stream owns the stream reading and returns a typed _StreamResult, while _run_agent_streaming owns the subprocess lifecycle (spawn, stdin, timeout kill, finally cleanup). Each function has one clear job. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5aad9de commit fdda67e

1 file changed

Lines changed: 62 additions & 39 deletions

File tree

src/ralphify/_agent.py

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from collections.abc import Callable
2323
from datetime import datetime
2424
from pathlib import Path
25-
from typing import NamedTuple
25+
from typing import IO, NamedTuple
2626

2727
from ralphify._output import collect_output
2828

@@ -36,6 +36,14 @@ class AgentResult(NamedTuple):
3636
result_text: str | None = None
3737

3838

39+
class _StreamResult(NamedTuple):
40+
"""Accumulated output from reading the agent's JSON stream."""
41+
42+
stdout_lines: list[str]
43+
result_text: str | None
44+
timed_out: bool
45+
46+
3947
def _write_log(
4048
log_path_dir: Path,
4149
iteration: int,
@@ -57,6 +65,47 @@ def _is_claude_command(cmd: list[str]) -> bool:
5765
return binary == "claude"
5866

5967

68+
def _read_agent_stream(
69+
stdout: IO[str],
70+
deadline: float | None,
71+
on_activity: Callable[[dict], None] | None,
72+
) -> _StreamResult:
73+
"""Read the agent's JSON stream line-by-line until EOF or timeout.
74+
75+
Parses each non-empty line as JSON. Valid JSON objects are forwarded
76+
to *on_activity* (if provided). The ``result`` field from
77+
``{"type": "result"}`` events is captured as *result_text*.
78+
79+
Lines that aren't valid JSON are silently collected for logging but
80+
not forwarded — this keeps the caller working even if the agent
81+
emits non-JSON diagnostics to stdout.
82+
83+
Returns early with ``timed_out=True`` when the deadline is exceeded,
84+
leaving the caller responsible for killing the subprocess.
85+
"""
86+
stdout_lines: list[str] = []
87+
result_text: str | None = None
88+
89+
for line in stdout:
90+
if deadline and time.monotonic() > deadline:
91+
return _StreamResult(stdout_lines, result_text, timed_out=True)
92+
93+
stdout_lines.append(line)
94+
stripped = line.strip()
95+
if not stripped:
96+
continue
97+
try:
98+
parsed = json.loads(stripped)
99+
except json.JSONDecodeError:
100+
continue
101+
if parsed.get("type") == "result" and "result" in parsed:
102+
result_text = parsed["result"]
103+
if on_activity is not None:
104+
on_activity(parsed)
105+
106+
return _StreamResult(stdout_lines, result_text, timed_out=False)
107+
108+
60109
def _run_agent_streaming(
61110
cmd: list[str],
62111
prompt: str,
@@ -68,23 +117,13 @@ def _run_agent_streaming(
68117
"""Run the agent subprocess with line-by-line streaming of JSON output.
69118
70119
Used for agents that support ``--output-format stream-json`` (e.g. Claude
71-
Code). Each JSON line is passed to *on_activity* (if provided) so the
72-
caller can emit events or update UI.
73-
74-
Falls back gracefully if any line is not valid JSON — it is still
75-
collected for logging but not forwarded as structured data.
76-
77-
Timeout is enforced manually between line reads (``Popen`` has no
78-
built-in timeout on iteration). A ``try/finally`` ensures the child
79-
process is cleaned up even on unexpected errors.
120+
Code). Stream processing is delegated to :func:`_read_agent_stream`;
121+
this function owns the subprocess lifecycle (spawn, stdin delivery,
122+
timeout kill, and cleanup via ``try/finally``).
80123
"""
81124
stream_cmd = cmd + ["--output-format", "stream-json", "--verbose"]
82125
start = time.monotonic()
83126
deadline = (start + timeout) if timeout else None
84-
stdout_lines: list[str] = []
85-
stderr_data = ""
86-
returncode: int | None = None
87-
result_text: str | None = None
88127

89128
proc = subprocess.Popen(
90129
stream_cmd,
@@ -99,32 +138,16 @@ def _run_agent_streaming(
99138
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
100139
raise RuntimeError("subprocess.Popen failed to create PIPE streams")
101140

102-
# Send prompt and close stdin so the agent can start.
103141
proc.stdin.write(prompt)
104142
proc.stdin.close()
105143

106-
timed_out = False
107-
for line in proc.stdout:
108-
if deadline and time.monotonic() > deadline:
109-
proc.kill()
110-
proc.wait()
111-
timed_out = True
112-
break
113-
stdout_lines.append(line)
114-
stripped = line.strip()
115-
if not stripped:
116-
continue
117-
try:
118-
parsed = json.loads(stripped)
119-
except json.JSONDecodeError:
120-
continue
121-
if parsed.get("type") == "result" and "result" in parsed:
122-
result_text = parsed["result"]
123-
if on_activity is not None:
124-
on_activity(parsed)
125-
126-
if not timed_out:
127-
# stdout exhausted — process finished normally.
144+
stream = _read_agent_stream(proc.stdout, deadline, on_activity)
145+
146+
if stream.timed_out:
147+
proc.kill()
148+
proc.wait()
149+
returncode = None
150+
else:
128151
proc.wait()
129152
returncode = proc.returncode
130153

@@ -136,13 +159,13 @@ def _run_agent_streaming(
136159

137160
log_file: Path | None = None
138161
if log_path_dir:
139-
log_file = _write_log(log_path_dir, iteration, "".join(stdout_lines), stderr_data)
162+
log_file = _write_log(log_path_dir, iteration, "".join(stream.stdout_lines), stderr_data)
140163

141164
return AgentResult(
142165
returncode=returncode,
143166
elapsed=time.monotonic() - start,
144167
log_file=log_file,
145-
result_text=result_text,
168+
result_text=stream.result_text,
146169
)
147170

148171

0 commit comments

Comments
 (0)