Skip to content

Commit fe39186

Browse files
Kasper Junge and Ralphify
authored and committed
refactor: move _drain_readers to finally block in blocking capture path
The blocking capture path in _run_agent_blocking had _drain_readers duplicated in the try body and except KeyboardInterrupt block, but missing from the finally block. An unexpected exception between thread start and drain could leak reader threads. This consolidates the drain into finally, matching the pattern already used by _run_agent_streaming. Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent 88d9a7d commit fe39186

6 files changed

Lines changed: 301 additions & 33 deletions

File tree

src/ralphify/_agent.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,13 +502,12 @@ def _run_agent_blocking(
502502
except subprocess.TimeoutExpired:
503503
_ensure_process_dead(proc)
504504
timed_out = True
505-
_drain_readers(stdout_thread, stderr_thread)
506505
except KeyboardInterrupt:
507506
_ensure_process_dead(proc)
508-
_drain_readers(stdout_thread, stderr_thread)
509507
raise
510508
finally:
511509
_ensure_process_dead(proc)
510+
_drain_readers(stdout_thread, stderr_thread)
512511

513512
stdout = "".join(stdout_lines) if stdout_lines is not None else None
514513
stderr = "".join(stderr_lines) if stderr_lines is not None else None

src/ralphify/_console_emitter.py

Lines changed: 99 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
from __future__ import annotations
88

9+
import sys
10+
import threading
911
import time
1012
from collections.abc import Callable
1113
from functools import partial
@@ -20,6 +22,7 @@
2022
from ralphify._events import (
2123
LOG_ERROR,
2224
STOP_COMPLETED,
25+
AgentOutputLineData,
2326
CommandsCompletedData,
2427
Event,
2528
EventType,
@@ -99,12 +102,35 @@ def __rich_console__(
99102
yield text
100103

101104

105+
def _interactive_default_peek(console: Console) -> bool:
106+
"""Return True when live peek should be on by default.
107+
108+
Peek is only useful when both (a) the console is attached to a real
109+
terminal (so the user can see the extra lines) and (b) stdin is a TTY
110+
(so the keypress listener is actually active and the user can turn
111+
peek back off). Recording consoles used in tests fail check (a).
112+
"""
113+
if not console.is_terminal:
114+
return False
115+
try:
116+
return sys.stdin.isatty()
117+
except (ValueError, OSError):
118+
return False
119+
120+
102121
class ConsoleEmitter:
103122
"""Renders engine events to the Rich console."""
104123

105124
def __init__(self, console: Console) -> None:
106125
self._console = console
107126
self._live: Live | None = None
127+
self._peek_enabled = _interactive_default_peek(console)
128+
self.wants_agent_output: bool = self._peek_enabled
129+
self._peek_lock = threading.Lock()
130+
# Outer lock that serialises every ``_console.print`` call so that
131+
# reader-thread / keypress-thread writes cannot interleave with
132+
# main-thread event handlers while a Rich ``Live`` region is active.
133+
self._console_lock = threading.Lock()
108134
self._handlers: dict[EventType, Callable[..., None]] = {
109135
EventType.RUN_STARTED: self._on_run_started,
110136
EventType.ITERATION_STARTED: self._on_iteration_started,
@@ -120,40 +146,76 @@ def __init__(self, console: Console) -> None:
120146
EventType.COMMANDS_COMPLETED: self._on_commands_completed,
121147
EventType.LOG_MESSAGE: self._on_log_message,
122148
EventType.RUN_STOPPED: self._on_run_stopped,
149+
EventType.AGENT_OUTPUT_LINE: self._on_agent_output_line,
123150
}
124151

152+
def toggle_peek(self) -> bool:
153+
"""Flip live-output rendering on or off.
154+
155+
Safe to call from a non-main thread (e.g. the keypress listener).
156+
Returns the new peek state. A short status banner is printed so
157+
the user gets visible feedback that the toggle took effect.
158+
159+
The banner print is issued while still holding ``_peek_lock`` so
160+
that two rapid toggles cannot print their banners in an order that
161+
disagrees with the final flag value. ``_console_lock`` is acquired
162+
after ``_peek_lock`` — this is the only nested-lock site, so the
163+
order is uncontested and there is no deadlock risk.
164+
"""
165+
with self._peek_lock:
166+
self._peek_enabled = not self._peek_enabled
167+
enabled = self._peek_enabled
168+
with self._console_lock:
169+
self._console.print(
170+
"[dim]peek on[/]" if enabled else "[dim]peek off[/]"
171+
)
172+
return enabled
173+
174+
def _on_agent_output_line(self, data: AgentOutputLineData) -> None:
175+
if not self._peek_enabled:
176+
return
177+
line = escape_markup(data["line"])
178+
with self._console_lock:
179+
self._console.print(f"[dim]{line}[/]")
180+
125181
def emit(self, event: Event) -> None:
126182
handler = self._handlers.get(event.type)
127183
if handler is not None:
128184
handler(event.data)
129185

130186
def _on_run_started(self, data: RunStartedData) -> None:
131187
ralph_name = data["ralph_name"]
132-
self._console.print(
133-
f"\n[bold {_brand.PURPLE}]▶ Running:[/] [bold]{escape_markup(ralph_name)}[/]"
134-
)
135-
info = _format_run_info(data["timeout"], data["commands"], data["max_iterations"])
136-
if info:
137-
self._console.print(f" [dim]{info}[/]")
188+
with self._console_lock:
189+
self._console.print(
190+
f"\n[bold {_brand.PURPLE}]▶ Running:[/] [bold]{escape_markup(ralph_name)}[/]"
191+
)
192+
info = _format_run_info(
193+
data["timeout"], data["commands"], data["max_iterations"]
194+
)
195+
if info:
196+
self._console.print(f" [dim]{info}[/]")
138197

139198
def _start_live(self) -> None:
140199
spinner = _IterationSpinner()
141-
self._live = Live(
142-
spinner,
143-
console=self._console,
144-
transient=True,
145-
refresh_per_second=_LIVE_REFRESH_RATE,
146-
)
147-
self._live.start()
200+
with self._console_lock:
201+
self._live = Live(
202+
spinner,
203+
console=self._console,
204+
transient=True,
205+
refresh_per_second=_LIVE_REFRESH_RATE,
206+
)
207+
self._live.start()
148208

149209
def _stop_live(self) -> None:
150210
if self._live is not None:
151-
self._live.stop()
152-
self._live = None
211+
with self._console_lock:
212+
self._live.stop()
213+
self._live = None
153214

154215
def _on_iteration_started(self, data: IterationStartedData) -> None:
155216
iteration = data["iteration"]
156-
self._console.print(f"\n[bold {_brand.BLUE}]── Iteration {iteration} ──[/]")
217+
with self._console_lock:
218+
self._console.print(f"\n[bold {_brand.BLUE}]── Iteration {iteration} ──[/]")
157219
self._start_live()
158220

159221
def _on_iteration_ended(
@@ -162,29 +224,34 @@ def _on_iteration_ended(
162224
self._stop_live()
163225
iteration = data["iteration"]
164226
detail = data["detail"]
165-
self._console.print(f"[{color}]{icon} Iteration {iteration} {detail}[/]")
166227
log_file = data["log_file"]
167-
if log_file:
168-
self._console.print(f" [dim]{_ICON_ARROW} {escape_markup(log_file)}[/]")
169228
result_text = data["result_text"]
170-
if result_text:
171-
self._console.print(Markdown(result_text))
229+
with self._console_lock:
230+
self._console.print(f"[{color}]{icon} Iteration {iteration} {detail}[/]")
231+
if log_file:
232+
self._console.print(
233+
f" [dim]{_ICON_ARROW} {escape_markup(log_file)}[/]"
234+
)
235+
if result_text:
236+
self._console.print(Markdown(result_text))
172237

173238
def _on_commands_completed(self, data: CommandsCompletedData) -> None:
174239
count = data["count"]
175240
if count:
176-
self._console.print(f" [bold]Commands:[/] {count} ran")
241+
with self._console_lock:
242+
self._console.print(f" [bold]Commands:[/] {count} ran")
177243

178244
def _on_log_message(self, data: LogMessageData) -> None:
179245
msg = escape_markup(data["message"])
180246
level = data["level"]
181-
if level == LOG_ERROR:
182-
self._console.print(f"[red]{msg}[/]")
183-
tb = data.get("traceback")
184-
if tb:
185-
self._console.print(f"[dim]{escape_markup(tb)}[/]")
186-
else:
187-
self._console.print(f"[dim]{msg}[/]")
247+
with self._console_lock:
248+
if level == LOG_ERROR:
249+
self._console.print(f"[red]{msg}[/]")
250+
tb = data.get("traceback")
251+
if tb:
252+
self._console.print(f"[dim]{escape_markup(tb)}[/]")
253+
else:
254+
self._console.print(f"[dim]{msg}[/]")
188255

189256
def _on_run_stopped(self, data: RunStoppedData) -> None:
190257
self._stop_live()
@@ -194,5 +261,6 @@ def _on_run_stopped(self, data: RunStoppedData) -> None:
194261
summary = _format_summary(
195262
data["total"], data["completed"], data["failed"], data["timed_out_count"]
196263
)
197-
self._console.print(f"\n[bold {_brand.BLUE}]──────────────────────[/]")
198-
self._console.print(f"[bold {_brand.GREEN}]Done:[/] {summary}")
264+
with self._console_lock:
265+
self._console.print(f"\n[bold {_brand.BLUE}]──────────────────────[/]")
266+
self._console.print(f"[bold {_brand.GREEN}]Done:[/] {summary}")

src/ralphify/_events.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class EventType(Enum):
8181

8282
# ── Agent activity (live streaming) ─────────────────────────
8383
AGENT_ACTIVITY = "agent_activity"
84+
AGENT_OUTPUT_LINE = "agent_output_line"
8485

8586
# ── Other ───────────────────────────────────────────────────
8687
LOG_MESSAGE = "log_message"
@@ -139,6 +140,16 @@ class AgentActivityData(TypedDict):
139140
iteration: int
140141

141142

143+
OutputStream = Literal["stdout", "stderr"]
144+
"""Which standard stream an :class:`AgentOutputLineData` event came from."""
145+
146+
147+
class AgentOutputLineData(TypedDict):
148+
line: str
149+
stream: OutputStream
150+
iteration: int
151+
152+
142153
class LogMessageData(TypedDict):
143154
message: str
144155
level: LogLevel
@@ -154,6 +165,7 @@ class LogMessageData(TypedDict):
154165
| CommandsCompletedData
155166
| PromptAssembledData
156167
| AgentActivityData
168+
| AgentOutputLineData
157169
| LogMessageData
158170
)
159171
"""Union of all typed event data payloads."""
@@ -188,13 +200,17 @@ def emit(self, event: Event) -> None: ...
188200
class NullEmitter:
189201
"""Discards all events silently."""
190202

203+
wants_agent_output = False
204+
191205
def emit(self, event: Event) -> None:
192206
pass
193207

194208

195209
class QueueEmitter:
196210
"""Pushes events into a :class:`queue.Queue` for async consumption."""
197211

212+
wants_agent_output = True
213+
198214
def __init__(self, q: queue.Queue[Event] | None = None) -> None:
199215
self.queue: queue.Queue[Event] = q or queue.Queue()
200216

@@ -224,6 +240,7 @@ class BoundEmitter:
224240
def __init__(self, emitter: EventEmitter, run_id: str) -> None:
225241
self._emitter = emitter
226242
self._run_id = run_id
243+
self.wants_agent_output: bool = getattr(emitter, "wants_agent_output", True)
227244

228245
def __call__(
229246
self,
@@ -242,6 +259,15 @@ def log_info(self, message: str) -> None:
242259
"""Emit a ``LOG_MESSAGE`` event at info level."""
243260
self(EventType.LOG_MESSAGE, LogMessageData(message=message, level=LOG_INFO))
244261

262+
def agent_output_line(
263+
self, line: str, stream: OutputStream, iteration: int
264+
) -> None:
265+
"""Emit an ``AGENT_OUTPUT_LINE`` event with a raw line of agent output."""
266+
self(
267+
EventType.AGENT_OUTPUT_LINE,
268+
AgentOutputLineData(line=line, stream=stream, iteration=iteration),
269+
)
270+
245271
def log_error(self, message: str, *, traceback: str | None = None) -> None:
246272
"""Emit a ``LOG_MESSAGE`` event at error level."""
247273
data = LogMessageData(message=message, level=LOG_ERROR)

src/ralphify/engine.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ def _run_agent_phase(
178178
f"Invalid agent command syntax: {config.agent!r}. {_field_hint(FIELD_AGENT)}"
179179
) from exc
180180

181+
on_output_line = (
182+
(lambda line, stream: emit.agent_output_line(line, stream, state.iteration))
183+
if emit.wants_agent_output
184+
else None
185+
)
186+
181187
try:
182188
agent = execute_agent(
183189
cmd,
@@ -189,6 +195,7 @@ def _run_agent_phase(
189195
EventType.AGENT_ACTIVITY,
190196
AgentActivityData(raw=data, iteration=state.iteration),
191197
),
198+
on_output_line=on_output_line,
192199
)
193200
except FileNotFoundError as exc:
194201
raise FileNotFoundError(
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Critical 01 — Capture strategy: three-way branch
2+
3+
**Original findings:** C1 (silent output regression) + M1 (unbounded buffering)
4+
**Severity:** Critical — silently swallows agent output in common setups
5+
**Files:** `src/ralphify/_agent.py`, `src/ralphify/engine.py`, `src/ralphify/_console_emitter.py`
6+
7+
## Problem
8+
9+
`_run_agent_blocking` used to pass `stdout=None, stderr=None` to `subprocess.Popen` whenever `log_path_dir` was `None`, letting the child's fds inherit straight through to the terminal. The live-peek refactor changed it to **always pipe** stdout/stderr and drain via reader threads, capturing everything into `stdout_lines`/`stderr_lines`.
10+
11+
The echo guard is:
12+
13+
```python
14+
# src/ralphify/_agent.py ~line 435
15+
if log_path_dir is not None:
16+
_echo_output(stdout, stderr)
17+
```
18+
19+
with a comment claiming "When logging is disabled, live peek (if enabled) has already shown the lines."
20+
21+
That claim is **false** whenever peek is not active. `_interactive_default_peek` in `_console_emitter.py:105` returns `False` unless both `console.is_terminal` AND `sys.stdin.isatty()` are true.
22+
23+
## Why it matters
24+
25+
Concrete user-visible regressions from `main`:
26+
27+
1. `ralph run my-ralph | cat` → stdout is not a TTY → peek off → no echo → **user sees zero agent output**.
28+
2. `ralph run my-ralph | tee run.log`, `ralph run ... 2>&1 | grep ERROR`, `nohup ralph run`, `ralph run` from a systemd unit — all silently swallow agent output.
29+
3. Interactive user presses `p` to mute peek → subsequent iterations' output is discarded forever (no echo catches it because `log_path_dir is None`).
30+
4. Secondary issue (M1): even when nobody needs the bytes, every iteration accumulates full stdout+stderr into Python `list`s. For a chatty agent running for hours, one iteration can buffer hundreds of MB that are immediately thrown away by `_write_log(None, …)`.
31+
32+
## Fix direction
33+
34+
Replace the binary "always capture" with a three-way branch:
35+
36+
1. **No log, peek unavailable** → `stdout=None, stderr=None` (inherit, no reader threads, no capture). Matches pre-refactor behavior and fixes both issues in one move.
37+
2. **Peek available (TTY user wants live output)** → reader threads + `on_output_line` callback. Buffer only if logging needs it.
38+
3. **`log_path_dir` set** → reader threads that accumulate into lists for log writing.
39+
40+
The hard part: `_run_agent_blocking` has to know at spawn time whether peek is enabled. Peek state currently lives in `ConsoleEmitter._peek_enabled`, one layer above the agent. Pick one approach:
41+
42+
- **A — signal via `on_output_line`:** the engine (which has the emitter) passes `on_output_line=None` when no subscriber will render output. `_run_agent_blocking` treats `on_output_line=None AND log_path_dir=None` as "use inheritance." This is the simplest change and dovetails with `medium-01` (event filtering).
43+
- **B — add a capability method to the emitter** (e.g. `emitter.wants_agent_output_lines()`) and have the engine check it. More explicit but more plumbing.
44+
45+
**Prefer option A.** There are two ways to wire it: either `ConsoleEmitter` exposes its peek state so the engine can pass `None` when peek is off, or the engine always passes the callback and `_run_agent_blocking` has to assume that peek *might* become enabled mid-iteration (which it can, via `p`). If toggling mid-iteration matters, you have to capture — in which case document it and keep the echo-on-log path, plus add echo when peek was off for the whole iteration.
46+
47+
Simpler user-facing model: **peek being on/off does not change whether the iteration's output is eventually shown.** Echo at iteration end whenever the inherit path wasn't taken AND peek wasn't visible for the full iteration. See `high-01` for the Live spinner coordination this requires.
48+
49+
## Done when
50+
51+
- [ ] `ralph run my-ralph | cat` shows agent output (regression test: subprocess pipe, assert stdout non-empty).
52+
- [ ] `ralph run my-ralph` with `--log-dir` set still writes the log file and the user still sees the output in the terminal (exactly once — see `high-01`).
53+
- [ ] `ralph run my-ralph` in an interactive TTY with peek on shows live output (no regression).
54+
- [ ] No per-iteration unbounded buffering when neither log nor peek is active (verify by checking the `Popen` kwargs in the non-capture branch).
55+
- [ ] `uv run pytest` passes. Add a new test in `tests/test_agent.py` that asserts the non-capture `Popen` path is used when `log_path_dir=None and on_output_line=None`.
56+
- [ ] `uv run ruff check . && uv run ruff format --check . && uv run ty check` all pass.
57+
58+
## Context
59+
60+
- The old `_run_agent_blocking` is in the diff — recover it via `git log -p src/ralphify/_agent.py` to see the pre-refactor shape.
61+
- `_echo_output` is defined at `src/ralphify/_agent.py:153`. It writes directly to `sys.stdout`/`sys.stderr`, which itself is a bug — see `high-01`.
62+
- `_interactive_default_peek` is at `src/ralphify/_console_emitter.py:105`. The checks are `console.is_terminal and sys.stdin.isatty()`.
63+
- `execute_agent` at `src/ralphify/_agent.py:455` is the single entry point; it dispatches to streaming or blocking. Both paths need the same three-way logic, though streaming has fewer escape valves (it always needs to read the JSON stream, so inheritance is only an option for stderr). Keep the scope of this task to the blocking path; the streaming path already captures stdout for JSON parsing, so it's a separate (smaller) consideration.
64+
- Engine wiring: `src/ralphify/engine.py` around `_run_agent_phase` builds the `on_output_line` lambda. That's the place to pass `None` when no subscriber cares.
65+
- **Do not** merge the `medium-01` event-filtering work into this task — that one is strictly an optimization on top of the capability signal introduced here.

0 commit comments

Comments
 (0)