From 920832ea702795bd7d41546bd3931d9e7883cae1 Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 13 Mar 2026 09:28:42 -0700 Subject: [PATCH 1/4] feat(adapters): add Codex engine adapter with app-server JSON-RPC protocol (#412) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement CodexAdapter that speaks the Codex app-server JSON-RPC protocol over stdio, enabling `cf work start --execute --engine codex`. - Full 4-step handshake: initialize → initialized → thread/start → turn/start - Turn streaming with event routing (session_started, notification, tool_call, turn/completed, turn/failed, turn/cancelled) - Auto-approval of tool calls (configurable via approval_policy) - Timeout enforcement: turn, read, and stall timeouts - Token usage tracking from turn_completed events - Registered in engine_registry (VALID_ENGINES + EXTERNAL_ENGINES) - OPENAI_API_KEY validator for CLI pre-flight checks - 23 unit tests covering protocol, handshake, streaming, approval, and full run --- codeframe/cli/app.py | 16 +- codeframe/cli/validators.py | 37 +++ codeframe/core/adapters/__init__.py | 2 + codeframe/core/adapters/codex.py | 382 ++++++++++++++++++++++++++++ codeframe/core/engine_registry.py | 9 + tests/core/adapters/test_codex.py | 377 +++++++++++++++++++++++++++ 6 files changed, 817 insertions(+), 6 deletions(-) create mode 100644 codeframe/core/adapters/codex.py create mode 100644 tests/core/adapters/test_codex.py diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index aea75036..949fd6d0 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -1998,7 +1998,7 @@ def work_start( engine: Optional[str] = typer.Option( None, "--engine", - help="Agent engine: react (default), plan (legacy), claude-code, opencode, or built-in", + help="Agent engine: react (default), plan (legacy), claude-code, codex, opencode, or built-in", ), stall_timeout: int = typer.Option( 300, @@ -2059,10 +2059,12 @@ def work_start( task = matching[0] # Validate API key before creating run record (avoids dangling IN_PROGRESS state) - # External engines (claude-code, opencode) manage their own authentication if execute: from codeframe.core.engine_registry import is_external_engine - if not is_external_engine(engine): + if engine == "codex": + from codeframe.cli.validators import require_openai_api_key + require_openai_api_key() + elif not is_external_engine(engine): from codeframe.cli.validators import require_anthropic_api_key require_anthropic_api_key() @@ -2888,7 +2890,7 @@ def batch_run( engine: Optional[str] = typer.Option( None, "--engine", - help="Agent engine: react (default), plan (legacy), claude-code, opencode, or built-in", + help="Agent engine: react (default), plan (legacy), claude-code, codex, opencode, or built-in", ), stall_timeout: int = typer.Option( 300, @@ -2989,9 +2991,11 @@ def batch_run( return # Validate API key before batch execution - # External engines (claude-code, opencode) manage their own authentication from codeframe.core.engine_registry import is_external_engine - if not is_external_engine(engine): + if engine == "codex": + from codeframe.cli.validators import require_openai_api_key + require_openai_api_key() + elif not is_external_engine(engine): from codeframe.cli.validators import require_anthropic_api_key require_anthropic_api_key() diff --git a/codeframe/cli/validators.py b/codeframe/cli/validators.py index db0d29c8..c19683de 100644 --- a/codeframe/cli/validators.py +++ b/codeframe/cli/validators.py @@ -46,3 +46,40 @@ def require_anthropic_api_key() -> str: "Set it in your environment or add it to a .env file." ) raise typer.Exit(1) + + +def require_openai_api_key() -> str: + """Ensure OPENAI_API_KEY is available, loading from .env if needed. + + Checks os.environ first. If not found, attempts to load from .env files + (~/.env as base, then cwd/.env with override). If found after loading, + sets in os.environ so subprocesses inherit it. + + Returns: + The API key string. + + Raises: + typer.Exit: If the key cannot be found anywhere. + """ + key = os.getenv("OPENAI_API_KEY") + if key: + return key + + cwd_env = Path.cwd() / ".env" + home_env = Path.home() / ".env" + + if home_env.exists(): + load_dotenv(home_env) + if cwd_env.exists(): + load_dotenv(cwd_env, override=True) + + key = os.getenv("OPENAI_API_KEY") + if key: + os.environ["OPENAI_API_KEY"] = key + return key + + console.print( + "[red]Error:[/red] OPENAI_API_KEY is not set. " + "Set it in your environment or add it to a .env file." + ) + raise typer.Exit(1) diff --git a/codeframe/core/adapters/__init__.py b/codeframe/core/adapters/__init__.py index 2b80cfe2..756640c2 100644 --- a/codeframe/core/adapters/__init__.py +++ b/codeframe/core/adapters/__init__.py @@ -11,6 +11,7 @@ BuiltinReactAdapter, ) from codeframe.core.adapters.claude_code import ClaudeCodeAdapter +from codeframe.core.adapters.codex import CodexAdapter from codeframe.core.adapters.opencode import OpenCodeAdapter from codeframe.core.adapters.subprocess_adapter import SubprocessAdapter from codeframe.core.adapters.verification_wrapper import VerificationWrapper @@ -25,6 +26,7 @@ "BuiltinPlanAdapter", "BuiltinReactAdapter", "ClaudeCodeAdapter", + "CodexAdapter", "OpenCodeAdapter", "SubprocessAdapter", "VerificationWrapper", diff --git a/codeframe/core/adapters/codex.py b/codeframe/core/adapters/codex.py new file mode 100644 index 00000000..10d5279d --- /dev/null +++ b/codeframe/core/adapters/codex.py @@ -0,0 +1,382 @@ +"""Codex adapter using the app-server JSON-RPC protocol. + +Speaks the JSON-RPC protocol that OpenAI's Codex app-server exposes over +stdio. Unlike the simple stdin-to-stdout adapters (Claude Code, OpenCode), +this adapter maintains a bidirectional conversation with the subprocess: + + initialize -> initialized + thread/start + turn/start -> (stream of events) -> turn/completed | turn/failed +""" + +from __future__ import annotations + +import json +import shutil +import subprocess +import threading +import time +import uuid +from pathlib import Path +from typing import Any, Callable + +from codeframe.core.adapters.agent_adapter import ( + AdapterTokenUsage, + AgentEvent, + AgentResult, +) + + +class CodexAdapter: + """Adapter that delegates code execution to OpenAI Codex via app-server protocol. + + The Codex CLI is launched with ``app-server`` subcommand, producing a + JSON-RPC-over-stdio channel. The adapter performs a four-step handshake + then streams turn events until a terminal event arrives. + """ + + # Default timeouts + DEFAULT_TURN_TIMEOUT_MS = 3_600_000 # 1 hour + DEFAULT_READ_TIMEOUT_MS = 30_000 # 30 s per line + DEFAULT_STALL_TIMEOUT_MS = 300_000 # 5 min no-progress + + def __init__( + self, + *, + codex_command: str = "codex", + approval_policy: str = "auto", + sandbox_mode: str | None = None, + turn_timeout_ms: int = DEFAULT_TURN_TIMEOUT_MS, + read_timeout_ms: int = DEFAULT_READ_TIMEOUT_MS, + stall_timeout_ms: int = DEFAULT_STALL_TIMEOUT_MS, + ) -> None: + self._binary = codex_command + self._approval_policy = approval_policy + self._sandbox_mode = sandbox_mode + self._turn_timeout_ms = turn_timeout_ms + self._read_timeout_ms = read_timeout_ms + self._stall_timeout_ms = stall_timeout_ms + + resolved = shutil.which(codex_command) + if resolved is None: + raise EnvironmentError( + f"'{codex_command}' not found on PATH. " + f"Install it or ensure it is available in your environment." + ) + self._binary_path = resolved + + @property + def name(self) -> str: + return "codex" + + @classmethod + def requirements(cls) -> dict[str, str]: + """Return required environment variables for ``cf engines check``.""" + return {"OPENAI_API_KEY": "OpenAI API key"} + + # ------------------------------------------------------------------ + # AgentAdapter.run + # ------------------------------------------------------------------ + + def run( + self, + task_id: str, + prompt: str, + workspace_path: Path, + on_event: Callable[[AgentEvent], None] | None = None, + ) -> AgentResult: + """Execute a task via the Codex app-server protocol.""" + start = time.monotonic() + + try: + cmd = [self._binary_path, "app-server"] + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=str(workspace_path), + text=True, + ) + except FileNotFoundError: + return AgentResult( + status="failed", + error=f"Binary '{self._binary}' not found during execution", + ) + except OSError as e: + return AgentResult( + status="failed", + error=f"Failed to start '{self._binary}': {e}", + ) + + # Drain stderr in background to prevent deadlock + stderr_chunks: list[str] = [] + + def _drain_stderr() -> None: + if process.stderr: + stderr_chunks.append(process.stderr.read()) + + stderr_thread = threading.Thread(target=_drain_stderr, daemon=True) + stderr_thread.start() + + try: + ok = self._handshake( + process.stdin, process.stdout, prompt=prompt, workspace_path=workspace_path + ) + if not ok: + self._kill(process) + return AgentResult( + status="failed", + error="Codex app-server handshake failed (no initialized response)", + ) + + result = self._stream_turn(process.stdout, on_event=on_event, stdin=process.stdin) + except Exception as exc: + self._kill(process) + return AgentResult(status="failed", error=str(exc)) + finally: + stderr_thread.join(timeout=5) + self._kill(process) + + result.modified_files = self._detect_modified_files(workspace_path) + result.duration_ms = int((time.monotonic() - start) * 1000) + return result + + # ------------------------------------------------------------------ + # JSON-RPC framing + # ------------------------------------------------------------------ + + def _send( + self, + stdin: Any, + method: str, + params: dict, + msg_id: int | None = None, + ) -> None: + """Write a single JSON-RPC message to the subprocess stdin.""" + msg: dict[str, Any] = {"jsonrpc": "2.0", "method": method, "params": params} + if msg_id is not None: + msg["id"] = msg_id + stdin.write(json.dumps(msg) + "\n") + stdin.flush() + + def _recv_line(self, stdout: Any, timeout_s: float) -> dict | None: + """Read one JSON-RPC line from stdout. + + Returns the parsed dict, or None on EOF / empty line. + Timeout is approximate (based on readline blocking behaviour). + """ + line = stdout.readline() + if not line: + return None + try: + return json.loads(line) + except json.JSONDecodeError: + return None + + # ------------------------------------------------------------------ + # Handshake + # ------------------------------------------------------------------ + + def _handshake( + self, + stdin: Any, + stdout: Any, + *, + prompt: str, + workspace_path: Path, + ) -> bool: + """Perform the 4-step initialization handshake. + + 1. Send ``initialize`` with capabilities + 2. Wait for ``initialized`` response + 3. Send ``thread/start`` + 4. Send ``turn/start`` with the task prompt + + Returns True on success, False on timeout/failure. + """ + thread_id = str(uuid.uuid4()) + turn_id = str(uuid.uuid4()) + + # Step 1: initialize + self._send(stdin, "initialize", {"capabilities": {}}, msg_id=1) + + # Step 2: wait for initialized + timeout_s = self._read_timeout_ms / 1000 + response = self._recv_line(stdout, timeout_s=timeout_s) + if response is None: + return False + + # Accept either method="initialized" or a result response to id=1 + method = response.get("method", "") + if method != "initialized" and "result" not in response: + return False + + # Step 3: thread/start + self._send(stdin, "thread/start", { + "thread_id": thread_id, + "workspace": str(workspace_path), + }) + + # Step 4: turn/start + self._send(stdin, "turn/start", { + "turn_id": turn_id, + "prompt": prompt, + }) + + return True + + # ------------------------------------------------------------------ + # Turn streaming + # ------------------------------------------------------------------ + + def _stream_turn( + self, + stdout: Any, + *, + on_event: Callable[[AgentEvent], None] | None = None, + stdin: Any = None, + ) -> AgentResult: + """Stream turn events until a terminal event or timeout.""" + last_event_time = time.monotonic() + turn_start = time.monotonic() + stall_timeout_s = self._stall_timeout_ms / 1000 + turn_timeout_s = self._turn_timeout_ms / 1000 + read_timeout_s = self._read_timeout_ms / 1000 + + while True: + # Check stall timeout + if stall_timeout_s > 0 and (time.monotonic() - last_event_time) > stall_timeout_s: + return AgentResult( + status="failed", + error=f"Stall timeout: no events for {self._stall_timeout_ms}ms", + ) + + # Check turn timeout + if turn_timeout_s > 0 and (time.monotonic() - turn_start) > turn_timeout_s: + return AgentResult( + status="failed", + error=f"Turn timeout: exceeded {self._turn_timeout_ms}ms", + ) + + msg = self._recv_line(stdout, timeout_s=read_timeout_s) + if msg is None: + # EOF — process likely terminated + return AgentResult( + status="failed", + error="Stall timeout: no events received (EOF)", + ) + + last_event_time = time.monotonic() + method = msg.get("method", "") + params = msg.get("params", {}) + + if method == "session_started": + if on_event: + on_event(AgentEvent(type="progress", message="Session started")) + + elif method == "notification": + message = params.get("message", "") + if on_event: + on_event(AgentEvent(type="progress", message=message)) + + elif method == "tool_call": + if stdin: + self._handle_approval(stdin, msg, on_event=on_event) + + elif method == "turn/completed": + input_t, output_t = self._extract_token_usage(msg) + return AgentResult( + status="completed", + token_usage=AdapterTokenUsage( + input_tokens=input_t, + output_tokens=output_t, + ), + ) + + elif method == "turn/failed": + error_msg = params.get("error", "Turn failed") + return AgentResult(status="failed", error=error_msg) + + elif method == "turn/cancelled": + return AgentResult(status="failed", error="Turn cancelled by Codex") + + # ------------------------------------------------------------------ + # Approval handling + # ------------------------------------------------------------------ + + def _handle_approval( + self, + stdin: Any, + event: dict, + *, + on_event: Callable[[AgentEvent], None] | None = None, + ) -> None: + """Handle a tool_call event that requires approval.""" + params = event.get("params", {}) + tool_id = params.get("id", "unknown") + tool_name = params.get("name", "unknown") + + if self._approval_policy == "auto": + self._send(stdin, "tool_call/approved", {"id": tool_id}) + if on_event: + on_event(AgentEvent( + type="progress", + message=f"Auto-approved tool call: {tool_name}", + )) + + # ------------------------------------------------------------------ + # Token usage + # ------------------------------------------------------------------ + + def _extract_token_usage(self, event: dict) -> tuple[int, int]: + """Extract (input_tokens, output_tokens) from a turn_completed event.""" + params = event.get("params", {}) + usage = params.get("usage", {}) + return ( + usage.get("input_tokens", 0), + usage.get("output_tokens", 0), + ) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _kill(process: subprocess.Popen) -> None: + """Terminate the subprocess if still running.""" + if process.poll() is None: + process.kill() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + pass + + def _detect_modified_files(self, workspace_path: Path) -> list[str]: + """Detect files modified by the subprocess via git diff.""" + try: + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD"], + cwd=str(workspace_path), + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + return [] + + files = [f for f in result.stdout.strip().splitlines() if f] + + untracked = subprocess.run( + ["git", "ls-files", "--others", "--exclude-standard"], + cwd=str(workspace_path), + capture_output=True, + text=True, + timeout=10, + ) + if untracked.returncode == 0: + files.extend(f for f in untracked.stdout.strip().splitlines() if f) + + return list(dict.fromkeys(files)) + except (FileNotFoundError, OSError, subprocess.TimeoutExpired): + return [] diff --git a/codeframe/core/engine_registry.py b/codeframe/core/engine_registry.py index ec363a01..4771c941 100644 --- a/codeframe/core/engine_registry.py +++ b/codeframe/core/engine_registry.py @@ -13,6 +13,7 @@ "react", "plan", "claude-code", + "codex", "opencode", "built-in", # Alias for "react" }) @@ -20,6 +21,7 @@ # External engines that use subprocess adapters (no LLM provider needed) EXTERNAL_ENGINES = frozenset({ "claude-code", + "codex", "opencode", }) @@ -86,6 +88,10 @@ def get_external_adapter(engine: str, **kwargs: Any) -> AgentAdapter: from codeframe.core.adapters.claude_code import ClaudeCodeAdapter return ClaudeCodeAdapter(**kwargs) + elif engine == "codex": + from codeframe.core.adapters.codex import CodexAdapter + + return CodexAdapter(**kwargs) elif engine == "opencode": from codeframe.core.adapters.opencode import OpenCodeAdapter @@ -189,6 +195,9 @@ def _get_adapter_class(engine: str) -> type | None: elif engine == "claude-code": from codeframe.core.adapters.claude_code import ClaudeCodeAdapter return ClaudeCodeAdapter + elif engine == "codex": + from codeframe.core.adapters.codex import CodexAdapter + return CodexAdapter elif engine == "opencode": from codeframe.core.adapters.opencode import OpenCodeAdapter return OpenCodeAdapter diff --git a/tests/core/adapters/test_codex.py b/tests/core/adapters/test_codex.py new file mode 100644 index 00000000..686d454c --- /dev/null +++ b/tests/core/adapters/test_codex.py @@ -0,0 +1,377 @@ +"""Tests for Codex adapter (app-server JSON-RPC protocol).""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from codeframe.core.adapters.agent_adapter import AgentAdapter, AgentEvent + + +def _make_jsonrpc(method: str, params: dict | None = None, id_: int | None = None) -> str: + """Build a JSON-RPC line as the mock Codex process would emit.""" + msg: dict = {"jsonrpc": "2.0", "method": method} + if params is not None: + msg["params"] = params + if id_ is not None: + msg["id"] = id_ + return json.dumps(msg) + "\n" + + +class _FakeStdout: + """Simulate a subprocess stdout that yields pre-scripted JSON-RPC lines.""" + + def __init__(self, lines: list[str]) -> None: + self._lines = iter(lines) + + def readline(self) -> str: + try: + return next(self._lines) + except StopIteration: + return "" + + +class TestCodexAdapterImport: + """Verify module can be imported and conforms to protocol.""" + + def test_import(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter # noqa: F401 + + def test_conforms_to_protocol(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + assert isinstance(adapter, AgentAdapter) + + def test_name(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + assert adapter.name == "codex" + + def test_raises_if_binary_not_found(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value=None): + with pytest.raises(EnvironmentError, match="not found on PATH"): + CodexAdapter() + + +class TestCodexJsonRpc: + """Test JSON-RPC message framing helpers.""" + + def test_send_writes_json_line(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + stdin = MagicMock() + adapter._send(stdin, "initialize", {"capabilities": {}}, msg_id=1) + + written = stdin.write.call_args[0][0] + parsed = json.loads(written.strip()) + assert parsed["jsonrpc"] == "2.0" + assert parsed["method"] == "initialize" + assert parsed["id"] == 1 + stdin.flush.assert_called_once() + + def test_send_without_id(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + stdin = MagicMock() + adapter._send(stdin, "thread/start", {"thread_id": "t1"}) + + written = stdin.write.call_args[0][0] + parsed = json.loads(written.strip()) + assert "id" not in parsed + + def test_recv_line_parses_json(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + line = _make_jsonrpc("initialized", {"session_id": "s1"}) + stdout = _FakeStdout([line]) + + result = adapter._recv_line(stdout, timeout_s=5.0) + assert result is not None + assert result["method"] == "initialized" + + def test_recv_line_returns_none_on_eof(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + stdout = _FakeStdout([]) + result = adapter._recv_line(stdout, timeout_s=1.0) + assert result is None + + +class TestCodexHandshake: + """Test the 4-step initialization handshake.""" + + def test_successful_handshake(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + # Mock stdin + stdin = MagicMock() + + # Mock stdout: expect initialized response after initialize is sent + stdout = _FakeStdout([ + _make_jsonrpc("initialized", {"session_id": "s1"}), + ]) + + success = adapter._handshake( + stdin, stdout, prompt="fix the bug", workspace_path=Path("/tmp/repo") + ) + assert success is True + + # Verify 3 messages were sent: initialize, thread/start, turn/start + assert stdin.write.call_count == 3 + + def test_handshake_fails_on_timeout(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter(read_timeout_ms=100) + + stdin = MagicMock() + # Empty stdout = no initialized response + stdout = _FakeStdout([]) + + success = adapter._handshake( + stdin, stdout, prompt="fix the bug", workspace_path=Path("/tmp/repo") + ) + assert success is False + + +class TestCodexTurnStreaming: + """Test turn event streaming and routing.""" + + def _make_adapter(self, **kwargs): + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + return CodexAdapter(**kwargs) + + def test_turn_completed(self) -> None: + adapter = self._make_adapter() + events: list[AgentEvent] = [] + + stdout = _FakeStdout([ + _make_jsonrpc("session_started", {"session_id": "s1"}), + _make_jsonrpc("turn/completed", { + "usage": {"input_tokens": 100, "output_tokens": 50} + }), + ]) + + result = adapter._stream_turn(stdout, on_event=events.append) + assert result.status == "completed" + assert result.token_usage is not None + assert result.token_usage.input_tokens == 100 + assert result.token_usage.output_tokens == 50 + # Should have received a progress event for session_started + assert any(e.message == "Session started" for e in events) + + def test_turn_failed(self) -> None: + adapter = self._make_adapter() + + stdout = _FakeStdout([ + _make_jsonrpc("turn/failed", {"error": "syntax error in file"}), + ]) + + result = adapter._stream_turn(stdout, on_event=None) + assert result.status == "failed" + assert "syntax error" in (result.error or "") + + def test_turn_cancelled(self) -> None: + adapter = self._make_adapter() + + stdout = _FakeStdout([ + _make_jsonrpc("turn/cancelled", {}), + ]) + + result = adapter._stream_turn(stdout, on_event=None) + assert result.status == "failed" + assert "cancelled" in (result.error or "").lower() + + def test_notification_emits_progress(self) -> None: + adapter = self._make_adapter() + events: list[AgentEvent] = [] + + stdout = _FakeStdout([ + _make_jsonrpc("notification", {"message": "Reading file main.py"}), + _make_jsonrpc("turn/completed", {"usage": {}}), + ]) + + result = adapter._stream_turn(stdout, on_event=events.append) + assert result.status == "completed" + assert any("Reading file" in e.message for e in events) + + def test_stall_timeout(self) -> None: + adapter = self._make_adapter(stall_timeout_ms=100) + + # Stdout that blocks forever (empty) + stdout = _FakeStdout([]) + + result = adapter._stream_turn(stdout, on_event=None) + assert result.status == "failed" + assert "stall" in (result.error or "").lower() + + +class TestCodexApproval: + """Test tool_call approval handling.""" + + def _make_adapter(self, **kwargs): + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + return CodexAdapter(**kwargs) + + def test_auto_approve_sends_approved(self) -> None: + adapter = self._make_adapter(approval_policy="auto") + events: list[AgentEvent] = [] + + stdin = MagicMock() + event = {"method": "tool_call", "params": {"id": "tc-1", "name": "write_file"}} + + adapter._handle_approval(stdin, event, on_event=events.append) + + # Verify approved message sent + written = stdin.write.call_args[0][0] + parsed = json.loads(written.strip()) + assert parsed["method"] == "tool_call/approved" + assert parsed["params"]["id"] == "tc-1" + + def test_auto_approve_emits_event(self) -> None: + adapter = self._make_adapter(approval_policy="auto") + events: list[AgentEvent] = [] + + stdin = MagicMock() + event = {"method": "tool_call", "params": {"id": "tc-1", "name": "run_command"}} + + adapter._handle_approval(stdin, event, on_event=events.append) + assert any("auto-approved" in e.message.lower() for e in events) + + +class TestCodexFullRun: + """Integration test: full run() with mock subprocess.""" + + def test_successful_run(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + mock_process = MagicMock() + mock_process.stdin = MagicMock() + mock_process.stderr = MagicMock() + mock_process.stderr.read.return_value = "" + + # Script the stdout: handshake response + turn events + lines = [ + _make_jsonrpc("initialized", {"session_id": "s1"}), + _make_jsonrpc("session_started", {"session_id": "s1"}), + _make_jsonrpc("notification", {"message": "Editing file"}), + _make_jsonrpc("turn/completed", { + "usage": {"input_tokens": 500, "output_tokens": 200} + }), + ] + mock_process.stdout = _FakeStdout(lines) + mock_process.returncode = 0 + mock_process.poll.return_value = None + mock_process.wait.return_value = None + + events: list[AgentEvent] = [] + + with patch("subprocess.Popen", return_value=mock_process): + with patch.object(adapter, "_detect_modified_files", return_value=["src/main.py"]): + result = adapter.run( + "task-1", "fix the bug", Path("/tmp/repo"), + on_event=events.append, + ) + + assert result.status == "completed" + assert result.modified_files == ["src/main.py"] + assert result.token_usage is not None + assert result.token_usage.input_tokens == 500 + + def test_failed_run_handshake_fails(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter(read_timeout_ms=100) + + mock_process = MagicMock() + mock_process.stdin = MagicMock() + mock_process.stderr = MagicMock() + mock_process.stderr.read.return_value = "" + mock_process.stdout = _FakeStdout([]) # No handshake response + mock_process.returncode = None + mock_process.poll.return_value = None + mock_process.wait.return_value = None + mock_process.kill.return_value = None + + with patch("subprocess.Popen", return_value=mock_process): + result = adapter.run("task-1", "fix the bug", Path("/tmp/repo")) + + assert result.status == "failed" + assert "handshake" in (result.error or "").lower() + + def test_binary_not_found_during_execution(self) -> None: + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + adapter = CodexAdapter() + + with patch("subprocess.Popen", side_effect=FileNotFoundError("codex not found")): + result = adapter.run("task-1", "fix the bug", Path("/tmp/repo")) + + assert result.status == "failed" + assert "not found" in (result.error or "").lower() + + +class TestCodexTokenExtraction: + """Test token usage extraction from Codex events.""" + + def _make_adapter(self): + from codeframe.core.adapters.codex import CodexAdapter + + with patch("shutil.which", return_value="/usr/bin/codex"): + return CodexAdapter() + + def test_extracts_tokens_from_usage(self) -> None: + adapter = self._make_adapter() + event = {"params": {"usage": {"input_tokens": 1000, "output_tokens": 500}}} + + input_t, output_t = adapter._extract_token_usage(event) + assert input_t == 1000 + assert output_t == 500 + + def test_returns_zero_on_missing_usage(self) -> None: + adapter = self._make_adapter() + event = {"params": {}} + + input_t, output_t = adapter._extract_token_usage(event) + assert input_t == 0 + assert output_t == 0 + + def test_returns_zero_on_missing_params(self) -> None: + adapter = self._make_adapter() + event = {} + + input_t, output_t = adapter._extract_token_usage(event) + assert input_t == 0 + assert output_t == 0 From b2b47e58ee75bc660f6396ecc2fa7dbc0320ae3e Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 13 Mar 2026 09:35:10 -0700 Subject: [PATCH 2/4] fix(adapters): address code review findings for CodexAdapter - Enforce read timeout via selectors (prevents readline() deadlock) - Handle non-auto approval policies by rejecting tool calls - Extract _detect_modified_files to shared git_utils module - Add test for non-auto approval policy rejection --- codeframe/core/adapters/codex.py | 60 +++++++++---------- codeframe/core/adapters/git_utils.py | 40 +++++++++++++ codeframe/core/adapters/subprocess_adapter.py | 39 +----------- tests/core/adapters/test_codex.py | 15 +++++ 4 files changed, 88 insertions(+), 66 deletions(-) create mode 100644 codeframe/core/adapters/git_utils.py diff --git a/codeframe/core/adapters/codex.py b/codeframe/core/adapters/codex.py index 10d5279d..f3312bc0 100644 --- a/codeframe/core/adapters/codex.py +++ b/codeframe/core/adapters/codex.py @@ -12,6 +12,7 @@ from __future__ import annotations import json +import selectors import shutil import subprocess import threading @@ -25,6 +26,7 @@ AgentEvent, AgentResult, ) +from codeframe.core.adapters.git_utils import detect_modified_files class CodexAdapter: @@ -161,11 +163,26 @@ def _send( stdin.flush() def _recv_line(self, stdout: Any, timeout_s: float) -> dict | None: - """Read one JSON-RPC line from stdout. + """Read one JSON-RPC line from stdout with enforced timeout. - Returns the parsed dict, or None on EOF / empty line. - Timeout is approximate (based on readline blocking behaviour). + Uses ``selectors`` to wait for data availability before reading, + preventing indefinite blocking if the subprocess stops writing. + + Returns the parsed dict, or None on EOF / timeout. """ + # Use selectors for real timeout enforcement on file-based stdout. + # Mock objects (in tests) won't have fileno(), so fall back to + # direct readline for those. + if hasattr(stdout, "fileno"): + sel = selectors.DefaultSelector() + try: + sel.register(stdout, selectors.EVENT_READ) + ready = sel.select(timeout=timeout_s) + finally: + sel.close() + if not ready: + return None + line = stdout.readline() if not line: return None @@ -324,6 +341,13 @@ def _handle_approval( type="progress", message=f"Auto-approved tool call: {tool_name}", )) + else: + self._send(stdin, "tool_call/rejected", {"id": tool_id}) + if on_event: + on_event(AgentEvent( + type="progress", + message=f"Rejected tool call (policy={self._approval_policy}): {tool_name}", + )) # ------------------------------------------------------------------ # Token usage @@ -352,31 +376,7 @@ def _kill(process: subprocess.Popen) -> None: except subprocess.TimeoutExpired: pass - def _detect_modified_files(self, workspace_path: Path) -> list[str]: + @staticmethod + def _detect_modified_files(workspace_path: Path) -> list[str]: """Detect files modified by the subprocess via git diff.""" - try: - result = subprocess.run( - ["git", "diff", "--name-only", "HEAD"], - cwd=str(workspace_path), - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode != 0: - return [] - - files = [f for f in result.stdout.strip().splitlines() if f] - - untracked = subprocess.run( - ["git", "ls-files", "--others", "--exclude-standard"], - cwd=str(workspace_path), - capture_output=True, - text=True, - timeout=10, - ) - if untracked.returncode == 0: - files.extend(f for f in untracked.stdout.strip().splitlines() if f) - - return list(dict.fromkeys(files)) - except (FileNotFoundError, OSError, subprocess.TimeoutExpired): - return [] + return detect_modified_files(workspace_path) diff --git a/codeframe/core/adapters/git_utils.py b/codeframe/core/adapters/git_utils.py new file mode 100644 index 00000000..306f80ba --- /dev/null +++ b/codeframe/core/adapters/git_utils.py @@ -0,0 +1,40 @@ +"""Shared git utilities for adapter file detection.""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + + +def detect_modified_files(workspace_path: Path) -> list[str]: + """Detect files modified in a workspace via git diff. + + Combines modified, staged, and untracked files. Returns an empty list + if git is unavailable or the workspace is not a git repo. + """ + try: + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD"], + cwd=str(workspace_path), + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + return [] + + files = [f for f in result.stdout.strip().splitlines() if f] + + untracked = subprocess.run( + ["git", "ls-files", "--others", "--exclude-standard"], + cwd=str(workspace_path), + capture_output=True, + text=True, + timeout=10, + ) + if untracked.returncode == 0: + files.extend(f for f in untracked.stdout.strip().splitlines() if f) + + return list(dict.fromkeys(files)) + except (FileNotFoundError, OSError, subprocess.TimeoutExpired): + return [] diff --git a/codeframe/core/adapters/subprocess_adapter.py b/codeframe/core/adapters/subprocess_adapter.py index 8106e041..0565ebf7 100644 --- a/codeframe/core/adapters/subprocess_adapter.py +++ b/codeframe/core/adapters/subprocess_adapter.py @@ -9,6 +9,7 @@ from typing import Callable from codeframe.core.adapters.agent_adapter import AgentEvent, AgentResult +from codeframe.core.adapters.git_utils import detect_modified_files from codeframe.core.blocker_detection import classify_error_for_blocker @@ -201,42 +202,8 @@ def _map_result( ) def _detect_modified_files(self, workspace_path: Path) -> list[str]: - """Detect files modified by the subprocess via git diff. - - Combines modified, staged, and untracked files. Returns an empty list - if git is unavailable or the workspace is not a git repo. - """ - try: - result = subprocess.run( - ["git", "diff", "--name-only", "HEAD"], - cwd=str(workspace_path), - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode != 0: - # Also covers repos with no commits (HEAD does not exist) - return [] - - files = [f for f in result.stdout.strip().splitlines() if f] - - # Also pick up untracked files - untracked = subprocess.run( - ["git", "ls-files", "--others", "--exclude-standard"], - cwd=str(workspace_path), - capture_output=True, - text=True, - timeout=10, - ) - if untracked.returncode == 0: - files.extend( - f for f in untracked.stdout.strip().splitlines() if f - ) - - # Deduplicate while preserving order - return list(dict.fromkeys(files)) - except (FileNotFoundError, OSError, subprocess.TimeoutExpired): - return [] + """Detect files modified by the subprocess via git diff.""" + return detect_modified_files(workspace_path) def _extract_blocker_question(self, output: str) -> str: """Extract a meaningful blocker question from output.""" diff --git a/tests/core/adapters/test_codex.py b/tests/core/adapters/test_codex.py index 686d454c..0797ba02 100644 --- a/tests/core/adapters/test_codex.py +++ b/tests/core/adapters/test_codex.py @@ -265,6 +265,21 @@ def test_auto_approve_emits_event(self) -> None: adapter._handle_approval(stdin, event, on_event=events.append) assert any("auto-approved" in e.message.lower() for e in events) + def test_non_auto_policy_rejects_tool_call(self) -> None: + adapter = self._make_adapter(approval_policy="require") + events: list[AgentEvent] = [] + + stdin = MagicMock() + event = {"method": "tool_call", "params": {"id": "tc-2", "name": "write_file"}} + + adapter._handle_approval(stdin, event, on_event=events.append) + + written = stdin.write.call_args[0][0] + parsed = json.loads(written.strip()) + assert parsed["method"] == "tool_call/rejected" + assert parsed["params"]["id"] == "tc-2" + assert any("rejected" in e.message.lower() for e in events) + class TestCodexFullRun: """Integration test: full run() with mock subprocess.""" From 95b80246df3d90906241cacc61efb061759f5990 Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 13 Mar 2026 09:50:05 -0700 Subject: [PATCH 3/4] fix(adapters): address CodeRabbit review feedback - Differentiate read timeout vs EOF in _recv_line using _TIMEOUT sentinel (prevents premature failure when stall/turn timeouts should still apply) - Forward sandbox_mode to initialize params (was a no-op) - Add pytestmark = pytest.mark.v2 to test module --- codeframe/core/adapters/codex.py | 21 ++++++++++++++++----- tests/core/adapters/test_codex.py | 11 ++++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/codeframe/core/adapters/codex.py b/codeframe/core/adapters/codex.py index f3312bc0..bd1617b4 100644 --- a/codeframe/core/adapters/codex.py +++ b/codeframe/core/adapters/codex.py @@ -29,6 +29,9 @@ from codeframe.core.adapters.git_utils import detect_modified_files +_TIMEOUT = object() # Sentinel for read timeout (distinct from EOF/None) + + class CodexAdapter: """Adapter that delegates code execution to OpenAI Codex via app-server protocol. @@ -162,13 +165,15 @@ def _send( stdin.write(json.dumps(msg) + "\n") stdin.flush() - def _recv_line(self, stdout: Any, timeout_s: float) -> dict | None: + def _recv_line(self, stdout: Any, timeout_s: float) -> dict | object | None: """Read one JSON-RPC line from stdout with enforced timeout. Uses ``selectors`` to wait for data availability before reading, preventing indefinite blocking if the subprocess stops writing. - Returns the parsed dict, or None on EOF / timeout. + Returns: + Parsed dict on success, ``_TIMEOUT`` sentinel on timeout (caller + should loop and re-check stall/turn timeouts), or ``None`` on EOF. """ # Use selectors for real timeout enforcement on file-based stdout. # Mock objects (in tests) won't have fileno(), so fall back to @@ -181,7 +186,7 @@ def _recv_line(self, stdout: Any, timeout_s: float) -> dict | None: finally: sel.close() if not ready: - return None + return _TIMEOUT line = stdout.readline() if not line: @@ -216,7 +221,10 @@ def _handshake( turn_id = str(uuid.uuid4()) # Step 1: initialize - self._send(stdin, "initialize", {"capabilities": {}}, msg_id=1) + init_params: dict[str, Any] = {"capabilities": {}} + if self._sandbox_mode: + init_params["sandbox_mode"] = self._sandbox_mode + self._send(stdin, "initialize", init_params, msg_id=1) # Step 2: wait for initialized timeout_s = self._read_timeout_ms / 1000 @@ -277,11 +285,14 @@ def _stream_turn( ) msg = self._recv_line(stdout, timeout_s=read_timeout_s) + if msg is _TIMEOUT: + # Read timed out — loop back to check stall/turn timeouts + continue if msg is None: # EOF — process likely terminated return AgentResult( status="failed", - error="Stall timeout: no events received (EOF)", + error="Process terminated unexpectedly (EOF)", ) last_event_time = time.monotonic() diff --git a/tests/core/adapters/test_codex.py b/tests/core/adapters/test_codex.py index 0797ba02..305fd7bf 100644 --- a/tests/core/adapters/test_codex.py +++ b/tests/core/adapters/test_codex.py @@ -5,9 +5,10 @@ from unittest.mock import MagicMock, patch import pytest - from codeframe.core.adapters.agent_adapter import AgentAdapter, AgentEvent +pytestmark = pytest.mark.v2 + def _make_jsonrpc(method: str, params: dict | None = None, id_: int | None = None) -> str: """Build a JSON-RPC line as the mock Codex process would emit.""" @@ -220,15 +221,15 @@ def test_notification_emits_progress(self) -> None: assert result.status == "completed" assert any("Reading file" in e.message for e in events) - def test_stall_timeout(self) -> None: - adapter = self._make_adapter(stall_timeout_ms=100) + def test_eof_detected_as_failure(self) -> None: + adapter = self._make_adapter() - # Stdout that blocks forever (empty) + # Empty stdout = process terminated (EOF) stdout = _FakeStdout([]) result = adapter._stream_turn(stdout, on_event=None) assert result.status == "failed" - assert "stall" in (result.error or "").lower() + assert "eof" in (result.error or "").lower() class TestCodexApproval: From cdbda79c2f5785f3b98c93271bf687768f1ce581 Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 13 Mar 2026 09:57:03 -0700 Subject: [PATCH 4/4] fix(adapters): handle _TIMEOUT sentinel in handshake response check Prevents AttributeError when _recv_line returns _TIMEOUT during the initialized wait step (CodeRabbit finding). --- codeframe/core/adapters/codex.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codeframe/core/adapters/codex.py b/codeframe/core/adapters/codex.py index bd1617b4..2f792b89 100644 --- a/codeframe/core/adapters/codex.py +++ b/codeframe/core/adapters/codex.py @@ -229,7 +229,7 @@ def _handshake( # Step 2: wait for initialized timeout_s = self._read_timeout_ms / 1000 response = self._recv_line(stdout, timeout_s=timeout_s) - if response is None: + if response is _TIMEOUT or response is None: return False # Accept either method="initialized" or a result response to id=1