Skip to content
77 changes: 64 additions & 13 deletions src/ralphify/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from __future__ import annotations

import json
import os
import signal
import subprocess
import sys
import time
Expand Down Expand Up @@ -43,6 +45,43 @@
_LOG_TIMESTAMP_FORMAT = "%Y%m%d-%H%M%S"
_LOG_ITERATION_PAD_WIDTH = 3

# Subprocess kwargs that isolate agent processes in their own session/group.
# On POSIX this uses start_new_session so the agent and all its children
# form a separate process group that can be killed together.
_SESSION_KWARGS: dict[str, Any] = (
{} if sys.platform == "win32"
else {"start_new_session": True}
)


def _kill_process_group(proc: subprocess.Popen[Any]) -> None:
"""Kill the agent process and its entire process group.

On POSIX, sends SIGTERM then SIGKILL to the process group — but only when
the process is actually a session leader (its pgid equals its pid). This
guard prevents accidentally killing the *caller's* process group when the
child was not started with ``start_new_session=True`` (e.g. in tests).
Falls back to ``proc.kill()`` on Windows or if the group kill fails.
"""
if sys.platform != "win32" and proc.poll() is None:
try:
pgid = os.getpgid(proc.pid)
except (OSError, ProcessLookupError):
pgid = None

if pgid == proc.pid:
try:
os.killpg(pgid, signal.SIGTERM)
try:
proc.wait(timeout=3)
return
except subprocess.TimeoutExpired:
os.killpg(pgid, signal.SIGKILL)
return
except (OSError, ProcessLookupError):
pass
proc.kill()


@dataclass
class AgentResult(ProcessResult):
Expand Down Expand Up @@ -176,6 +215,7 @@ def _run_agent_streaming(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**SUBPROCESS_TEXT_KWARGS,
**_SESSION_KWARGS,
)
try:
# Popen with PIPE guarantees non-None streams; guard explicitly
Expand All @@ -189,13 +229,13 @@ def _run_agent_streaming(
stream = _read_agent_stream(proc.stdout, deadline, on_activity)

if stream.timed_out:
proc.kill()
_kill_process_group(proc)
proc.wait()

stderr_data = proc.stderr.read()
finally:
if proc.poll() is None:
proc.kill()
_kill_process_group(proc)
proc.wait()

log_file = _write_log(log_path_dir, iteration, "".join(stream.stdout_lines), stderr_data)
Expand All @@ -222,6 +262,10 @@ def _run_agent_blocking(
then echoed to stdout/stderr so the user still sees it live. When unset,
output streams directly to the terminal (no capture overhead).

The subprocess is started in its own process group so that on
``KeyboardInterrupt`` or timeout the entire child tree can be killed
via :func:`_kill_process_group`.

Returns ``returncode=None`` when the process times out.
Raises ``FileNotFoundError`` if the command binary does not exist.
"""
Expand All @@ -230,20 +274,27 @@ def _run_agent_blocking(
timed_out = False
stdout: str | bytes | None = None
stderr: str | bytes | None = None
capture = log_path_dir is not None

proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE if capture else None,
stderr=subprocess.PIPE if capture else None,
**SUBPROCESS_TEXT_KWARGS,
**_SESSION_KWARGS,
)
try:
result = subprocess.run(
cmd,
input=prompt,
**SUBPROCESS_TEXT_KWARGS,
timeout=timeout,
capture_output=log_path_dir is not None,
)
returncode = result.returncode
stdout, stderr = result.stdout, result.stderr
except subprocess.TimeoutExpired as exc:
stdout, stderr = proc.communicate(input=prompt, timeout=timeout)
returncode = proc.returncode
except subprocess.TimeoutExpired:
_kill_process_group(proc)
stdout, stderr = proc.communicate()
timed_out = True
stdout, stderr = exc.stdout, exc.stderr
except KeyboardInterrupt:
_kill_process_group(proc)
proc.wait()
raise

log_file = _write_log(log_path_dir, iteration, stdout, stderr)
if log_path_dir:
Expand Down
20 changes: 19 additions & 1 deletion src/ralphify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import shlex
import shutil
import signal
import sys
from pathlib import Path
from typing import Any, NoReturn
Expand Down Expand Up @@ -465,4 +466,21 @@ def run(
state = RunState(run_id=generate_run_id())
emitter = ConsoleEmitter(_console)

run_loop(config, state, emitter)
ctrl_c_count = 0
original_handler = signal.getsignal(signal.SIGINT)

def _sigint_handler(signum: int, frame: Any) -> None:
nonlocal ctrl_c_count
ctrl_c_count += 1
if ctrl_c_count == 1:
state.request_stop()
_console.print("\n[yellow]Finishing current iteration… (Ctrl+C again to force stop)[/yellow]")
else:
signal.signal(signal.SIGINT, original_handler)
raise KeyboardInterrupt

signal.signal(signal.SIGINT, _sigint_handler)
try:
run_loop(config, state, emitter)
finally:
signal.signal(signal.SIGINT, original_handler)
39 changes: 37 additions & 2 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

# ── Patch targets ─────────────────────────────────────────────────────

MOCK_SUBPROCESS = "ralphify._agent.subprocess.run"
"""Patch target for subprocess.run inside the agent module."""
MOCK_SUBPROCESS = "ralphify._agent.subprocess.Popen"
"""Patch target for subprocess.Popen inside the agent module (blocking path)."""

MOCK_POPEN = "ralphify._agent.subprocess.Popen"
"""Patch target for subprocess.Popen inside the agent module (streaming path)."""
Expand Down Expand Up @@ -111,6 +111,7 @@ def ok_result(

Works as a direct factory — ``ok_result(stdout="out\\n")`` — and as a
``side_effect`` callable where mock call args are silently absorbed.
Used by runner tests that mock ``subprocess.run``.
"""
return _make_completed_process(returncode=0, stdout=stdout, stderr=stderr)

Expand All @@ -126,6 +127,40 @@ def fail_result(
return _make_completed_process(returncode=1, stdout=stdout, stderr=stderr)


def _make_mock_proc(returncode: int = 0, stdout: str = "", stderr: str = "") -> MagicMock:
"""Build a MagicMock that mimics Popen for the agent blocking path."""
proc = MagicMock()
proc.returncode = returncode
proc.communicate.return_value = (stdout, stderr)
proc.wait.return_value = returncode
proc.poll.return_value = returncode
proc.pid = 12345
return proc


def ok_proc(*_args: Any, stdout: str = "", stderr: str = "", **_kwargs: Any) -> MagicMock:
"""Popen mock with exit code 0. Works as a factory and ``side_effect``."""
return _make_mock_proc(returncode=0, stdout=stdout, stderr=stderr)


def fail_proc(*_args: Any, stdout: str = "", stderr: str = "", **_kwargs: Any) -> MagicMock:
"""Popen mock with exit code 1."""
return _make_mock_proc(returncode=1, stdout=stdout, stderr=stderr)


def timeout_proc(
*_args: Any, timeout: float = 5, stdout: str = "", stderr: str = "", **_kwargs: Any,
) -> MagicMock:
"""Popen mock whose communicate() raises TimeoutExpired."""
proc = _make_mock_proc(returncode=0)
proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="agent", timeout=timeout),
(stdout, stderr),
]
proc.poll.return_value = None
return proc


def ok_run_result(
output: str = "",
) -> RunResult:
Expand Down
Loading
Loading