Skip to content

Commit 1c9f96f

Browse files
authored
refactor(acp): Upgrade ACP backend to SACP v11 (#439)
## Summary - upgrade the ACP backend from `sacp` v10 to `sacp` v11.0.0 and align the workspace schema dependency on `agent-client-protocol-schema` v0.11.6 - keep SACP-specific transport details inside `connection/` by switching the backend to a single ordered `ConnectionEvent` inbox and moving non-transport code to schema-crate types - refresh ACP and protocol docs to describe the v11 transport boundary, ordered inbox flow, and local mock-agent test requirement ## Test Plan - [x] `just fmt` - [x] `cargo build -p mock-acp-agent` - [x] `cargo test -p nori-protocol` - [x] `cargo test -p codex-acp` - [x] `cargo test` - [x] `cargo build --bin nori` - [x] `cargo test -p tui-pty-e2e` - [x] tmux smoke test: launch `nori` with `elizacp`, submit `hello`, and confirm the prompt returns - [x] `just fix -p codex-acp -p nori-protocol`
1 parent 0ac5303 commit 1c9f96f

21 files changed

Lines changed: 623 additions & 407 deletions

File tree

codex-rs/Cargo.lock

Lines changed: 85 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ assert_matches = "1.5.0"
8888
async-channel = "2.3.1"
8989
async-stream = "0.3.6"
9090
async-trait = "0.1.89"
91+
agent-client-protocol-schema = "0.11.6"
9192
axum = { version = "0.8", default-features = false }
9293
base64 = "0.22.1"
9394
bytes = "1.10.1"
@@ -152,7 +153,7 @@ regex = "1.12.2"
152153
regex-lite = "0.1.7"
153154
reqwest = "0.12"
154155
rmcp = { version = "0.12.0", default-features = false }
155-
sacp = "10.1.0"
156+
sacp = "11.0.0"
156157
schemars = "0.8.22"
157158
seccompiler = "0.5.0"
158159
sentry = "0.34.0"

codex-rs/acp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ unstable = []
1212
workspace = true
1313

1414
[dependencies]
15-
agent-client-protocol-schema = { version = "0.10.8", features = ["unstable"] }
15+
agent-client-protocol-schema = { workspace = true, features = ["unstable"] }
1616
anyhow = { workspace = true }
1717
sacp = { workspace = true }
1818
base64 = { workspace = true }

codex-rs/acp/docs.md

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ The ACP crate serves as a bridge between:
2727

2828
Key files:
2929
- `registry.rs` - Agent configuration and npm package detection
30-
- `connection/` - SACP v10-based subprocess spawning and JSON-RPC communication
30+
- `connection/` - SACP v11-based subprocess spawning and JSON-RPC communication
3131
- `translator.rs` - User input to ACP `ContentBlock` conversion and related parsing helpers
3232
- `backend/mod.rs` - Implements `ConversationClient` trait from codex-core and emits normalized ACP session events
3333
- `transcript_discovery.rs` - Discovers transcript files for external agents
@@ -532,10 +532,10 @@ Footer renders "Tokens: 45K in / 78K out (32K cached)"
532532
```
533533
**Connection Management** (`connection/`):
534534

535-
The ACP connection layer uses SACP v10 (`sacp` crate) to communicate with agent subprocesses over stdin/stdout JSON-RPC. The central type is `SacpConnection` (in `connection/sacp_connection.rs`), which is `Send + Sync` and runs directly on the main tokio runtime without a dedicated worker thread.
535+
The ACP connection layer uses SACP v11 (`sacp` crate) to communicate with agent subprocesses over stdin/stdout JSON-RPC. The central type is `SacpConnection` (in `connection/sacp_connection.rs`), which is `Send + Sync` and runs directly on the main tokio runtime without a dedicated worker thread.
536536

537537
```
538-
┌─────────────────────────┐ SACP v10 (JSON-RPC) ┌─────────────────────────┐
538+
┌─────────────────────────┐ SACP v11 (JSON-RPC) ┌─────────────────────────┐
539539
│ Main Tokio Runtime │◄────────────────────────►│ ACP Agent Subprocess │
540540
│ │ stdin/stdout │ (spawned child process)│
541541
│ SacpConnection │ │ │
@@ -548,17 +548,17 @@ The ACP connection layer uses SACP v10 (`sacp` crate) to communicate with agent
548548
└─────────────────────────┘ └─────────────────────────┘
549549
```
550550

551-
**Builder-based handler registration:** `SacpConnection::spawn()` uses `ClientToAgent::builder()` with chained `.on_receive_request()` calls to register handlers for `RequestPermissionRequest` (approval flow), `WriteTextFileRequest` (workspace-bounded file writes), and `ReadTextFileRequest` (unrestricted file reads), plus `.on_receive_notification()` for `SessionNotification`. All handlers are registered before `run_until()` is called.
551+
**Builder-based handler registration:** `SacpConnection::spawn()` uses `Client.builder()` with chained `.on_receive_request()` calls to register handlers for `RequestPermissionRequest` (approval flow), `WriteTextFileRequest` (workspace-bounded file writes), and `ReadTextFileRequest` (unrestricted file reads), plus `.on_receive_notification()` for `SessionNotification`. All handlers are registered before `connect_with()` is called.
552552

553-
**Connection initialization:** Inside `run_until()`, the connection sends `InitializeRequest` to the agent, validates the protocol version (minimum V1), and clones the `JrConnectionCx` out of the closure via a oneshot channel. The background task then awaits `futures::future::pending()` to keep the connection alive until the task is aborted on drop.
553+
**Connection initialization:** Inside `connect_with()`, the connection sends `InitializeRequest` to the agent, validates the protocol version (minimum V1), and clones the `ConnectionTo<Agent>` plus agent capabilities out of the callback via a oneshot channel. The background task then awaits `futures::future::pending()` to keep the connection alive until the task is aborted on drop.
554554

555-
**Dynamic update channel routing:** Session notifications from the agent are routed via `Arc<Mutex<Option<Sender<SessionUpdate>>>>`. During a prompt, the sender is set to the caller's `update_tx`; between turns, it is `None` and notifications fall through to the persistent channel. This swap happens in `prompt()` and `load_session()`.
555+
**Ordered transport inbox:** Session notifications, permission requests, and synthetic file-operation updates are all forwarded into one ordered `ConnectionEvent` stream. The backend consumes that single inbox and feeds it through the serialized reducer/runtime path, which avoids ordering ambiguity between notification and approval channels.
556556

557-
**Approval flow:** The `RequestPermissionRequest` handler translates the request to a Codex `ApprovalRequest`, sends it through the `approval_tx` channel, and spawns a concurrent task via `cx.spawn()` to wait for the user's response. The spawn avoids blocking the SACP dispatch loop while the UI collects user input.
557+
**Approval flow:** The `RequestPermissionRequest` handler translates the request to a Codex `ApprovalRequest`, sends it through the ordered inbox, and uses the SACP responder plus `ConnectionTo<Agent>` to send the eventual review decision back without blocking the dispatch loop while the UI collects user input.
558558

559559
**MCP Server Forwarding** (`connection/mcp.rs`):
560560

561-
CLI-configured MCP servers (from `config.toml`) are converted to SACP protocol types and passed to the agent via `NewSessionRequest.mcp_servers` at session creation time. The `to_sacp_mcp_servers()` function in `connection/mcp.rs` bridges `codex_core::config::types::McpServerConfig` to `sacp::schema::McpServer`:
561+
CLI-configured MCP servers (from `config.toml`) are converted to ACP schema types and passed to the agent via `NewSessionRequest.mcp_servers` at session creation time. The `to_sacp_mcp_servers()` function in `connection/mcp.rs` bridges `codex_core::config::types::McpServerConfig` to ACP `McpServer` values inside the transport adapter:
562562

563563
| Transport | SACP Type | Key Fields |
564564
|-----------|-----------|------------|
@@ -736,7 +736,7 @@ Multi-layer cleanup strategy for robust process termination:
736736
1. **Process Group Isolation (Unix)**: Agent spawns in own process group via `setpgid(0, 0)`. Enables killing entire process tree with `killpg()`.
737737
2. **Kernel-Level Parent Death Signal (Linux)**: `PR_SET_PDEATHSIG` set to `SIGTERM`. Guarantees agent receives signal if parent crashes.
738738
3. **Process Group Kill**: On drop, `SIGKILL` is sent to the entire process group via `kill_child_process_group()`, ensuring grandchildren are terminated.
739-
4. **Async Drop**: `SacpConnection::drop()` aborts the connection and stderr tasks, then kills the child process. No blocking wait is required because SACP v10's `ClientToAgent` is `Send + Sync` and runs as a regular tokio task.
739+
4. **Async Drop**: `SacpConnection::drop()` aborts the connection and stderr tasks, then kills the child process. No blocking wait is required because SACP v11's `ConnectionTo<Agent>` is `Send + Sync` and runs as a regular tokio task.
740740

741741
**Environment Isolation** (`sacp_connection.rs`):
742742

@@ -795,32 +795,14 @@ That means update-only provider flows, including Gemini-style shell calls that s
795795

796796
Out-of-phase request-owned updates are treated the same way: the reducer still emits a warning when no request is active, but it forwards the raw ACP update to the normalizer so the user sees both the malformed session state and the underlying tool snapshot.
797797

798-
**Prompt Update Channel Lifecycle** (`sacp_connection.rs`, `user_input.rs`, `submit_and_ops.rs`):
798+
**Transport Event Flow** (`connection/sacp_connection.rs`, `backend/spawn_and_relay.rs`, `backend/session.rs`):
799799

800-
Each prompt/load_session call gets a dedicated `mpsc` channel (`update_tx`/`update_rx`) for receiving `SessionUpdate` notifications from the ACP agent. The connection layer routes notifications through a shared `active_update_tx` slot (an `Arc<Mutex<Option<(u64, Sender)>>>`) that pairs the sender with a monotonic generation counter. The routing logic in the notification handler uses `try_send` with fallthrough: if the per-prompt channel fails (receiver dropped, or channel full), the notification falls through to the `persistent_tx` channel instead of being silently dropped.
800+
The connection layer now exposes exactly one ordered `mpsc::Receiver<ConnectionEvent>`. `SessionNotification` updates, permission requests, and synthetic file-operation updates all flow through that inbox in source order. The backend takes ownership of the receiver once, then either:
801801

802-
The critical invariant is that `prompt()` does **not** clear `active_update_tx` when it returns. This is because `block_task()` (the SACP request/response mechanism) can return before all `SessionNotification` events have been delivered. Instead, callers use a `done_tx`/`done_rx` oneshot to signal the `update_handler` task:
802+
- hands it to `run_connection_event_relay()` for live sessions, where it is merged with reducer prompt results and fed into the serialized runtime, or
803+
- temporarily hands it to the `session/load` collector during resume, buffering replay `ClientEvent`s before returning the receiver to the live backend.
803804

804-
```
805-
prompt() returns
806-
|
807-
v
808-
done_tx.send(()) -- signals update_handler that prompt is done
809-
|
810-
v
811-
update_handler enters drain mode:
812-
tokio::select! switches from waiting on (update_rx OR done_rx)
813-
to waiting on update_rx with a 500ms timeout
814-
|
815-
v
816-
After timeout or channel close, update_handler exits
817-
(dropping update_rx, which causes future try_send to fail)
818-
|
819-
v
820-
Next prompt() overwrites active_update_tx slot with a fresh sender
821-
```
822-
823-
The generation counter on `active_update_tx` prevents stale cleanup: `close_update_channel(generation)` only clears the slot if the generation matches, so it is safe for `load_session` (which is sequential) to clear its own channel without risking a concurrent prompt's channel. `prompt()` callers do not call `close_update_channel` at all — they rely on the done/drain pattern instead.
805+
This keeps the SACP-specific routing logic inside `connection/` and removes the old split between notification and approval channels.
824806

825807
**Turn Interrupt Wiring — Reducer-Owned ACP Phase** (`session_reducer.rs`, `session_runtime_driver.rs`, `submit_and_ops.rs`):
826808

@@ -909,7 +891,7 @@ SessionConfigured event sent to TUI
909891
Deferred replay relay spawned (sends buffered events to backend_event_tx)
910892
```
911893

912-
**Server-side path:** A collect task runs concurrently during `load_session()`, receiving `SessionUpdate` notifications via an `mpsc` channel and buffering the normalized `ClientEvent` stream into a `Vec`. `SacpConnection::load_session()` installs the `update_tx` channel into the shared `active_update_tx` slot before sending the request, ensuring history replay notifications are captured. On `#[cfg(feature = "unstable")]` builds, model state is also extracted from the `LoadSessionResponse` if available. The buffered events are returned as `deferred_replay_events` and a relay task is spawned only *after* all setup events (`SessionConfigured`, `Warning`, etc.) have been sent to the outbound backend-event channel. This deferred-relay pattern prevents a deadlock: the outbound channel is bounded, and the TUI consumer only starts after `resume_session()` returns, so sending replay events before setup events would fill the channel and block `resume_session()` from making progress. If `load_session()` fails at runtime (e.g., the agent advertises the capability but the call itself errors), the collect task is aborted and the method falls back to a fresh session. A `WarningEvent` is emitted to inform the user that the restored session will not have server-side replay.
894+
**Server-side path:** A collect task runs concurrently during `load_session()`, taking ownership of the ordered `ConnectionEvent` receiver and buffering the normalized `ClientEvent` stream into a `Vec`. `SacpConnection::load_session()` reuses that same ordered inbox for the agent's replay notifications, so the collector can observe session updates in source order without a special side channel. On `#[cfg(feature = "unstable")]` builds, model state is also extracted from the `LoadSessionResponse` if available. The buffered events are returned as `deferred_replay_events` and a relay task is spawned only *after* all setup events (`SessionConfigured`, `Warning`, etc.) have been sent to the outbound backend-event channel. This deferred-relay pattern prevents a deadlock: the outbound channel is bounded, and the TUI consumer only starts after `resume_session()` returns, so sending replay events before setup events would fill the channel and block `resume_session()` from making progress. If `load_session()` fails at runtime (e.g., the agent advertises the capability but the call itself errors), the collect task is aborted and the method falls back to a fresh session. A `WarningEvent` is emitted to inform the user that the restored session will not have server-side replay.
913895

914896
**Client-side path:** When the agent does not support `session/load` (e.g., Claude Code's ACP adapter returns `method_not_found`), or when the server-side `load_session()` call fails at runtime, a fresh session is created via `session/new`. The previous conversation is replayed through normalized `ClientEvent::ReplayEntry` items derived from the transcript rather than through `SessionConfigured.initial_messages`. The transcript summary path remains available for context management and `/compact`-style behavior. A `TRANSCRIPT_SUMMARY_WARN_CHARS` threshold (200K chars) logs a warning when summaries are very large; the actual safety net is the agent-side "prompt too long" rejection, which the caller handles gracefully.
915897

0 commit comments

Comments
 (0)