Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions src/ralphify/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from __future__ import annotations

import json
import os
import signal
import subprocess
import sys
import time
Expand Down Expand Up @@ -43,6 +45,43 @@
_LOG_TIMESTAMP_FORMAT = "%Y%m%d-%H%M%S"
_LOG_ITERATION_PAD_WIDTH = 3

# Subprocess kwargs that isolate agent processes in their own session/group.
# On POSIX this uses start_new_session so the agent and all its children
# form a separate process group that can be killed together.
_SESSION_KWARGS: dict[str, Any] = (
{} if sys.platform == "win32"
else {"start_new_session": True}
)


def _kill_process_group(proc: subprocess.Popen[Any]) -> None:
"""Kill the agent process and its entire process group.

On POSIX, sends SIGTERM then SIGKILL to the process group — but only when
the process is actually a session leader (its pgid equals its pid). This
guard prevents accidentally killing the *caller's* process group when the
child was not started with ``start_new_session=True`` (e.g. in tests).
Falls back to ``proc.kill()`` on Windows or if the group kill fails.
"""
if sys.platform != "win32" and proc.poll() is None:
try:
pgid = os.getpgid(proc.pid)
except (OSError, ProcessLookupError):
pgid = None

if pgid == proc.pid:
try:
os.killpg(pgid, signal.SIGTERM)
try:
proc.wait(timeout=3)
return
except subprocess.TimeoutExpired:
os.killpg(pgid, signal.SIGKILL)
return
except (OSError, ProcessLookupError):
pass
proc.kill()


@dataclass
class AgentResult(ProcessResult):
Expand Down Expand Up @@ -176,6 +215,7 @@ def _run_agent_streaming(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**SUBPROCESS_TEXT_KWARGS,
**_SESSION_KWARGS,
)
try:
# Popen with PIPE guarantees non-None streams; guard explicitly
Expand All @@ -189,13 +229,13 @@ def _run_agent_streaming(
stream = _read_agent_stream(proc.stdout, deadline, on_activity)

if stream.timed_out:
proc.kill()
_kill_process_group(proc)
proc.wait()

stderr_data = proc.stderr.read()
finally:
if proc.poll() is None:
proc.kill()
_kill_process_group(proc)
proc.wait()

log_file = _write_log(log_path_dir, iteration, "".join(stream.stdout_lines), stderr_data)
Expand All @@ -222,6 +262,10 @@ def _run_agent_blocking(
then echoed to stdout/stderr so the user still sees it live. When unset,
output streams directly to the terminal (no capture overhead).

The subprocess is started in its own process group so that on
``KeyboardInterrupt`` or timeout the entire child tree can be killed
via :func:`_kill_process_group`.

Returns ``returncode=None`` when the process times out.
Raises ``FileNotFoundError`` if the command binary does not exist.
"""
Expand All @@ -230,20 +274,27 @@ def _run_agent_blocking(
timed_out = False
stdout: str | bytes | None = None
stderr: str | bytes | None = None
capture = log_path_dir is not None

proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE if capture else None,
stderr=subprocess.PIPE if capture else None,
**SUBPROCESS_TEXT_KWARGS,
**_SESSION_KWARGS,
)
try:
result = subprocess.run(
cmd,
input=prompt,
**SUBPROCESS_TEXT_KWARGS,
timeout=timeout,
capture_output=log_path_dir is not None,
)
returncode = result.returncode
stdout, stderr = result.stdout, result.stderr
except subprocess.TimeoutExpired as exc:
stdout, stderr = proc.communicate(input=prompt, timeout=timeout)
returncode = proc.returncode
except subprocess.TimeoutExpired:
_kill_process_group(proc)
stdout, stderr = proc.communicate()
timed_out = True
stdout, stderr = exc.stdout, exc.stderr
except KeyboardInterrupt:
_kill_process_group(proc)
proc.wait()
raise

log_file = _write_log(log_path_dir, iteration, stdout, stderr)
if log_path_dir:
Expand Down
39 changes: 37 additions & 2 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

# ── Patch targets ─────────────────────────────────────────────────────

MOCK_SUBPROCESS = "ralphify._agent.subprocess.run"
"""Patch target for subprocess.run inside the agent module."""
MOCK_SUBPROCESS = "ralphify._agent.subprocess.Popen"
"""Patch target for subprocess.Popen inside the agent module (blocking path)."""

MOCK_POPEN = "ralphify._agent.subprocess.Popen"
"""Patch target for subprocess.Popen inside the agent module (streaming path)."""
Expand Down Expand Up @@ -111,6 +111,7 @@ def ok_result(

Works as a direct factory — ``ok_result(stdout="out\\n")`` — and as a
``side_effect`` callable where mock call args are silently absorbed.
Used by runner tests that mock ``subprocess.run``.
"""
return _make_completed_process(returncode=0, stdout=stdout, stderr=stderr)

Expand All @@ -126,6 +127,40 @@ def fail_result(
return _make_completed_process(returncode=1, stdout=stdout, stderr=stderr)


def _make_mock_proc(returncode: int = 0, stdout: str = "", stderr: str = "") -> MagicMock:
"""Build a MagicMock that mimics Popen for the agent blocking path."""
proc = MagicMock()
proc.returncode = returncode
proc.communicate.return_value = (stdout, stderr)
proc.wait.return_value = returncode
proc.poll.return_value = returncode
proc.pid = 12345
return proc


def ok_proc(*_args: Any, stdout: str = "", stderr: str = "", **_kwargs: Any) -> MagicMock:
"""Popen mock with exit code 0. Works as a factory and ``side_effect``."""
return _make_mock_proc(returncode=0, stdout=stdout, stderr=stderr)


def fail_proc(*_args: Any, stdout: str = "", stderr: str = "", **_kwargs: Any) -> MagicMock:
"""Popen mock with exit code 1."""
return _make_mock_proc(returncode=1, stdout=stdout, stderr=stderr)


def timeout_proc(
*_args: Any, timeout: float = 5, stdout: str = "", stderr: str = "", **_kwargs: Any,
) -> MagicMock:
"""Popen mock whose communicate() raises TimeoutExpired."""
proc = _make_mock_proc(returncode=0)
proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="agent", timeout=timeout),
(stdout, stderr),
]
proc.poll.return_value = None
return proc


def ok_run_result(
output: str = "",
) -> RunResult:
Expand Down
117 changes: 85 additions & 32 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Tests for the _agent module — subprocess execution, log writing, and stream parsing."""

import io
import signal
import subprocess
import sys
from pathlib import Path
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from helpers import MOCK_POPEN, MOCK_SUBPROCESS, fail_result, make_mock_popen, ok_result
from helpers import MOCK_POPEN, MOCK_SUBPROCESS, fail_proc, make_mock_popen, ok_proc, timeout_proc

from ralphify._agent import (
AgentResult,
_kill_process_group,
_read_agent_stream,
_run_agent_streaming,
_supports_stream_json,
Expand Down Expand Up @@ -169,35 +172,32 @@ def test_last_result_wins(self):


class TestExecuteAgentBlocking:
@patch(MOCK_SUBPROCESS)
def test_success(self, mock_run):
mock_run.return_value = ok_result()
@patch(MOCK_POPEN, side_effect=ok_proc)
def test_success(self, mock_popen):
result = execute_agent(["echo"], "prompt", timeout=None, log_path_dir=None, iteration=1)

assert result.returncode == 0
assert result.timed_out is False
assert result.log_file is None
assert result.elapsed >= 0

@patch(MOCK_SUBPROCESS)
def test_failure(self, mock_run):
mock_run.return_value = fail_result()
@patch(MOCK_POPEN, side_effect=fail_proc)
def test_failure(self, mock_popen):
result = execute_agent(["echo"], "prompt", timeout=None, log_path_dir=None, iteration=1)

assert result.returncode == 1
assert result.timed_out is False

@patch(MOCK_SUBPROCESS)
def test_timeout(self, mock_run):
mock_run.side_effect = subprocess.TimeoutExpired(cmd="echo", timeout=5)
@patch(MOCK_POPEN, side_effect=timeout_proc)
def test_timeout(self, mock_popen):
result = execute_agent(["echo"], "prompt", timeout=5, log_path_dir=None, iteration=1)

assert result.returncode is None
assert result.timed_out is True

@patch(MOCK_SUBPROCESS)
def test_writes_log_on_success(self, mock_run, tmp_path):
mock_run.return_value = ok_result(stdout="agent output\n")
@patch(MOCK_POPEN)
def test_writes_log_on_success(self, mock_popen, tmp_path):
mock_popen.return_value = ok_proc(stdout="agent output\n")
result = execute_agent(
["echo"], "prompt", timeout=None, log_path_dir=tmp_path, iteration=3,
)
Expand All @@ -207,27 +207,21 @@ def test_writes_log_on_success(self, mock_run, tmp_path):
assert result.log_file.name.startswith("003_")
assert "agent output" in result.log_file.read_text()

@patch(MOCK_SUBPROCESS)
def test_writes_log_on_timeout(self, mock_run, tmp_path):
exc = subprocess.TimeoutExpired(cmd="echo", timeout=5)
exc.stdout = "partial"
exc.stderr = "err"
mock_run.side_effect = exc
@patch(MOCK_POPEN)
def test_writes_log_on_timeout(self, mock_popen, tmp_path):
mock_popen.return_value = timeout_proc(stdout="partial", stderr="err")
result = execute_agent(
["echo"], "prompt", timeout=5, log_path_dir=tmp_path, iteration=1,
)

assert result.log_file is not None
assert result.log_file.exists()

@patch(MOCK_SUBPROCESS)
def test_timeout_echoes_captured_output(self, mock_run, tmp_path, capsys):
@patch(MOCK_POPEN)
def test_timeout_echoes_captured_output(self, mock_popen, tmp_path, capsys):
"""When logging is enabled and the agent times out, partial output
should be echoed to the terminal — same as on normal completion."""
exc = subprocess.TimeoutExpired(cmd="echo", timeout=5)
exc.stdout = "partial stdout"
exc.stderr = "partial stderr"
mock_run.side_effect = exc
mock_popen.return_value = timeout_proc(stdout="partial stdout", stderr="partial stderr")
execute_agent(
["echo"], "prompt", timeout=5, log_path_dir=tmp_path, iteration=1,
)
Expand All @@ -236,16 +230,15 @@ def test_timeout_echoes_captured_output(self, mock_run, tmp_path, capsys):
assert "partial stdout" in captured.out
assert "partial stderr" in captured.err

@patch(MOCK_SUBPROCESS)
def test_no_log_when_dir_not_set(self, mock_run):
mock_run.return_value = ok_result()
@patch(MOCK_POPEN, side_effect=ok_proc)
def test_no_log_when_dir_not_set(self, mock_popen):
result = execute_agent(["echo"], "prompt", timeout=None, log_path_dir=None, iteration=1)

assert result.log_file is None

@patch(MOCK_SUBPROCESS)
def test_file_not_found_propagates(self, mock_run):
mock_run.side_effect = FileNotFoundError("not found")
@patch(MOCK_POPEN)
def test_file_not_found_propagates(self, mock_popen):
mock_popen.side_effect = FileNotFoundError("not found")

with pytest.raises(FileNotFoundError):
execute_agent(
Expand Down Expand Up @@ -423,3 +416,63 @@ def test_timeout_kills_process(self, mock_popen, mock_time):
assert result.timed_out is True
assert result.returncode is None
proc.kill.assert_called()


class TestProcessGroupCleanup:
"""Process group cleanup, isolation, and _kill_process_group tests."""

pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only behavior")

@patch("ralphify._agent.os.killpg")
@patch("ralphify._agent.os.getpgid")
def test_session_leader_gets_sigterm(self, mock_getpgid, mock_killpg):
proc = MagicMock(pid=42, poll=MagicMock(return_value=None))
mock_getpgid.return_value = 42

_kill_process_group(proc)

mock_killpg.assert_any_call(42, signal.SIGTERM)
proc.wait.assert_called_once_with(timeout=3)

@patch("ralphify._agent.os.killpg")
@patch("ralphify._agent.os.getpgid")
def test_escalates_to_sigkill_on_timeout(self, mock_getpgid, mock_killpg):
proc = MagicMock(pid=42, poll=MagicMock(return_value=None))
proc.wait.side_effect = subprocess.TimeoutExpired(cmd="agent", timeout=3)
mock_getpgid.return_value = 42

_kill_process_group(proc)

mock_killpg.assert_any_call(42, signal.SIGTERM)
mock_killpg.assert_any_call(42, signal.SIGKILL)

@patch("ralphify._agent.os.killpg")
@patch("ralphify._agent.os.getpgid")
def test_not_session_leader_falls_back_to_kill(self, mock_getpgid, mock_killpg):
proc = MagicMock(pid=42, poll=MagicMock(return_value=None))
mock_getpgid.return_value = 1

_kill_process_group(proc)

mock_killpg.assert_not_called()
proc.kill.assert_called_once()

def test_already_exited_falls_back_to_kill(self):
proc = MagicMock(pid=42, poll=MagicMock(return_value=0))

_kill_process_group(proc)

proc.kill.assert_called_once()

@patch(MOCK_POPEN, side_effect=ok_proc)
def test_blocking_uses_start_new_session(self, mock_popen):
execute_agent(["echo"], "prompt", timeout=None, log_path_dir=None, iteration=1)
assert mock_popen.call_args[1].get("start_new_session") is True

@patch(MOCK_POPEN)
def test_streaming_uses_start_new_session(self, mock_popen):
mock_popen.return_value = make_mock_popen(returncode=0)
_run_agent_streaming(
["claude", "-p"], "prompt", timeout=None, log_path_dir=None, iteration=1,
)
assert mock_popen.call_args[1].get("start_new_session") is True
Loading
Loading