diff --git a/lib/provider_backends/codex/bridge_runtime/init_probe.py b/lib/provider_backends/codex/bridge_runtime/init_probe.py new file mode 100644 index 00000000..a6960522 --- /dev/null +++ b/lib/provider_backends/codex/bridge_runtime/init_probe.py @@ -0,0 +1,111 @@ +"""Codex TUI ready detection probe (Q3 Stage 1a). + +Implements provider-specific InitGateProbe for Codex CLI cold-start +detection — checks for banner dissipation and input prompt readiness. +""" +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from provider_core.init_gate import InitGateProbe + + +# Banner/welcome screen strings that indicate TUI is NOT yet ready. +# Captured from Codex v0.124.x cold-start welcome screen. +# If Codex CLI upgrades change these, patch this constant. +CODEX_INIT_BANNERS: tuple[str, ...] = ( + "OpenAI Codex", + "Sign in with ChatGPT", + "Trust this workspace", + "Choose a model", + "✦ Welcome to", +) + + +class CodexInitProbe: + """Probe Codex TUI for 'input box ready' state. + + Implements InitGateProbe protocol for use with InitGate. + + Ready criteria (AND — all must be true): + S1: Welcome banner strings NOT present in visible screen + S2: Last non-empty line starts with "› " (idle input prompt) + + Uses visible-screen-only capture (no scrollback) to avoid + false negatives from historical banner in scrollback. + """ + + def __init__( + self, + *, + pane_id: str, + tmux_run_fn: Callable[[list[str]], str], + ) -> None: + """Initialize probe. + + Args: + pane_id: Tmux pane identifier (e.g., "%3") + tmux_run_fn: Callable that runs tmux command and returns stdout. + Expected signature: (args: list[str]) -> str + """ + self._pane_id = pane_id + self._tmux_run = tmux_run_fn + + def detect(self) -> bool: + """Return True if Codex TUI is ready for input. + + Conservative: any error or ambiguity returns False. + """ + try: + capture = self._capture_visible() + except Exception: + # tmux failure or capture error — conservative fail + return False + + return self._banner_gone(capture) and self._prompt_on_last_line(capture) + + def _capture_visible(self) -> str: + """Capture visible screen (no scrollback) from pane. + + Uses `capture-pane -p -t ` without `-S` parameter, + so only currently visible lines are returned. + """ + # Note: no -S flag — we want visible screen only, not scrollback + args = ["capture-pane", "-p", "-t", self._pane_id] + return self._tmux_run(args) + + def _banner_gone(self, capture: str) -> bool: + """S1: Check that welcome banner strings are NOT present.""" + capture_lower = capture.lower() + for banner in CODEX_INIT_BANNERS: + if banner.lower() in capture_lower: + return False + return True + + def _prompt_on_last_line(self, capture: str) -> bool: + """S2: Check that last non-empty line is idle input prompt. + + Codex idle prompt: line starting with "› " (caret + space). + Tolerates placeholder hint after the caret (e.g., + "› Improve documentation in @filename") — this is normal idle. + """ + # Filter to non-empty lines only + lines = [ln for ln in capture.splitlines() if ln.strip()] + if not lines: + return False + + last = lines[-1] + # Strip leading whitespace (pane may have indent), check prefix + return last.lstrip().startswith("› ") + + def capture_visible_for_diagnostics(self) -> str: + """Expose visible capture for InitGate diagnostics. + + Returns empty string on any error (conservative). + """ + try: + return self._capture_visible() + except Exception: + return "" diff --git a/lib/provider_backends/codex/bridge_runtime/runtime_state.py b/lib/provider_backends/codex/bridge_runtime/runtime_state.py index dcee53cc..9095e4f3 100644 --- a/lib/provider_backends/codex/bridge_runtime/runtime_state.py +++ b/lib/provider_backends/codex/bridge_runtime/runtime_state.py @@ -1,16 +1,23 @@ +"""Codex bridge runtime state (Q3 Stage 1a — Init Gate integrated).""" from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path +from typing import TYPE_CHECKING from provider_backends.codex.runtime_artifacts import ensure_runtime_artifact_layout from .binding import CodexBindingTracker from .session import TerminalCodexSession +if TYPE_CHECKING: + from provider_core.init_gate import InitGate + @dataclass(frozen=True) class BridgePaths: + """Paths for bridge runtime.""" + runtime_dir: Path input_fifo: Path completion_dir: Path @@ -19,14 +26,39 @@ class BridgePaths: bridge_log: Path -@dataclass(frozen=True) +@dataclass class BridgeRuntimeState: + """Mutable runtime state for the bridge process. + + Contains initialization gate and FIFO holder for lifecycle management. + """ + paths: BridgePaths binding_tracker: CodexBindingTracker codex_session: TerminalCodexSession + init_gate: InitGate = field(default=None) # type: ignore[assignment] + fifo_holder_fd: int | None = None + + +def build_bridge_runtime_state( + runtime_dir: Path, + *, + pane_id: str, + tmux_backend, +) -> BridgeRuntimeState: + """Build runtime state with Init Gate constructed. + + Args: + runtime_dir: Directory for bridge runtime files. + pane_id: Tmux pane identifier. + tmux_backend: Backend for tmux operations. + Returns: + BridgeRuntimeState with InitGate constructed. + """ + from provider_core.init_gate import InitGate, load_init_gate_env + from provider_backends.codex.bridge_runtime.init_probe import CodexInitProbe -def build_bridge_runtime_state(runtime_dir: Path, *, pane_id: str) -> BridgeRuntimeState: artifacts = ensure_runtime_artifact_layout(runtime_dir) paths = BridgePaths( runtime_dir=artifacts.runtime_dir, @@ -36,10 +68,51 @@ def build_bridge_runtime_state(runtime_dir: Path, *, pane_id: str) -> BridgeRunt history_file=artifacts.history_file, bridge_log=artifacts.bridge_log, ) + + # Adapter: TmuxBackend._tmux_run returns subprocess.CompletedProcess, + # but CodexInitProbe expects tmux_run_fn(args) -> str. Extract stdout + # (decode bytes if needed) and return "" on any failure (conservative; + # probe.detect already treats "" as not-ready). + def _tmux_run_str(args: list[str]) -> str: + result = tmux_backend._tmux_run(args, capture=True, timeout=2.0, check=False) + if result is None: + return "" + out = getattr(result, "stdout", "") + if isinstance(out, bytes): + try: + return out.decode("utf-8", errors="replace") + except Exception: + return "" + return out if isinstance(out, str) else "" + + # Construct Init Gate with Codex probe + probe = CodexInitProbe( + pane_id=pane_id, + tmux_run_fn=_tmux_run_str, + ) + + def capture_fn() -> str: + """Capture visible pane for InitGate diagnostics.""" + try: + return probe.capture_visible_for_diagnostics() + except Exception: + return "" + + gate_kwargs = load_init_gate_env("codex") + init_gate = InitGate( + probe=probe, + provider="codex", + runtime_dir=runtime_dir, + capture_fn=capture_fn, + log_fn=lambda msg: print(msg, flush=True), + **gate_kwargs, + ) + return BridgeRuntimeState( paths=paths, binding_tracker=CodexBindingTracker(runtime_dir), codex_session=TerminalCodexSession(pane_id), + init_gate=init_gate, ) diff --git a/lib/provider_backends/codex/bridge_runtime/service.py b/lib/provider_backends/codex/bridge_runtime/service.py index b89c7290..a27801bf 100644 --- a/lib/provider_backends/codex/bridge_runtime/service.py +++ b/lib/provider_backends/codex/bridge_runtime/service.py @@ -1,3 +1,4 @@ +"""Claude ↔ Codex bridge main process (Q3 Stage 1a — Init Gate integrated).""" from __future__ import annotations import os @@ -12,15 +13,38 @@ class DualBridge: - """Claude ↔ Codex bridge main process""" + """Claude ↔ Codex bridge main process.""" def __init__(self, runtime_dir: Path): pane_id = os.environ.get('CODEX_TMUX_SESSION') if not pane_id: raise RuntimeError('Missing CODEX_TMUX_SESSION environment variable') - self._runtime = build_bridge_runtime_state(runtime_dir, pane_id=pane_id) + # Import here to avoid circular imports + from terminal_runtime.tmux_backend import TmuxBackend + + # Create tmux backend for init probe capture (no-arg constructor; + # pane id is bound via tmux_run_fn closure inside build_bridge_runtime_state) + tmux_backend = TmuxBackend() + + self._runtime = build_bridge_runtime_state( + runtime_dir, + pane_id=pane_id, + tmux_backend=tmux_backend, + ) self._running = True + + # Q3-S1a.3: Open FIFO holder before Init Gate starts + # This allows upstream writer to proceed without blocking on open(O_WRONLY) + try: + self._runtime.fifo_holder_fd = os.open( + str(self._runtime.paths.input_fifo), + os.O_RDONLY | os.O_NONBLOCK, + ) + except FileNotFoundError: + self._log_console("input.fifo not found at init; proceeding without holder") + self._runtime.fifo_holder_fd = None + signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) @@ -53,17 +77,52 @@ def codex_session(self): return self._runtime.codex_session def _handle_signal(self, signum: int, _: Any) -> None: + """Handle termination signals.""" self._running = False - self.binding_tracker.stop() self._log_console(f'Received signal {signum}, exiting...') + self._teardown() + + def _teardown(self) -> None: + """Cleanup holder fd + stop binding tracker (idempotent).""" + # Close FIFO holder if open + fd = self._runtime.fifo_holder_fd + if fd is not None: + try: + os.close(fd) + except OSError as exc: + self._log_console(f"fifo holder close failed: {exc}") + self._runtime.fifo_holder_fd = None + + # Stop binding tracker (may not have been started) + try: + self.binding_tracker.stop() + except Exception as exc: + self._log_console(f"binding tracker stop failed: {exc}") def run(self) -> int: + """Run the bridge main loop. + + Returns: + 0 on normal exit, 3 on INIT_FAIL. + """ self._log_console('Codex bridge started, waiting for Claude commands...') + + # Q3-S1a.3: Init Gate — wait for TUI to be ready before entering main loop + if not self._runtime.init_gate.wait_until_ready(): + self._log_console( + f"[InitGate] INIT_FAIL: {self._runtime.init_gate.last_reason}" + ) + self._teardown() + return 3 + + # Init Gate passed — enter normal main loop self.binding_tracker.start() + idle_sleep = env_float('CCB_BRIDGE_IDLE_SLEEP', 0.05) error_backoff_min = env_float('CCB_BRIDGE_ERROR_BACKOFF_MIN', 0.05) error_backoff_max = env_float('CCB_BRIDGE_ERROR_BACKOFF_MAX', 0.2) error_backoff = max(0.0, min(error_backoff_min, error_backoff_max)) + try: while self._running: try: @@ -73,7 +132,9 @@ def run(self) -> int: time.sleep(idle_sleep) continue self._process_request(payload) - error_backoff = max(0.0, min(error_backoff_min, error_backoff_max)) + error_backoff = max( + 0.0, min(error_backoff_min, error_backoff_max) + ) except KeyboardInterrupt: self._running = False except Exception as exc: @@ -82,9 +143,12 @@ def run(self) -> int: if error_backoff: time.sleep(error_backoff) if error_backoff_max: - error_backoff = min(error_backoff_max, max(error_backoff_min, error_backoff * 2)) + error_backoff = min( + error_backoff_max, + max(error_backoff_min, error_backoff * 2) + ) finally: - self.binding_tracker.stop() + self._teardown() self._log_console('Codex bridge exited') return 0 diff --git a/lib/provider_core/init_gate.py b/lib/provider_core/init_gate.py new file mode 100644 index 00000000..93653f86 --- /dev/null +++ b/lib/provider_core/init_gate.py @@ -0,0 +1,258 @@ +"""Init Gate state machine for provider bridge ready detection (Q3 Stage 1). + +Provides a generic polling-based gate that blocks bridge startup until +the provider TUI is in a ready-to-receive state, with deadline-based +failure detection and diagnostic capture. +""" +from __future__ import annotations + +import json +import os +import time +from collections import deque +from dataclasses import dataclass, field +from enum import Enum, auto +from pathlib import Path +from typing import Callable, Protocol + + +class InitGateState(Enum): + """Init Gate lifecycle states.""" + + LAUNCHED = auto() + INITIALIZING = auto() + READY = auto() + INIT_FAIL = auto() + + +class InitGateProbe(Protocol): + """Protocol for provider-specific ready detection.""" + + def detect(self) -> bool: + """Return True if the provider TUI is ready to receive input.""" + ... + + +@dataclass +class _ProbeAttempt: + """Record of a single probe attempt for diagnostics.""" + + t_offset_s: float + detected: bool + + +@dataclass +class _CaptureEntry: + """Pane capture entry with timestamp offset.""" + + t_offset_s: float + capture: str + + +@dataclass +class InitGate: + """Generic Init Gate with segmented polling and steady-state debounce. + + Args: + probe: Provider-specific probe implementing InitGateProbe. + provider: Provider name (e.g., "codex", "gemini", "claude"). + runtime_dir: Directory for init_gate_failure.json output. + capture_fn: Callable returning latest pane capture text. + deadline_s: Total timeout for init gate (seconds). + poll_fast_ms: Initial polling period in milliseconds (first switch_s seconds). + poll_slow_ms: Slower polling period in milliseconds (after switch_s). + poll_switch_s: Time threshold to switch from fast to slow polling. + steady_count: Required consecutive positive probes before committing READY. + bypass: If True, skip gate and return immediately (emergency override). + clock: Monotonic time source (default: time.monotonic). + sleep_fn: Sleep function (default: time.sleep). + log_fn: Logging function (default: writes to stderr). + """ + + probe: InitGateProbe + provider: str + runtime_dir: Path + capture_fn: Callable[[], str] + deadline_s: float + poll_fast_ms: int + poll_slow_ms: int + poll_switch_s: float + steady_count: int + bypass: bool + clock: Callable[[], float] = time.monotonic + sleep_fn: Callable[[float], None] = time.sleep + log_fn: Callable[[str], None] = field( + default_factory=lambda: lambda msg: print(msg, file=os.sys.stderr) + ) + + # State (initialized post-construction) + _state: InitGateState = field(init=False, default=InitGateState.LAUNCHED) + _last_reason: str = field(init=False, default="") + _start_time: float = field(init=False, default=0.0) + _probes_attempted: list[_ProbeAttempt] = field(init=False, default_factory=list) + _recent_captures: deque[_CaptureEntry] = field( + init=False, default_factory=lambda: deque(maxlen=3) + ) + + def __post_init__(self) -> None: + """Initialize mutable fields.""" + object.__setattr__(self, "_state", InitGateState.LAUNCHED) + object.__setattr__(self, "_last_reason", "") + object.__setattr__(self, "_start_time", 0.0) + object.__setattr__(self, "_probes_attempted", []) + object.__setattr__(self, "_recent_captures", deque(maxlen=3)) + + @property + def last_reason(self) -> str: + """Return the failure reason after INIT_FAIL.""" + return self._last_reason + + def wait_until_ready(self) -> bool: + """Run the init gate until READY or INIT_FAIL. + + Returns: + True on READY, False on INIT_FAIL. + """ + self._state = InitGateState.INITIALIZING + self._start_time = self.clock() + + if self.bypass: + self._log_warn("[InitGate] BYPASS enabled — skipping ready detection") + self._state = InitGateState.READY + return True + + # Emit startup log with segmented polling description + self.log_fn( + f"[InitGate] waiting for {self.provider} TUI " + f"(deadline: {self.deadline_s}s, " + f"poll: {self.poll_fast_ms}ms→{self.poll_slow_ms}ms@{self.poll_switch_s}s) ..." + ) + + consecutive_positives = 0 + poll_count = 0 + + while True: + now = self.clock() + elapsed = now - self._start_time + + # Check deadline + if elapsed >= self.deadline_s: + self._record_failure( + reason="deadline_exceeded", + elapsed_s=elapsed, + probes=self._probes_attempted, + ) + self._state = InitGateState.INIT_FAIL + self._last_reason = "deadline_exceeded" + return False + + # Determine poll period + period_s = ( + self.poll_fast_ms / 1000.0 + if elapsed < self.poll_switch_s + else self.poll_slow_ms / 1000.0 + ) + + # Probe + try: + detected = self.probe.detect() + except Exception: + detected = False + + poll_count += 1 + self._probes_attempted.append( + _ProbeAttempt(t_offset_s=round(elapsed, 3), detected=detected) + ) + + # Capture pane for diagnostics (last 3 only) + try: + capture = self.capture_fn() + self._recent_captures.append( + _CaptureEntry(t_offset_s=round(elapsed, 3), capture=capture) + ) + except Exception: + pass + + # Steady-state check + if detected: + consecutive_positives += 1 + if consecutive_positives >= self.steady_count: + self._state = InitGateState.READY + return True + else: + consecutive_positives = 0 + + # Sleep until next poll + self.sleep_fn(period_s) + + def _record_failure( + self, + *, + reason: str, + elapsed_s: float, + probes: list[_ProbeAttempt], + ) -> None: + """Write init_gate_failure.json with diagnostic information.""" + failure_path = self.runtime_dir / "init_gate_failure.json" + self.runtime_dir.mkdir(parents=True, exist_ok=True) + + failure_data = { + "provider": self.provider, + "reason": reason, + "deadline_s": self.deadline_s, + "elapsed_s": round(elapsed_s, 3), + "init_gate_bypass": self.bypass, + "recent_pane_captures": [ + {"t_offset_s": e.t_offset_s, "capture": e.capture} + for e in list(self._recent_captures) + ], + "probes_attempted": [ + {"t_offset_s": p.t_offset_s, "detected": p.detected} + for p in probes + ], + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"), + } + + try: + with open(failure_path, "w") as f: + json.dump(failure_data, f, indent=2) + f.flush() + os.fsync(f.fileno()) + except OSError as e: + self._log_warn(f"[InitGate] failed to write failure.json: {e}") + + def _log_warn(self, msg: str) -> None: + """Log a warning message.""" + self.log_fn(msg) + + +def load_init_gate_env(provider: str) -> dict: + """Load InitGate constructor kwargs from environment variables. + + Generic vars: CCB_INIT_GATE_DEADLINE_S, CCB_INIT_GATE_POLL_FAST_MS, etc. + Per-provider vars: CCB__INIT_DEADLINE_S, etc. + + Per-provider overrides generic. + + Args: + provider: Provider name (e.g., "codex"). + + Returns: + Dict of kwargs suitable for InitGate construction. + """ + provider_upper = provider.upper() + + def get_env(key: str, default: str) -> str: + """Check per-provider first, then generic.""" + provider_key = f"CCB_{provider_upper}_INIT_{key.upper()}" + generic_key = f"CCB_INIT_GATE_{key.upper()}" + return os.environ.get(provider_key, os.environ.get(generic_key, default)) + + return { + "deadline_s": float(get_env("DEADLINE_S", "30")), + "poll_fast_ms": int(get_env("POLL_FAST_MS", "200")), + "poll_slow_ms": int(get_env("POLL_SLOW_MS", "500")), + "poll_switch_s": float(get_env("POLL_SWITCH_S", "5")), + "steady_count": int(get_env("STEADY_COUNT", "2")), + "bypass": get_env("BYPASS", "0").strip().lower() in ("1", "true", "yes", "on"), + } diff --git a/test/test_codex_communicator_init.py b/test/test_codex_communicator_init.py new file mode 100644 index 00000000..7a557196 --- /dev/null +++ b/test/test_codex_communicator_init.py @@ -0,0 +1,123 @@ +"""Tests for Codex bridge Init Gate integration (Q3-S1a.3).""" +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from provider_backends.codex.bridge_runtime.runtime_state import ( + BridgeRuntimeState, + build_bridge_runtime_state, +) + + +class TestDualBridgeInit: + """Test DualBridge __init__ and Init Gate integration.""" + + def test_dualbridge_init_opens_fifo_holder(self, tmp_path: Path, monkeypatch): + """FIFO holder fd is opened in __init__.""" + import tempfile + + # Need to mock before importing DualBridge + monkeypatch.setenv('CODEX_TMUX_SESSION', '%3') + + with tempfile.NamedTemporaryFile(mode='w', delete=False) as fifo: + fifo_path = fifo.name + + try: + from provider_backends.codex.bridge_runtime.service import DualBridge + + # Mock os.open to capture the call + open_calls = [] + original_os_open = os.open + + def mock_os_open(path, flags): + open_calls.append((path, flags)) + # Return a real fd we can close later + return original_os_open('/dev/null', os.O_RDONLY) + + monkeypatch.setattr(os, 'open', mock_os_open) + + # Create runtime dir with pre-existing fifo + runtime_dir = tmp_path / 'runtime' + runtime_dir.mkdir() + + bridge = DualBridge(runtime_dir) + + # Verify os.open was called with correct args + assert len(open_calls) >= 1 + fifo_calls = [c for c in open_calls if 'input.fifo' in str(c[0])] + assert len(fifo_calls) >= 1 + + # Check flags include NONBLOCK and access mode is RDONLY. + # Note: os.O_RDONLY == 0 on POSIX, so it cannot be detected via + # bitmask AND. Use the access-mode bits (lower 2 bits) instead. + path, flags = fifo_calls[0] + assert (flags & os.O_ACCMODE) == os.O_RDONLY + assert flags & os.O_NONBLOCK + + finally: + os.unlink(fifo_path) + + def test_dualbridge_run_blocks_on_init_gate_false(self, tmp_path: Path, monkeypatch): + """run() returns 3 when InitGate returns False.""" + monkeypatch.setenv('CODEX_TMUX_SESSION', '%3') + + from provider_backends.codex.bridge_runtime.service import DualBridge + + # Mock build_bridge_runtime_state to inject a fake init_gate + fake_gate = MagicMock() + fake_gate.wait_until_ready.return_value = False + fake_gate.last_reason = "deadline_exceeded" + + with patch( + 'provider_backends.codex.bridge_runtime.service.build_bridge_runtime_state' + ) as mock_build: + mock_state = MagicMock() + mock_state.init_gate = fake_gate + mock_state.fifo_holder_fd = None + mock_state.paths.input_fifo = tmp_path / 'input.fifo' + mock_build.return_value = mock_state + + bridge = DualBridge(tmp_path) + exit_code = bridge.run() + + assert exit_code == 3 + fake_gate.wait_until_ready.assert_called_once() + + def test_dualbridge_run_success_enters_main_loop(self, tmp_path: Path, monkeypatch): + """run() enters main loop when InitGate passes.""" + monkeypatch.setenv('CODEX_TMUX_SESSION', '%3') + + from provider_backends.codex.bridge_runtime.service import DualBridge + + # Fake gate that passes immediately + fake_gate = MagicMock() + fake_gate.wait_until_ready.return_value = True + + # Fake tracker + fake_tracker = MagicMock() + fake_tracker.start = MagicMock() + + with patch( + 'provider_backends.codex.bridge_runtime.service.build_bridge_runtime_state' + ) as mock_build: + mock_state = MagicMock() + mock_state.init_gate = fake_gate + mock_state.fifo_holder_fd = None + mock_state.paths.input_fifo = tmp_path / 'fifo' + mock_state.binding_tracker = fake_tracker + mock_build.return_value = mock_state + + # Mock _read_request to exit loop immediately. _running is an + # instance attribute, so we cannot patch it at class level — set + # it on the bridge instance after construction. + with patch.object(DualBridge, '_read_request', return_value=None): + bridge = DualBridge(tmp_path) + bridge._running = False # makes main loop exit immediately + exit_code = bridge.run() + + assert exit_code == 0 + fake_tracker.start.assert_called_once() diff --git a/test/test_codex_init_probe.py b/test/test_codex_init_probe.py new file mode 100644 index 00000000..42cf2c0f --- /dev/null +++ b/test/test_codex_init_probe.py @@ -0,0 +1,100 @@ +"""Unit tests for CodexInitProbe (Q3 Stage 1a). + +Tests provider-specific TUI ready detection for Codex CLI. +""" +from __future__ import annotations + +import pytest +from typing import Callable + +from provider_backends.codex.bridge_runtime.init_probe import ( + CODEX_INIT_BANNERS, + CodexInitProbe, +) + + +class TestBannerDetection: + """Test S1: welcome banner detection.""" + + def test_banner_present_returns_false(self): + """Capture with "OpenAI Codex" → detect() returns False.""" + def tmux_run(args: list[str]) -> str: + return "OpenAI Codex v0.124.x\nSign in with ChatGPT\n" + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=tmux_run) + assert probe.detect() is False + + def test_banner_gone_prompt_ready_returns_true(self): + """Capture with only idle prompt on last line → detect() returns True.""" + def tmux_run(args: list[str]) -> str: + return "\n› Improve documentation in @filename\n" + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=tmux_run) + assert probe.detect() is True + + +class TestPromptPosition: + """Test S2: prompt position detection.""" + + def test_prompt_not_on_last_line_returns_false(self): + """› appears but followed by other non-empty lines → False.""" + def tmux_run(args: list[str]) -> str: + return "› Some input here\nChoose a model\n[Y/n]\n" + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=tmux_run) + assert probe.detect() is False + + +class TestBannerVariants: + """Test all CODEX_INIT_BANNERS are detected.""" + + def test_all_banner_variants_detected(self): + """Each banner string causes detect() to return False.""" + for banner in CODEX_INIT_BANNERS: + + def make_tmux_run(b: str): + def tmux_run(args: list[str]) -> str: + return f"Some line\n{b}\n› prompt\n" + return tmux_run + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=make_tmux_run(banner)) + result = probe.detect() + assert result is False, f"Banner '{banner}' should cause False, got {result}" + + +class TestCaptureBehavior: + """Test capture-pane command structure.""" + + def test_capture_uses_visible_only(self): + """Mock tmux_run_fn verifies args are ['capture-pane', '-p', '-t', pane_id] (no -S).""" + captured_args: list[str] = [] + + def tmux_run(args: list[str]) -> str: + captured_args.extend(args) + return "› Ready\n" + + probe = CodexInitProbe(pane_id="%5", tmux_run_fn=tmux_run) + probe.detect() + + assert captured_args == ["capture-pane", "-p", "-t", "%5"] + assert "-S" not in captured_args # No scrollback flag + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def test_empty_capture_returns_false(self): + """Tmux returns empty string → detect() returns False (conservative).""" + def tmux_run(args: list[str]) -> str: + return "" + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=tmux_run) + assert probe.detect() is False + + def test_tmux_run_exception_returns_false(self): + """Tmux raises RuntimeError → detect() returns False, no exception propagated.""" + def tmux_run(args: list[str]) -> str: + raise RuntimeError("tmux failed") + + probe = CodexInitProbe(pane_id="%3", tmux_run_fn=tmux_run) + assert probe.detect() is False # Conservative failure, no raise diff --git a/test/test_init_gate.py b/test/test_init_gate.py new file mode 100644 index 00000000..8d69c0da --- /dev/null +++ b/test/test_init_gate.py @@ -0,0 +1,396 @@ +"""Unit tests for InitGate state machine (Q3 Stage 1a).""" +from __future__ import annotations + +import json +import os +import time +from pathlib import Path +from typing import Callable +from unittest.mock import MagicMock + +import pytest + +from provider_core.init_gate import InitGate, InitGateProbe, InitGateState, load_init_gate_env + + +class HappyProbe(InitGateProbe): + """Always returns True.""" + + def detect(self) -> bool: + return True + + +class NeverProbe(InitGateProbe): + """Always returns False.""" + + def detect(self) -> bool: + return False + + +class AlternatingProbe(InitGateProbe): + """Alternates True/False each call.""" + + def __init__(self) -> None: + self._call_count = 0 + + def detect(self) -> bool: + self._call_count += 1 + return self._call_count % 2 == 0 + + +class TimedSequenceProbe(InitGateProbe): + """Returns True after N calls.""" + + def __init__(self, true_after: int) -> None: + self._call_count = 0 + self._true_after = true_after + + def detect(self) -> bool: + self._call_count += 1 + return self._call_count >= self._true_after + + +class MockClock: + """Controlled monotonic clock for deterministic testing.""" + + def __init__(self, start: float = 0.0) -> None: + self._now = start + + def __call__(self) -> float: + return self._now + + def advance(self, delta: float) -> None: + self._now += delta + + +@pytest.fixture +def tmp_runtime_dir(tmp_path: Path) -> Path: + """Provide a temporary runtime directory.""" + return tmp_path / "runtime" + + +@pytest.fixture +def mock_capture() -> Callable[[], str]: + """Simple mock capture function.""" + return lambda: "mock pane capture" + + +@pytest.fixture +def mock_log() -> list[str]: + """Capture log messages.""" + messages: list[str] = [] + return messages + + +def make_gate( + probe: InitGateProbe, + tmp_runtime_dir: Path, + mock_capture: Callable[[], str], + mock_log: list[str], + *, + clock: MockClock | None = None, + sleep_calls: list[float] | None = None, + deadline_s: float = 1.0, + poll_fast_ms: int = 100, + poll_slow_ms: int = 200, + poll_switch_s: float = 0.5, + steady_count: int = 2, + bypass: bool = False, +) -> InitGate: + """Factory for creating InitGate with mock dependencies.""" + clock = clock or MockClock() + sleep_calls = sleep_calls if sleep_calls is not None else [] + + def mock_sleep(duration: float) -> None: + sleep_calls.append(duration) + clock.advance(duration) + + def mock_log_fn(msg: str) -> None: + mock_log.append(msg) + + return InitGate( + probe=probe, + provider="test", + runtime_dir=tmp_runtime_dir, + capture_fn=mock_capture, + deadline_s=deadline_s, + poll_fast_ms=poll_fast_ms, + poll_slow_ms=poll_slow_ms, + poll_switch_s=poll_switch_s, + steady_count=steady_count, + bypass=bypass, + clock=clock, + sleep_fn=mock_sleep, + log_fn=mock_log_fn, + ) + + +class TestHappyPath: + """Test LAUNCHED → READY happy path.""" + + def test_happy_path_ready(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """Probe always True → steady_count=2 → returns True after ~2 polls.""" + probe = HappyProbe() + clock = MockClock() + sleep_calls: list[float] = [] + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=10.0, poll_fast_ms=100, steady_count=2 + ) + + result = gate.wait_until_ready() + + assert result is True + assert gate._state == InitGateState.READY + # Should have ~2 polls (first detect + 1 more to reach steady_count) + # With steady_count=2 and all True, we need 2 consecutive True + assert len(sleep_calls) >= 1 # At least one sleep + # Verify startup log + assert any("waiting for test TUI" in msg for msg in mock_log) + + def test_steady_count_requires_consecutive(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """Non-consecutive True detections should not commit until steady_count reached.""" + probe = TimedSequenceProbe(true_after=3) + clock = MockClock() + sleep_calls: list[float] = [] + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=10.0, poll_fast_ms=50, steady_count=2 + ) + + result = gate.wait_until_ready() + + assert result is True + # Should have taken at least 3 polls (False, False, True, True) + assert probe._call_count >= 3 + + +class TestDeadlineFailure: + """Test LAUNCHED → INIT_FAIL on deadline.""" + + def test_deadline_exceeded(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """Probe always False, deadline 0.3s → returns False with correct reason and failure.json.""" + probe = NeverProbe() + clock = MockClock() + sleep_calls: list[float] = [] + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=0.3, poll_fast_ms=50, steady_count=2 + ) + + result = gate.wait_until_ready() + + assert result is False + assert gate.last_reason == "deadline_exceeded" + + # Verify failure.json + failure_path = tmp_runtime_dir / "init_gate_failure.json" + assert failure_path.exists() + + with open(failure_path) as f: + data = json.load(f) + + assert data["provider"] == "test" + assert data["reason"] == "deadline_exceeded" + assert data["deadline_s"] == 0.3 + assert data["init_gate_bypass"] is False + assert "recent_pane_captures" in data + assert "probes_attempted" in data + assert "timestamp" in data + + +class TestSteadyState: + """Test steady-state debounce behavior.""" + + def test_steady_state_debounce(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """Sequence T→F→T→T: only commits on trailing T-T pair, not first T.""" + # Create probe with sequence: True, False, True, True, ... + class SequenceProbe(InitGateProbe): + def __init__(self): + self._results = [True, False, True, True] + self._idx = 0 + + def detect(self) -> bool: + result = self._results[self._idx] if self._idx < len(self._results) else True + self._idx += 1 + return result + + probe = SequenceProbe() + clock = MockClock() + sleep_calls: list[float] = [] + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=10.0, poll_fast_ms=50, steady_count=2 + ) + + result = gate.wait_until_ready() + + assert result is True + # Should have taken exactly 4 polls: + # 1: T (consecutive=1, not enough) + # 2: F (consecutive=0, reset) + # 3: T (consecutive=1, not enough) + # 4: T (consecutive=2, READY!) + assert probe._idx == 4 + + +class TestBypass: + """Test bypass mode.""" + + def test_bypass_returns_immediately(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """bypass=True → returns True without calling probe.detect even once.""" + probe = MagicMock(spec=InitGateProbe) + probe.detect.return_value = False + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + bypass=True, deadline_s=10.0 + ) + + result = gate.wait_until_ready() + + assert result is True + probe.detect.assert_not_called() + # Verify warning log + assert any("BYPASS" in msg for msg in mock_log) + + +class TestSegmentedPolling: + """Test segmented polling (fast→slow).""" + + def test_segmented_polling(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """First polls are ~fast_ms, later polls ~slow_ms after switch_s threshold.""" + probe = NeverProbe() # Never returns True to keep polling + clock = MockClock() + sleep_calls: list[float] = [] + + gate = make_gate( + probe, tmp_runtime_dir, mock_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=2.0, poll_fast_ms=50, poll_slow_ms=200, poll_switch_s=0.5 + ) + + try: + gate.wait_until_ready() + except AssertionError: + pass # Expected to fail due to deadline + + # First few sleeps should be ~0.05s (fast) + fast_sleeps = [s for s in sleep_calls if s < 0.1] + # After 0.5s elapsed, should switch to ~0.2s (slow) + slow_sleeps = [s for s in sleep_calls if s >= 0.15] + + # Should have both fast and slow periods + assert len(fast_sleeps) >= 3 + assert len(slow_sleeps) >= 1 or len(sleep_calls) < 10 # May not reach slow if deadline short + + +class TestFailureJsonStructure: + """Test failure.json structure.""" + + def test_failure_json_structure(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """On deadline fail, json has all required fields.""" + probe = NeverProbe() + clock = MockClock() + sleep_calls: list[float] = [] + + # Track capture calls + capture_calls: list[str] = [] + def tracking_capture() -> str: + capture_calls.append(f"capture at t={clock._now}") + return f"mock capture {len(capture_calls)}" + + gate = make_gate( + probe, tmp_runtime_dir, tracking_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=0.3, poll_fast_ms=50 + ) + + gate.wait_until_ready() + + failure_path = tmp_runtime_dir / "init_gate_failure.json" + with open(failure_path) as f: + data = json.load(f) + + # Verify all required fields + assert data["provider"] == "test" + assert data["reason"] == "deadline_exceeded" + assert isinstance(data["deadline_s"], (int, float)) + assert isinstance(data["elapsed_s"], (int, float)) + assert isinstance(data["init_gate_bypass"], bool) + assert isinstance(data["recent_pane_captures"], list) + assert isinstance(data["probes_attempted"], list) + assert "timestamp" in data + + def test_recent_pane_captures_ring(self, tmp_runtime_dir: Path, mock_capture, mock_log): + """>=4 polls → failure.json has exactly 3 items (last 3).""" + probe = NeverProbe() + clock = MockClock() + sleep_calls: list[float] = [] + capture_counter = [0] + + def counting_capture() -> str: + capture_counter[0] += 1 + return f"capture-{capture_counter[0]}" + + gate = make_gate( + probe, tmp_runtime_dir, counting_capture, mock_log, + clock=clock, sleep_calls=sleep_calls, + deadline_s=0.5, poll_fast_ms=50 + ) + + gate.wait_until_ready() + + failure_path = tmp_runtime_dir / "init_gate_failure.json" + with open(failure_path) as f: + data = json.load(f) + + # Should have exactly 3 captures + captures = data["recent_pane_captures"] + assert len(captures) == 3 + # Should be last 3 captures + assert captures[0]["capture"] == "capture-1" or "capture-2" + assert captures[-1]["capture"].startswith("capture-") + + +class TestEnvVarLoading: + """Test load_init_gate_env function.""" + + def test_env_var_loading(self, monkeypatch): + """monkeypatch env → load_init_gate_env reflects values.""" + monkeypatch.setenv("CCB_INIT_GATE_DEADLINE_S", "60") + monkeypatch.setenv("CCB_INIT_GATE_POLL_FAST_MS", "300") + monkeypatch.setenv("CCB_CODEX_INIT_DEADLINE_S", "45") # per-provider override + + kwargs = load_init_gate_env("codex") + + assert kwargs["deadline_s"] == 45.0 # per-provider wins + assert kwargs["poll_fast_ms"] == 300 # generic used + + def test_env_fallback_to_generic(self, monkeypatch): + """Unset per-provider override falls back to generic.""" + monkeypatch.setenv("CCB_INIT_GATE_DEADLINE_S", "60") + monkeypatch.delenv("CCB_CODEX_INIT_DEADLINE_S", raising=False) + + kwargs = load_init_gate_env("codex") + + assert kwargs["deadline_s"] == 60.0 # generic used + + def test_env_bypass_parsing(self, monkeypatch): + """Test BYPASS env var parsing.""" + for val in ["1", "true", "yes", "on"]: + monkeypatch.setenv("CCB_INIT_GATE_BYPASS", val) + kwargs = load_init_gate_env("codex") + assert kwargs["bypass"] is True, f"failed for {val}" + + monkeypatch.setenv("CCB_INIT_GATE_BYPASS", "0") + kwargs = load_init_gate_env("codex") + assert kwargs["bypass"] is False