Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions lib/provider_backends/codex/bridge_runtime/init_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Codex TUI ready detection probe (Q3 Stage 1a).

Implements provider-specific InitGateProbe for Codex CLI cold-start
detection — checks for banner dissipation and input prompt readiness.
"""
from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from provider_core.init_gate import InitGateProbe


# Banner/welcome screen strings that indicate TUI is NOT yet ready.
# Captured from Codex v0.124.x cold-start welcome screen.
# If Codex CLI upgrades change these, patch this constant.
CODEX_INIT_BANNERS: tuple[str, ...] = (
"OpenAI Codex",
"Sign in with ChatGPT",
"Trust this workspace",
"Choose a model",
"✦ Welcome to",
)


class CodexInitProbe:
"""Probe Codex TUI for 'input box ready' state.

Implements InitGateProbe protocol for use with InitGate.

Ready criteria (AND — all must be true):
S1: Welcome banner strings NOT present in visible screen
S2: Last non-empty line starts with "› " (idle input prompt)

Uses visible-screen-only capture (no scrollback) to avoid
false negatives from historical banner in scrollback.
"""

def __init__(
self,
*,
pane_id: str,
tmux_run_fn: Callable[[list[str]], str],
) -> None:
"""Initialize probe.

Args:
pane_id: Tmux pane identifier (e.g., "%3")
tmux_run_fn: Callable that runs tmux command and returns stdout.
Expected signature: (args: list[str]) -> str
"""
self._pane_id = pane_id
self._tmux_run = tmux_run_fn

def detect(self) -> bool:
"""Return True if Codex TUI is ready for input.

Conservative: any error or ambiguity returns False.
"""
try:
capture = self._capture_visible()
except Exception:
# tmux failure or capture error — conservative fail
return False

return self._banner_gone(capture) and self._prompt_on_last_line(capture)

def _capture_visible(self) -> str:
"""Capture visible screen (no scrollback) from pane.

Uses `capture-pane -p -t <pane>` without `-S` parameter,
so only currently visible lines are returned.
"""
# Note: no -S flag — we want visible screen only, not scrollback
args = ["capture-pane", "-p", "-t", self._pane_id]
return self._tmux_run(args)

def _banner_gone(self, capture: str) -> bool:
"""S1: Check that welcome banner strings are NOT present."""
capture_lower = capture.lower()
for banner in CODEX_INIT_BANNERS:
if banner.lower() in capture_lower:
return False
return True

def _prompt_on_last_line(self, capture: str) -> bool:
"""S2: Check that last non-empty line is idle input prompt.

Codex idle prompt: line starting with "› " (caret + space).
Tolerates placeholder hint after the caret (e.g.,
"› Improve documentation in @filename") — this is normal idle.
"""
# Filter to non-empty lines only
lines = [ln for ln in capture.splitlines() if ln.strip()]
if not lines:
return False

last = lines[-1]
# Strip leading whitespace (pane may have indent), check prefix
return last.lstrip().startswith("› ")

def capture_visible_for_diagnostics(self) -> str:
"""Expose visible capture for InitGate diagnostics.

Returns empty string on any error (conservative).
"""
try:
return self._capture_visible()
except Exception:
return ""
79 changes: 76 additions & 3 deletions lib/provider_backends/codex/bridge_runtime/runtime_state.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
"""Codex bridge runtime state (Q3 Stage 1a — Init Gate integrated)."""
from __future__ import annotations

from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING

from provider_backends.codex.runtime_artifacts import ensure_runtime_artifact_layout

from .binding import CodexBindingTracker
from .session import TerminalCodexSession

if TYPE_CHECKING:
from provider_core.init_gate import InitGate


@dataclass(frozen=True)
class BridgePaths:
"""Paths for bridge runtime."""

runtime_dir: Path
input_fifo: Path
completion_dir: Path
Expand All @@ -19,14 +26,39 @@ class BridgePaths:
bridge_log: Path


@dataclass(frozen=True)
@dataclass
class BridgeRuntimeState:
"""Mutable runtime state for the bridge process.

Contains initialization gate and FIFO holder for lifecycle management.
"""

paths: BridgePaths
binding_tracker: CodexBindingTracker
codex_session: TerminalCodexSession
init_gate: InitGate = field(default=None) # type: ignore[assignment]
fifo_holder_fd: int | None = None


def build_bridge_runtime_state(
runtime_dir: Path,
*,
pane_id: str,
tmux_backend,
) -> BridgeRuntimeState:
"""Build runtime state with Init Gate constructed.

Args:
runtime_dir: Directory for bridge runtime files.
pane_id: Tmux pane identifier.
tmux_backend: Backend for tmux operations.

Returns:
BridgeRuntimeState with InitGate constructed.
"""
from provider_core.init_gate import InitGate, load_init_gate_env
from provider_backends.codex.bridge_runtime.init_probe import CodexInitProbe

def build_bridge_runtime_state(runtime_dir: Path, *, pane_id: str) -> BridgeRuntimeState:
artifacts = ensure_runtime_artifact_layout(runtime_dir)
paths = BridgePaths(
runtime_dir=artifacts.runtime_dir,
Expand All @@ -36,10 +68,51 @@ def build_bridge_runtime_state(runtime_dir: Path, *, pane_id: str) -> BridgeRunt
history_file=artifacts.history_file,
bridge_log=artifacts.bridge_log,
)

# Adapter: TmuxBackend._tmux_run returns subprocess.CompletedProcess,
# but CodexInitProbe expects tmux_run_fn(args) -> str. Extract stdout
# (decode bytes if needed) and return "" on any failure (conservative;
# probe.detect already treats "" as not-ready).
def _tmux_run_str(args: list[str]) -> str:
result = tmux_backend._tmux_run(args, capture=True, timeout=2.0, check=False)
if result is None:
return ""
out = getattr(result, "stdout", "")
if isinstance(out, bytes):
try:
return out.decode("utf-8", errors="replace")
except Exception:
return ""
return out if isinstance(out, str) else ""

# Construct Init Gate with Codex probe
probe = CodexInitProbe(
pane_id=pane_id,
tmux_run_fn=_tmux_run_str,
)

def capture_fn() -> str:
"""Capture visible pane for InitGate diagnostics."""
try:
return probe.capture_visible_for_diagnostics()
except Exception:
return ""

gate_kwargs = load_init_gate_env("codex")
init_gate = InitGate(
probe=probe,
provider="codex",
runtime_dir=runtime_dir,
capture_fn=capture_fn,
log_fn=lambda msg: print(msg, flush=True),
**gate_kwargs,
)

return BridgeRuntimeState(
paths=paths,
binding_tracker=CodexBindingTracker(runtime_dir),
codex_session=TerminalCodexSession(pane_id),
init_gate=init_gate,
)


Expand Down
76 changes: 70 additions & 6 deletions lib/provider_backends/codex/bridge_runtime/service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Claude ↔ Codex bridge main process (Q3 Stage 1a — Init Gate integrated)."""
from __future__ import annotations

import os
Expand All @@ -12,15 +13,38 @@


class DualBridge:
"""Claude ↔ Codex bridge main process"""
"""Claude ↔ Codex bridge main process."""

def __init__(self, runtime_dir: Path):
pane_id = os.environ.get('CODEX_TMUX_SESSION')
if not pane_id:
raise RuntimeError('Missing CODEX_TMUX_SESSION environment variable')

self._runtime = build_bridge_runtime_state(runtime_dir, pane_id=pane_id)
# Import here to avoid circular imports
from terminal_runtime.tmux_backend import TmuxBackend

# Create tmux backend for init probe capture (no-arg constructor;
# pane id is bound via tmux_run_fn closure inside build_bridge_runtime_state)
tmux_backend = TmuxBackend()

self._runtime = build_bridge_runtime_state(
runtime_dir,
pane_id=pane_id,
tmux_backend=tmux_backend,
)
self._running = True

# Q3-S1a.3: Open FIFO holder before Init Gate starts
# This allows upstream writer to proceed without blocking on open(O_WRONLY)
try:
self._runtime.fifo_holder_fd = os.open(
str(self._runtime.paths.input_fifo),
os.O_RDONLY | os.O_NONBLOCK,
)
except FileNotFoundError:
self._log_console("input.fifo not found at init; proceeding without holder")
self._runtime.fifo_holder_fd = None

signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)

Expand Down Expand Up @@ -53,17 +77,52 @@ def codex_session(self):
return self._runtime.codex_session

def _handle_signal(self, signum: int, _: Any) -> None:
"""Handle termination signals."""
self._running = False
self.binding_tracker.stop()
self._log_console(f'Received signal {signum}, exiting...')
self._teardown()

def _teardown(self) -> None:
"""Cleanup holder fd + stop binding tracker (idempotent)."""
# Close FIFO holder if open
fd = self._runtime.fifo_holder_fd
if fd is not None:
try:
os.close(fd)
except OSError as exc:
self._log_console(f"fifo holder close failed: {exc}")
self._runtime.fifo_holder_fd = None

# Stop binding tracker (may not have been started)
try:
self.binding_tracker.stop()
except Exception as exc:
self._log_console(f"binding tracker stop failed: {exc}")

def run(self) -> int:
"""Run the bridge main loop.

Returns:
0 on normal exit, 3 on INIT_FAIL.
"""
self._log_console('Codex bridge started, waiting for Claude commands...')

# Q3-S1a.3: Init Gate — wait for TUI to be ready before entering main loop
if not self._runtime.init_gate.wait_until_ready():
self._log_console(
f"[InitGate] INIT_FAIL: {self._runtime.init_gate.last_reason}"
)
self._teardown()
return 3

# Init Gate passed — enter normal main loop
self.binding_tracker.start()

idle_sleep = env_float('CCB_BRIDGE_IDLE_SLEEP', 0.05)
error_backoff_min = env_float('CCB_BRIDGE_ERROR_BACKOFF_MIN', 0.05)
error_backoff_max = env_float('CCB_BRIDGE_ERROR_BACKOFF_MAX', 0.2)
error_backoff = max(0.0, min(error_backoff_min, error_backoff_max))

try:
while self._running:
try:
Expand All @@ -73,7 +132,9 @@ def run(self) -> int:
time.sleep(idle_sleep)
continue
self._process_request(payload)
error_backoff = max(0.0, min(error_backoff_min, error_backoff_max))
error_backoff = max(
0.0, min(error_backoff_min, error_backoff_max)
)
except KeyboardInterrupt:
self._running = False
except Exception as exc:
Expand All @@ -82,9 +143,12 @@ def run(self) -> int:
if error_backoff:
time.sleep(error_backoff)
if error_backoff_max:
error_backoff = min(error_backoff_max, max(error_backoff_min, error_backoff * 2))
error_backoff = min(
error_backoff_max,
max(error_backoff_min, error_backoff * 2)
)
finally:
self.binding_tracker.stop()
self._teardown()

self._log_console('Codex bridge exited')
return 0
Expand Down
Loading