Skip to content

Commit 3a81ba2

Browse files
authored
feat(acp): Implement ACP client streaming for codex-core (#57)
## Summary 🤖 Generated with [Nori](https://www.npmjs.com/package/nori-ai) - Implement ACP client integration enabling CLI/TUI communication with ACP-compliant agent subprocesses via JSON-RPC 2.0 over stdin/stdout - Add thread-safe `AcpConnection` wrapper with dedicated worker thread for `!Send` `LocalBoxFuture` support - Create `translator` module for bidirectional type conversion between ACP and codex-protocol types - Integrate `stream_acp()` path in `Client.stream()` for `wire_api=Acp` providers - Update docs.md files with architecture diagrams and implementation details ## Test Plan - [x] All 10 codex-acp tests pass (8 unit + 1 integration + 1 doctest) - [x] TUI e2e prompt_flow tests pass with mock-acp-agent - [x] 430+ codex-core tests pass - [x] Clippy passes with `-D warnings` - [x] CI pipeline passes Share Nori with your team: https://www.npmjs.com/package/nori-ai
1 parent 4c0b334 commit 3a81ba2

16 files changed

Lines changed: 1394 additions & 35 deletions

codex-rs/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/acp/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ workspace = true
99
[dependencies]
1010
agent-client-protocol = "0.7"
1111
anyhow = { workspace = true }
12+
codex-protocol = { path = "../protocol" }
1213
async-trait = { workspace = true }
1314
futures = { workspace = true }
1415
serde = { workspace = true, features = ["derive"] }

codex-rs/acp/PLAN.md

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
# ACP Client Implementation Plan
2+
3+
## Overview
4+
5+
This plan describes the implementation of an ACP (Agent Client Protocol) client in the `codex-acp` crate. The client will enable the CLI/TUI to communicate with ACP-compliant agents via JSON-RPC over stdio.
6+
7+
## Design Decisions
8+
9+
Based on scoping discussions, the following decisions have been made:
10+
11+
1. **Connection lifecycle**: Spawn a fresh subprocess per session (simpler implementation)
12+
2. **Permission handling**: Bridge ACP permissions to existing codex approval system (consistent UX)
13+
3. **Tool output**: Pass through to TUI for rendering (avoid duplicating TUI logic)
14+
4. **Scope**: Core features (Phases 1-3) plus cancellation support (Phase 6)
15+
16+
## Architecture
17+
18+
### Current State
19+
20+
The `codex-acp` crate currently has:
21+
- `registry.rs`: Agent configuration registry (`AcpAgentConfig`, `AcpAgentRegistry`)
22+
- `lib.rs`: Re-exports registry and provides `get_agent_config()` helper
23+
24+
The integration point is in `codex-core/src/client.rs:173` which has:
25+
```rust
26+
todo!("ACP streaming not yet implemented")
27+
```
28+
29+
### Target Architecture
30+
31+
```
32+
┌─────────────────────────────────────────────────────────────────┐
33+
│ codex-core │
34+
│ ┌─────────────────────────────────────────────────────────────┐ │
35+
│ │ ModelClient │ │
36+
│ │ stream() → ResponseStream<ResponseEvent> │ │
37+
│ └─────────────────────────────────────────────────────────────┘ │
38+
│ │ │
39+
│ ▼ │
40+
│ ┌─────────────────────────────────────────────────────────────┐ │
41+
│ │ AcpClientAdapter │ │
42+
│ │ Converts ACP SessionUpdate → ResponseEvent │ │
43+
│ └─────────────────────────────────────────────────────────────┘ │
44+
└─────────────────────────────────────────────────────────────────┘
45+
46+
47+
┌─────────────────────────────────────────────────────────────────┐
48+
│ codex-acp │
49+
│ ┌─────────────────────────────────────────────────────────────┐ │
50+
│ │ AcpConnection │ │
51+
│ │ - Manages agent subprocess lifecycle │ │
52+
│ │ - Handles initialization handshake │ │
53+
│ │ - Routes JSON-RPC messages │ │
54+
│ └─────────────────────────────────────────────────────────────┘ │
55+
│ │ │
56+
│ ┌─────────────────────────────────────────────────────────────┐ │
57+
│ │ AcpSession │ │
58+
│ │ - Per-session state (modes, models) │ │
59+
│ │ - Session-scoped operations │ │
60+
│ └─────────────────────────────────────────────────────────────┘ │
61+
│ │ │
62+
│ ┌─────────────────────────────────────────────────────────────┐ │
63+
│ │ ClientDelegate │ │
64+
│ │ - Implements acp::Client trait │ │
65+
│ │ - Handles agent→client requests │ │
66+
│ │ - Permission requests, file I/O, terminals │ │
67+
│ └─────────────────────────────────────────────────────────────┘ │
68+
└─────────────────────────────────────────────────────────────────┘
69+
70+
71+
┌─────────────────┐
72+
│ Agent Process │
73+
│ (via stdio) │
74+
└─────────────────┘
75+
```
76+
77+
## Implementation Tasks
78+
79+
### Phase 1: Create `AcpConnection` struct
80+
**File:** `codex-rs/acp/src/connection.rs`
81+
82+
```rust
83+
pub struct AcpConnection {
84+
connection: acp::ClientSideConnection,
85+
agent_capabilities: acp::AgentCapabilities,
86+
child: tokio::process::Child,
87+
_io_task: tokio::task::JoinHandle<Result<(), acp::Error>>,
88+
_stderr_task: tokio::task::JoinHandle<()>,
89+
}
90+
91+
impl AcpConnection {
92+
pub async fn spawn(config: &AcpAgentConfig, cwd: &Path) -> Result<Self>;
93+
pub fn capabilities(&self) -> &acp::AgentCapabilities;
94+
}
95+
```
96+
97+
**Key responsibilities:**
98+
- Spawn agent subprocess with proper environment
99+
- Initialize JSON-RPC transport over stdin/stdout
100+
- Perform ACP initialization handshake
101+
- Version negotiation (minimum V1)
102+
- Store agent capabilities for later use
103+
104+
**Reference:** `zed/crates/agent_servers/src/acp.rs:82-220`
105+
106+
### Phase 2: Create `AcpSession` struct
107+
**File:** `codex-rs/acp/src/session.rs`
108+
109+
```rust
110+
pub struct AcpSession {
111+
session_id: acp::SessionId,
112+
modes: Option<acp::SessionModeState>,
113+
update_tx: mpsc::Sender<acp::SessionUpdate>,
114+
}
115+
```
116+
117+
**Key responsibilities:**
118+
- Track per-session state
119+
- Store optional session modes
120+
- Provide channel for update streaming
121+
122+
### Phase 3: Implement `ClientDelegate`
123+
**File:** `codex-rs/acp/src/client_delegate.rs`
124+
125+
```rust
126+
pub struct ClientDelegate {
127+
sessions: Arc<RwLock<HashMap<acp::SessionId, AcpSession>>>,
128+
permission_handler: Box<dyn PermissionHandler>,
129+
file_handler: Box<dyn FileHandler>,
130+
}
131+
132+
#[async_trait]
133+
impl acp::Client for ClientDelegate {
134+
async fn request_permission(&self, req: RequestPermissionRequest)
135+
-> Result<RequestPermissionResponse, acp::Error>;
136+
async fn write_text_file(&self, req: WriteTextFileRequest)
137+
-> Result<WriteTextFileResponse, acp::Error>;
138+
async fn read_text_file(&self, req: ReadTextFileRequest)
139+
-> Result<ReadTextFileResponse, acp::Error>;
140+
async fn session_notification(&self, notif: SessionNotification)
141+
-> Result<(), acp::Error>;
142+
// Terminal methods (initially stubbed)
143+
async fn create_terminal(&self, req: CreateTerminalRequest)
144+
-> Result<CreateTerminalResponse, acp::Error>;
145+
async fn terminal_output(&self, req: TerminalOutputRequest)
146+
-> Result<TerminalOutputResponse, acp::Error>;
147+
async fn kill_terminal_command(&self, req: KillTerminalCommandRequest)
148+
-> Result<KillTerminalCommandResponse, acp::Error>;
149+
async fn release_terminal(&self, req: ReleaseTerminalRequest)
150+
-> Result<ReleaseTerminalResponse, acp::Error>;
151+
async fn wait_for_terminal_exit(&self, req: WaitForTerminalExitRequest)
152+
-> Result<WaitForTerminalExitResponse, acp::Error>;
153+
}
154+
```
155+
156+
### Phase 4: Create `SessionUpdateTranslator`
157+
**File:** `codex-rs/acp/src/translator.rs`
158+
159+
Maps ACP `SessionUpdate` variants to codex `ResponseEvent` and `EventMsg`:
160+
161+
| ACP SessionUpdate | ResponseEvent / EventMsg |
162+
|-------------------|-------------------------|
163+
| `AgentMessageChunk(ContentBlock::Text)` | `OutputTextDelta(String)` |
164+
| `AgentMessageChunk(ContentBlock::Resource)` | `OutputItemDone(ResponseItem::Resource)` |
165+
| `AgentThoughtChunk` | `ReasoningContentDelta` |
166+
| `ToolCall` | `EventMsg::ExecCommandBegin` / custom tool events |
167+
| `ToolCallUpdate` | `EventMsg::ExecCommandEnd` / tool result events |
168+
| `Plan` | Custom plan event handling |
169+
| `UserMessageChunk` | Echo handling (typically ignored) |
170+
| `CurrentModeUpdate` | Mode change notification |
171+
| `AvailableCommandsUpdate` | Slash command updates |
172+
173+
```rust
174+
pub struct SessionUpdateTranslator;
175+
176+
impl SessionUpdateTranslator {
177+
pub fn translate(update: acp::SessionUpdate) -> Vec<ResponseEvent>;
178+
fn translate_tool_call(tc: acp::ToolCall) -> Vec<ResponseEvent>;
179+
fn translate_tool_call_update(tcu: acp::ToolCallUpdate) -> Vec<ResponseEvent>;
180+
}
181+
```
182+
183+
### Phase 5: Create `AcpStreamAdapter`
184+
**File:** `codex-rs/acp/src/stream_adapter.rs`
185+
186+
```rust
187+
pub struct AcpStreamAdapter {
188+
update_rx: mpsc::Receiver<acp::SessionUpdate>,
189+
translator: SessionUpdateTranslator,
190+
}
191+
192+
impl Stream for AcpStreamAdapter {
193+
type Item = Result<ResponseEvent>;
194+
195+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
196+
}
197+
```
198+
199+
### Phase 6: Integration with codex-core
200+
201+
#### Implement `stream_acp()` function
202+
**File:** `codex-rs/core/src/client.rs`
203+
204+
Replace the `todo!()` at line 173 with:
205+
206+
```rust
207+
async fn stream_acp(
208+
config: &AcpAgentConfig,
209+
messages: Vec<ChatMessage>,
210+
cwd: &Path,
211+
permission_handler: impl PermissionHandler,
212+
) -> Result<ResponseStream> {
213+
let (tx, rx) = mpsc::channel(32);
214+
215+
// Spawn fresh connection for this session
216+
let connection = AcpConnection::spawn(config, cwd).await?;
217+
218+
// Create session
219+
let session_id = connection.create_session(cwd).await?;
220+
221+
// Convert messages to ACP prompt format
222+
let prompt = convert_to_acp_prompt(&messages)?;
223+
224+
// Spawn prompt task
225+
tokio::spawn(async move {
226+
let result = connection.prompt(session_id, prompt, tx).await;
227+
// Handle completion - connection dropped when task ends
228+
});
229+
230+
Ok(ResponseStream { rx_event: rx })
231+
}
232+
```
233+
234+
### Phase 7: Cancellation Support
235+
236+
#### Implement cancellation
237+
**File:** `codex-rs/acp/src/connection.rs`
238+
239+
```rust
240+
impl AcpConnection {
241+
pub async fn cancel(&self, session_id: &acp::SessionId) -> Result<()> {
242+
self.connection.cancel(acp::CancelNotification::new(session_id.clone())).await
243+
}
244+
}
245+
```
246+
247+
Integrate with codex's existing cancellation mechanism (Ctrl+C handling).
248+
249+
## File Structure
250+
251+
```
252+
codex-rs/acp/src/
253+
├── lib.rs # Module exports
254+
├── registry.rs # Existing: agent config registry
255+
├── connection.rs # NEW: AcpConnection (subprocess management)
256+
├── session.rs # NEW: AcpSession (session state)
257+
├── client_delegate.rs # NEW: acp::Client implementation
258+
├── translator.rs # NEW: SessionUpdate → ResponseEvent
259+
├── stream_adapter.rs # NEW: Stream wrapper for ResponseStream
260+
└── handlers.rs # NEW: Permission/File handler traits
261+
```
262+
263+
## Dependencies
264+
265+
Add to `codex-rs/acp/Cargo.toml`:
266+
```toml
267+
[dependencies]
268+
agent-client-protocol = "0.7" # Already present
269+
tokio = { workspace = true, features = ["process", "sync"] }
270+
futures = { workspace = true }
271+
async-trait = { workspace = true }
272+
```
273+
274+
## Testing Strategy
275+
276+
### Unit Tests
277+
1. `SessionUpdateTranslator` - Test all mapping cases
278+
2. `ClientDelegate` - Mock permission/file handlers
279+
3. Connection initialization handshake
280+
281+
### Integration Tests
282+
1. Use `mock-acp-agent` for full protocol tests
283+
2. Test session lifecycle (new → prompt → cancel)
284+
3. Test permission request flow
285+
286+
### E2E Tests
287+
The reference tests are in `codex-rs/tui-pty-e2e/tests/prompt_flow.rs`. These tests spawn the full TUI and verify that prompts flow through to the mock agent and responses are displayed.
288+
289+
## Out of Scope (Deferred)
290+
291+
1. **Authentication** - Agents requiring auth can authenticate out-of-band initially
292+
2. **Session Loading** - `session/load` for resuming sessions
293+
3. **Session Listing** - `session/list` capability (unstable feature)
294+
4. **Model Selection** - `session/set_model` (unstable feature)
295+
5. **Terminal rendering** - Terminal UI handled by TUI, not ACP client
296+
6. **MCP server configuration** - Pass empty MCP servers initially
297+
7. **Connection pooling** - Spawn per session instead
298+
8. **Advanced permission bridge** - Basic bridge only
299+
9. **Advanced tool call mapping** - Basic mapping only
300+
301+
## Implementation Order
302+
303+
1. **Phase 1**: Create `AcpConnection` struct (connection.rs)
304+
2. **Phase 2**: Create `AcpSession` struct (session.rs)
305+
3. **Phase 3**: Implement `ClientDelegate` (client_delegate.rs)
306+
4. **Phase 4**: Create `SessionUpdateTranslator` (translator.rs)
307+
5. **Phase 5**: Create `AcpStreamAdapter` (stream_adapter.rs)
308+
6. **Phase 6**: Integration with codex-core client.rs (replace todo!())
309+
7. **Phase 7**: Cancellation support
310+
311+
## Verification Criteria
312+
313+
1. E2E tests in `codex-rs/tui-pty-e2e/tests/prompt_flow.rs` pass
314+
2. Can connect to mock-acp-agent and complete a prompt turn
315+
3. SessionUpdate events properly translate to ResponseEvent stream
316+
4. Permission requests properly flow through to TUI/CLI
317+
5. Cancellation (Ctrl+C) properly terminates agent operations
318+
319+
## Key Decision Points (Resolved)
320+
321+
1. **Connection lifecycle**: ✅ Spawn per session (simpler, fresh state)
322+
323+
2. **Permission handler interface**: ✅ Bridge to existing codex approval types (consistent UX)
324+
325+
3. **Tool call content**: ✅ Pass through to TUI for rendering (avoid duplication)
326+
327+
4. **Error handling**: Map to `anyhow::Error` with context for consistency

0 commit comments

Comments
 (0)