Skip to content

Commit aa47c80

Browse files
committed
fix(acp): Fixup WireApi::ACP basic loop
1 parent 155db08 commit aa47c80

6 files changed

Lines changed: 213 additions & 13 deletions

File tree

codex-rs/acp/src/agent.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl AgentProcess {
138138
debug!("Initializing ACP agent");
139139

140140
let request = InitializeRequest {
141-
protocol_version: agent_client_protocol::V0, // Gemini uses protocol version 0
141+
protocol_version: agent_client_protocol::V0, // Gemini uses protocol version 0
142142
client_capabilities: serde_json::from_value(client_capabilities.clone())
143143
.context("Invalid client capabilities")?,
144144
client_info: None,
@@ -147,7 +147,10 @@ impl AgentProcess {
147147

148148
// Log the initialization request
149149
match serde_json::to_string_pretty(&request) {
150-
Ok(json) => debug!("=== INITIALIZE REQUEST JSON ===\n{}\n=== END REQUEST ===", json),
150+
Ok(json) => debug!(
151+
"=== INITIALIZE REQUEST JSON ===\n{}\n=== END REQUEST ===",
152+
json
153+
),
151154
Err(e) => debug!("Failed to serialize init request to JSON: {}", e),
152155
}
153156

@@ -159,7 +162,10 @@ impl AgentProcess {
159162

160163
// Log the full initialization response
161164
match serde_json::to_string_pretty(&response) {
162-
Ok(json) => debug!("=== INITIALIZE RESPONSE JSON ===\n{}\n=== END RESPONSE ===", json),
165+
Ok(json) => debug!(
166+
"=== INITIALIZE RESPONSE JSON ===\n{}\n=== END RESPONSE ===",
167+
json
168+
),
163169
Err(e) => debug!("Failed to serialize init response to JSON: {}", e),
164170
}
165171

@@ -187,7 +193,10 @@ impl AgentProcess {
187193

188194
// Serialize request to JSON for debugging
189195
match serde_json::to_string_pretty(&request) {
190-
Ok(json) => debug!("=== NEW_SESSION REQUEST JSON ===\n{}\n=== END REQUEST ===", json),
196+
Ok(json) => debug!(
197+
"=== NEW_SESSION REQUEST JSON ===\n{}\n=== END REQUEST ===",
198+
json
199+
),
191200
Err(e) => debug!("Failed to serialize request to JSON: {}", e),
192201
}
193202

@@ -197,16 +206,22 @@ impl AgentProcess {
197206
error!("Protocol error creating session: {:?}", e);
198207

199208
// Try to extract and log error details as JSON if available
200-
if let Some(err_str) = format!("{:?}", e).split("data:").nth(1) {
201-
error!("=== ERROR RESPONSE DETAILS ===\n{}\n=== END ERROR ===", err_str);
209+
if let Some(err_str) = format!("{e:?}").split("data:").nth(1) {
210+
error!(
211+
"=== ERROR RESPONSE DETAILS ===\n{}\n=== END ERROR ===",
212+
err_str
213+
);
202214
}
203215

204216
anyhow::anyhow!("Failed to create session: {e}")
205217
})?;
206218

207219
// Log successful response as JSON
208220
match serde_json::to_string_pretty(&response) {
209-
Ok(json) => debug!("=== NEW_SESSION RESPONSE JSON ===\n{}\n=== END RESPONSE ===", json),
221+
Ok(json) => debug!(
222+
"=== NEW_SESSION RESPONSE JSON ===\n{}\n=== END RESPONSE ===",
223+
json
224+
),
210225
Err(e) => debug!("Failed to serialize response to JSON: {}", e),
211226
}
212227

codex-rs/core/src/client.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,10 @@ impl ModelClient {
224224
debug!("Looking up ACP agent for model: {}", &self.config.model);
225225
let agent_config = codex_acp::get_agent_config(&self.config.model)
226226
.map_err(|e| CodexErr::Fatal(format!("ACP agent config error: {e}")))?;
227-
debug!("Resolved ACP provider: {}, command: {}",
228-
agent_config.provider,
229-
agent_config.command);
227+
debug!(
228+
"Resolved ACP provider: {}, command: {}",
229+
agent_config.provider, agent_config.command
230+
);
230231

231232
// Create ACP model client
232233
let acp_client = codex_acp::AcpModelClient::new(
@@ -251,12 +252,57 @@ impl ModelClient {
251252

252253
tokio::spawn(async move {
253254
use futures::StreamExt;
255+
let mut created_sent = false;
256+
let mut assistant_item_sent = false;
257+
let mut reasoning_item_sent = false;
258+
254259
while let Some(acp_event_result) = acp_stream.next().await {
260+
// Send Created event at stream start
261+
if !created_sent {
262+
if tx.send(Ok(ResponseEvent::Created)).await.is_err() {
263+
break;
264+
}
265+
created_sent = true;
266+
}
267+
255268
let response_event = match acp_event_result {
256269
Ok(codex_acp::AcpEvent::TextDelta(text)) => {
270+
// Send OutputItemAdded before first TextDelta
271+
if !assistant_item_sent {
272+
let item = ResponseItem::Message {
273+
id: None,
274+
role: "assistant".to_string(),
275+
content: vec![],
276+
};
277+
if tx
278+
.send(Ok(ResponseEvent::OutputItemAdded(item)))
279+
.await
280+
.is_err()
281+
{
282+
break;
283+
}
284+
assistant_item_sent = true;
285+
}
257286
Ok(ResponseEvent::OutputTextDelta(text))
258287
}
259288
Ok(codex_acp::AcpEvent::ReasoningDelta(text)) => {
289+
// Send OutputItemAdded before first ReasoningDelta
290+
if !reasoning_item_sent {
291+
let item = ResponseItem::Reasoning {
292+
id: String::new(),
293+
summary: Vec::new(),
294+
content: Some(vec![]),
295+
encrypted_content: None,
296+
};
297+
if tx
298+
.send(Ok(ResponseEvent::OutputItemAdded(item)))
299+
.await
300+
.is_err()
301+
{
302+
break;
303+
}
304+
reasoning_item_sent = true;
305+
}
260306
Ok(ResponseEvent::ReasoningContentDelta {
261307
delta: text,
262308
content_index: 0,

codex-rs/core/src/codex.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2235,7 +2235,7 @@ async fn try_run_turn(
22352235
sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event))
22362236
.await;
22372237
} else {
2238-
error_or_panic("ReasoningSummaryDelta without active item".to_string());
2238+
error_or_panic("OutputTextDelta without active item".to_string());
22392239
}
22402240
}
22412241
ResponseEvent::ReasoningSummaryDelta {

codex-rs/core/tests/acp_integration.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,107 @@ async fn test_acp_stream_with_mock_agent() {
121121
.any(|e| matches!(e, ResponseEvent::Completed { .. }));
122122
assert!(completed, "Should receive Completed event");
123123
}
124+
125+
#[tokio::test]
126+
async fn test_acp_event_ordering() {
127+
// Create ACP provider for mock-acp-agent
128+
let provider = ModelProviderInfo {
129+
name: "mock-acp".into(),
130+
base_url: None,
131+
env_key: None,
132+
env_key_instructions: None,
133+
experimental_bearer_token: None,
134+
wire_api: WireApi::Acp,
135+
query_params: None,
136+
http_headers: None,
137+
env_http_headers: None,
138+
request_max_retries: Some(0),
139+
stream_max_retries: Some(0),
140+
stream_idle_timeout_ms: Some(5_000),
141+
requires_openai_auth: false,
142+
};
143+
144+
// Load default config
145+
let codex_home = TempDir::new().expect("Failed to create temp dir");
146+
let mut config = load_default_config_for_test(&codex_home);
147+
config.model = "mock-model".to_string();
148+
config.model_provider_id = provider.name.clone();
149+
config.model_provider = provider.clone();
150+
let effort = config.model_reasoning_effort;
151+
let summary = config.model_reasoning_summary;
152+
let config = Arc::new(config);
153+
154+
let conversation_id = ConversationId::new();
155+
156+
let otel_event_manager = OtelEventManager::new(
157+
conversation_id,
158+
config.model.as_str(),
159+
config.model_family.slug.as_str(),
160+
None,
161+
Some("test@test.com".to_string()),
162+
Some(AuthMode::ChatGPT),
163+
false,
164+
"test".to_string(),
165+
);
166+
167+
let client = ModelClient::new(
168+
Arc::clone(&config),
169+
None,
170+
otel_event_manager,
171+
provider,
172+
effort,
173+
summary,
174+
conversation_id,
175+
SessionSource::Exec,
176+
);
177+
178+
let mut prompt = Prompt::default();
179+
prompt.input = vec![ResponseItem::Message {
180+
id: None,
181+
role: "user".to_string(),
182+
content: vec![ContentItem::InputText {
183+
text: "Hello".to_string(),
184+
}],
185+
}];
186+
187+
// Stream response
188+
let mut stream = client.stream(&prompt).await.expect("Stream should start");
189+
190+
// Collect events
191+
let mut events = Vec::new();
192+
while let Some(event_result) = stream.next().await {
193+
let event = event_result.expect("Event should not be error");
194+
events.push(event);
195+
}
196+
197+
// Verify event ordering follows Created -> OutputItemAdded -> Deltas pattern
198+
assert!(!events.is_empty(), "Should receive events from mock agent");
199+
200+
// First event should be Created
201+
assert!(
202+
matches!(events[0], ResponseEvent::Created),
203+
"First event should be Created, got: {:?}",
204+
events[0]
205+
);
206+
207+
// Find first OutputItemAdded event
208+
let output_item_added_index = events
209+
.iter()
210+
.position(|e| matches!(e, ResponseEvent::OutputItemAdded(_)))
211+
.expect("Should have OutputItemAdded event");
212+
213+
// OutputItemAdded should come before any deltas
214+
for (i, event) in events.iter().enumerate() {
215+
match event {
216+
ResponseEvent::OutputTextDelta(_) | ResponseEvent::ReasoningContentDelta { .. } => {
217+
assert!(
218+
i > output_item_added_index,
219+
"Delta event at index {} should come after OutputItemAdded at index {}",
220+
i,
221+
output_item_added_index
222+
);
223+
}
224+
_ => {}
225+
}
226+
}
227+
}

codex-rs/tui-integration-tests/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl Drop for TuiSession {
5656
}
5757

5858
impl TuiSession {
59-
/// Spawn codex with mock-acp-agent in a temporary directory
59+
/// Spawn codex using mock-acp-agent binary in a temporary directory
6060
pub fn spawn(rows: u16, cols: u16) -> Result<Self> {
6161
let temp_dir = tempfile::tempdir()?;
6262
let hello_py = temp_dir.path().join("hello.py");
@@ -409,7 +409,7 @@ impl Default for SessionConfig {
409409
impl SessionConfig {
410410
pub fn new() -> Self {
411411
Self {
412-
model: "mock-acp".to_string(),
412+
model: "mock-model".to_string(),
413413
mock_agent_env: HashMap::new(),
414414
no_color: true,
415415
approval_policy: Some(ApprovalPolicy::OnFailure),
@@ -419,6 +419,11 @@ impl SessionConfig {
419419
}
420420
}
421421

422+
pub fn with_model(mut self, model: String) -> Self {
423+
self.model = model;
424+
self
425+
}
426+
422427
pub fn with_mock_response(mut self, response: impl Into<String>) -> Self {
423428
self.mock_agent_env
424429
.insert("MOCK_AGENT_RESPONSE".to_string(), response.into());

codex-rs/tui-integration-tests/tests/prompt_flow.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,36 @@ fn test_submit_prompt_default_response() {
4343
assert_snapshot!("prompt_submitted", session.screen_contents());
4444
}
4545

46+
#[test]
47+
fn test_submit_prompt_missing_model() {
48+
let mut session = TuiSession::spawn_with_config(
49+
24,
50+
80,
51+
SessionConfig::new().with_model("nonexistent".to_owned()),
52+
)
53+
.expect("Failed to spawn codex");
54+
55+
session.wait_for_text("? for shortcuts", TIMEOUT).unwrap();
56+
57+
// Type prompt
58+
session.send_str("Hello").unwrap();
59+
std::thread::sleep(Duration::from_millis(100));
60+
session.wait_for_text("Hello", TIMEOUT).unwrap();
61+
62+
// Submit
63+
session.send_key(Key::Enter).unwrap();
64+
std::thread::sleep(Duration::from_millis(100));
65+
66+
session
67+
.wait_for_text(
68+
"ACP agent config error: Unknown ACP model: nonexistent-acp",
69+
Duration::from_secs(10),
70+
)
71+
.unwrap();
72+
73+
assert_snapshot!("missing_model", session.screen_contents());
74+
}
75+
4676
// #[test]
4777
// fn test_submit_prompt_custom_response() {
4878
// let config = SessionConfig::new()

0 commit comments

Comments
 (0)