Skip to content

Commit 09ccaad

Browse files
committed
Add ACP session resume support
1 parent edaf5fb commit 09ccaad

14 files changed

Lines changed: 1285 additions & 402 deletions

File tree

anycode-backend/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ dirs = "6.0.0"
4646
shell-words = "1.1.0"
4747
lsp-types = "0.97.0"
4848
similar = "2.7.0"
49-
agent-client-protocol = "0.9.0"
50-
agent-client-protocol-schema = "0.10.1"
49+
agent-client-protocol = { version = "0.9.0", features = ["unstable_session_list", "unstable_session_resume"] }
50+
agent-client-protocol-schema = { version = "0.10.1", features = ["unstable_session_list", "unstable_session_resume"] }
5151
async-trait = "0.1"
5252
tokio-util = { version = "0.7.13", features = ["compat"] }
5353
unicode-segmentation = "1.12.0"

anycode-backend/src/acp.rs

Lines changed: 699 additions & 229 deletions
Large diffs are not rendered by default.

anycode-backend/src/handlers/acp_handler.rs

Lines changed: 150 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,79 @@
1-
use serde::{Deserialize, Serialize};
2-
use serde_json::{self, json};
3-
use socketioxide::{extract::{AckSender, Data, SocketRef, State}};
4-
use tracing::{info, error};
5-
use anyhow::anyhow;
1+
use crate::acp::{AcpMessage, AcpPermissionMode};
62
use crate::app_state::AppState;
73
use crate::error_ack;
8-
use crate::acp::{AcpMessage, AcpPermissionMode};
4+
use serde::{Deserialize, Serialize};
5+
use serde_json::{self, json};
6+
use socketioxide::extract::{AckSender, Data, SocketRef, State};
97
use tokio::sync::broadcast;
8+
use tracing::{error, info};
109

1110
#[derive(Debug, Serialize, Deserialize, Clone)]
1211
pub struct AcpStartRequest {
1312
pub agent_id: String,
1413
pub agent_name: String,
1514
pub command: String,
1615
pub args: Vec<String>,
16+
pub resume_session_id: Option<String>,
1717
}
1818

1919
pub async fn handle_acp_start(
2020
socket: SocketRef,
2121
Data(request): Data<AcpStartRequest>,
2222
ack: AckSender,
23-
state: State<AppState>
23+
state: State<AppState>,
2424
) {
2525
info!("handle_acp_start {:?}", request);
26-
let AcpStartRequest { agent_id, agent_name, command, args } = request;
27-
let mut acp_manager = state.acp_manager.lock().await;
28-
29-
// Check if agent already exists
30-
if acp_manager.get_agent(&agent_id).is_some() {
31-
error_ack!(ack, &agent_id, "Agent {} already running", agent_id);
32-
}
26+
let AcpStartRequest {
27+
agent_id,
28+
agent_name,
29+
command,
30+
args,
31+
resume_session_id,
32+
} = request;
33+
let (start_result, msg_rx) = {
34+
let mut acp_manager = state.acp_manager.lock().await;
35+
36+
// Check if agent already exists
37+
if acp_manager.get_agent(&agent_id).is_some() {
38+
error_ack!(ack, &agent_id, "Agent {} already running", agent_id);
39+
}
3340

34-
// Start the agent
35-
let start_result = acp_manager.start_agent(
36-
agent_id.clone(), agent_name, &command, &args
37-
).await;
41+
let start_result = acp_manager
42+
.start_agent(
43+
agent_id.clone(),
44+
agent_name,
45+
&command,
46+
&args,
47+
resume_session_id,
48+
)
49+
.await;
50+
51+
let msg_rx = if start_result.is_ok() {
52+
acp_manager.subscribe(&agent_id)
53+
} else {
54+
None
55+
};
56+
57+
(start_result, msg_rx)
58+
};
3859

3960
match start_result {
40-
Ok(_) => {
61+
Ok(session_id) => {
4162
info!("ACP agent {} started successfully", agent_id);
42-
43-
// Subscribe to agent messages
44-
let msg_rx = acp_manager.subscribe(&agent_id);
45-
63+
64+
send_agent_history(&socket, &state, &agent_id).await;
65+
4666
// Set up message forwarding
4767
if let Some(msg_rx) = msg_rx {
4868
let agent_id = agent_id.clone();
69+
let state = state.0.clone();
4970
tokio::spawn(async move {
50-
forward_agent_messages(socket, agent_id, msg_rx).await;
71+
forward_agent_messages(socket, state, agent_id, msg_rx).await;
5172
});
5273
}
5374

54-
ack.send(&json!({ "success": true, "agent_id": agent_id })).ok();
75+
ack.send(&json!({ "success": true, "agent_id": agent_id, "session_id": session_id }))
76+
.ok();
5577
}
5678
Err(e) => {
5779
error!("Failed to start ACP agent {}: {}", agent_id, e);
@@ -62,7 +84,10 @@ pub async fn handle_acp_start(
6284

6385
/// Forward new agent messages to the socket
6486
pub(crate) async fn forward_agent_messages(
65-
socket: SocketRef, agent_id: String, mut msg_rx: broadcast::Receiver<AcpMessage>,
87+
socket: SocketRef,
88+
state: AppState,
89+
agent_id: String,
90+
mut msg_rx: broadcast::Receiver<AcpMessage>,
6691
) {
6792
loop {
6893
match msg_rx.recv().await {
@@ -71,30 +96,43 @@ pub(crate) async fn forward_agent_messages(
7196
let result = socket.emit("acp:message", &data);
7297
if result.is_err() {
7398
error!(
74-
"Failed to forward agent message for agent {}: {} to socket {}",
75-
agent_id, result.err().unwrap(), socket.id
99+
"Failed to forward agent message for agent {}: {} to socket {}",
100+
agent_id,
101+
result.err().unwrap(),
102+
socket.id
76103
);
77104
break; // Socket disconnected
78105
}
79106
}
80107
Err(broadcast::error::RecvError::Closed) => {
81-
error!(
82-
"Channel closed for agent {}: {}",
83-
agent_id, socket.id
84-
);
108+
error!("Channel closed for agent {}: {}", agent_id, socket.id);
85109
break; // Channel closed
86110
}
87-
Err(broadcast::error::RecvError::Lagged(_)) => {
111+
Err(broadcast::error::RecvError::Lagged(skipped)) => {
88112
error!(
89-
"Lagged behind for agent {}: {}",
90-
agent_id, socket.id
113+
"Lagged behind for agent {}: {} (skipped {} messages), sending full history resync",
114+
agent_id, socket.id, skipped
91115
);
116+
send_agent_history(&socket, &State(state.clone()), &agent_id).await;
92117
continue; // Lagged behind, continue
93118
}
94119
}
95120
}
96121
}
97122

123+
async fn send_agent_history(socket: &SocketRef, state: &State<AppState>, agent_id: &str) {
124+
let mut acp_manager = state.acp_manager.lock().await;
125+
if let Some(history) = acp_manager.get_agent_history(agent_id).await {
126+
let data = json!({ "agent_id": agent_id, "history": history });
127+
if let Err(err) = socket.emit("acp:history", &data) {
128+
error!(
129+
"Failed to send ACP history for agent {} to socket {}: {}",
130+
agent_id, socket.id, err
131+
);
132+
}
133+
}
134+
}
135+
98136
#[derive(Debug, Serialize, Deserialize, Clone)]
99137
pub struct AcpPromptRequest {
100138
pub agent_id: String,
@@ -104,7 +142,7 @@ pub struct AcpPromptRequest {
104142
pub async fn handle_acp_prompt(
105143
Data(request): Data<AcpPromptRequest>,
106144
ack: AckSender,
107-
state: State<AppState>
145+
state: State<AppState>,
108146
) {
109147
info!("handle_acp_prompt {:?}", request);
110148
let AcpPromptRequest { agent_id, prompt } = request;
@@ -138,7 +176,7 @@ pub struct AcpStopRequest {
138176
pub async fn handle_acp_stop(
139177
Data(request): Data<AcpStopRequest>,
140178
ack: AckSender,
141-
state: State<AppState>
179+
state: State<AppState>,
142180
) {
143181
info!("handle_acp_stop {:?}", request);
144182
let AcpStopRequest { agent_id } = request;
@@ -157,7 +195,7 @@ pub struct AcpCancelRequest {
157195
pub async fn handle_acp_cancel(
158196
Data(request): Data<AcpCancelRequest>,
159197
ack: AckSender,
160-
state: State<AppState>
198+
state: State<AppState>,
161199
) {
162200
info!("handle_acp_cancel {:?}", request);
163201
let AcpCancelRequest { agent_id } = request;
@@ -175,22 +213,48 @@ pub async fn handle_acp_cancel(
175213
}
176214
}
177215

178-
pub async fn handle_acp_list(
179-
ack: AckSender,
180-
state: State<AppState>
181-
) {
216+
pub async fn handle_acp_list(ack: AckSender, state: State<AppState>) {
182217
info!("handle_acp_list");
183218

184219
let acp_manager = state.acp_manager.lock().await;
185220
let agents = acp_manager.list_agents();
186-
187-
let agents_json: Vec<serde_json::Value> = agents.iter()
221+
222+
let agents_json: Vec<serde_json::Value> = agents
223+
.iter()
188224
.map(|(id, name)| json!({ "id": id, "name": name }))
189225
.collect();
190226

191-
ack.send(&json!({ "success": true, "agents": agents_json })).ok();
227+
ack.send(&json!({ "success": true, "agents": agents_json }))
228+
.ok();
192229
}
193230

231+
#[derive(Debug, Serialize, Deserialize, Clone)]
232+
pub struct AcpSessionsListRequest {
233+
pub command: String,
234+
pub args: Vec<String>,
235+
}
236+
237+
pub async fn handle_acp_sessions_list(
238+
Data(request): Data<AcpSessionsListRequest>,
239+
ack: AckSender,
240+
state: State<AppState>,
241+
) {
242+
info!("handle_acp_sessions_list {:?}", request);
243+
let AcpSessionsListRequest { command, args } = request;
244+
let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
245+
246+
let acp_manager = state.acp_manager.lock().await;
247+
match acp_manager.list_sessions(&command, &args, cwd).await {
248+
Ok(sessions) => {
249+
ack.send(&json!({ "success": true, "sessions": sessions }))
250+
.ok();
251+
}
252+
Err(err) => {
253+
error!("Failed to list ACP sessions: {}", err);
254+
error_ack!(ack, &command, "Failed to list ACP sessions: {}", err);
255+
}
256+
}
257+
}
194258

195259
#[derive(Debug, Serialize, Deserialize, Clone)]
196260
pub struct AcpPermissionResponseRequest {
@@ -207,10 +271,14 @@ pub struct AcpPermissionModeRequest {
207271
pub async fn handle_acp_permission_response(
208272
Data(request): Data<AcpPermissionResponseRequest>,
209273
ack: AckSender,
210-
state: State<AppState>
274+
state: State<AppState>,
211275
) {
212276
info!("handle_acp_permission_response {:?}", request);
213-
let AcpPermissionResponseRequest { agent_id, permission_id, option_id } = request;
277+
let AcpPermissionResponseRequest {
278+
agent_id,
279+
permission_id,
280+
option_id,
281+
} = request;
214282

215283
let mut acp_manager = state.acp_manager.lock().await;
216284

@@ -221,13 +289,22 @@ pub async fn handle_acp_permission_response(
221289
}
222290
};
223291

224-
match agent.send_permission_response(&permission_id, option_id).await {
292+
match agent
293+
.send_permission_response(&permission_id, option_id)
294+
.await
295+
{
225296
Ok(true) => {
226-
info!("Permission response sent for agent {} permission {}", agent_id, permission_id);
297+
info!(
298+
"Permission response sent for agent {} permission {}",
299+
agent_id, permission_id
300+
);
227301
ack.send(&json!({ "success": true })).ok();
228302
}
229303
Ok(false) => {
230-
error!("Permission {} not found for agent {}", permission_id, agent_id);
304+
error!(
305+
"Permission {} not found for agent {}",
306+
permission_id, agent_id
307+
);
231308
error_ack!(ack, &agent_id, "Permission {} not found", permission_id);
232309
}
233310
Err(e) => {
@@ -240,7 +317,7 @@ pub async fn handle_acp_permission_response(
240317
pub async fn handle_acp_permission_mode(
241318
Data(request): Data<AcpPermissionModeRequest>,
242319
ack: AckSender,
243-
state: State<AppState>
320+
state: State<AppState>,
244321
) {
245322
info!("handle_acp_permission_mode {:?}", request);
246323
let AcpPermissionModeRequest { mode } = request;
@@ -256,19 +333,16 @@ pub async fn handle_acp_permission_mode(
256333
let acp_manager = state.acp_manager.lock().await;
257334
acp_manager.set_permission_mode(permission_mode);
258335

259-
ack.send(&json!({ "success": true, "mode": permission_mode.as_str() })).ok();
336+
ack.send(&json!({ "success": true, "mode": permission_mode.as_str() }))
337+
.ok();
260338
}
261339

262-
pub async fn handle_acp_reconnect(
263-
socket: SocketRef,
264-
ack: AckSender,
265-
state: State<AppState>
266-
) {
340+
pub async fn handle_acp_reconnect(socket: SocketRef, ack: AckSender, state: State<AppState>) {
267341
info!("handle_acp_reconnect for socket {}", socket.id);
268-
342+
269343
let mut acp_manager = state.acp_manager.lock().await;
270344
let agents = acp_manager.list_agents();
271-
345+
272346
// Re-subscribe to all running agents
273347
for (agent_id, _) in &agents {
274348
// Get history and send it if not empty
@@ -282,22 +356,25 @@ pub async fn handle_acp_reconnect(
282356
}
283357
}
284358
}
285-
359+
286360
// Subscribe to new messages in a separate task
287361
if let Some(msg_rx) = acp_manager.subscribe(agent_id) {
288362
let socket_clone = socket.clone();
289363
let agent_id_clone = agent_id.clone();
364+
let state_clone = state.0.clone();
290365
tokio::spawn(async move {
291-
forward_agent_messages(socket_clone, agent_id_clone, msg_rx).await;
366+
forward_agent_messages(socket_clone, state_clone, agent_id_clone, msg_rx).await;
292367
});
293368
}
294369
}
295-
296-
let agents_json: Vec<serde_json::Value> = agents.iter()
370+
371+
let agents_json: Vec<serde_json::Value> = agents
372+
.iter()
297373
.map(|(id, name)| json!({ "id": id, "name": name }))
298374
.collect();
299375

300-
ack.send(&json!({ "success": true, "agents": agents_json })).ok();
376+
ack.send(&json!({ "success": true, "agents": agents_json }))
377+
.ok();
301378
}
302379

303380
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -312,15 +389,21 @@ pub struct AcpUndoRequest {
312389
pub async fn handle_acp_undo(
313390
Data(request): Data<AcpUndoRequest>,
314391
ack: AckSender,
315-
state: State<AppState>
392+
state: State<AppState>,
316393
) {
317394
info!("handle_acp_undo {:?}", request);
318-
let AcpUndoRequest { agent_id, checkpoint_id, prompt } = request;
395+
let AcpUndoRequest {
396+
agent_id,
397+
checkpoint_id,
398+
prompt,
399+
} = request;
319400

320401
let acp_manager = state.acp_manager.lock().await;
321402

322403
let result = if let Some(checkpoint_id) = checkpoint_id {
323-
acp_manager.restore_to_checkpoint_id(&agent_id, &checkpoint_id).await
404+
acp_manager
405+
.restore_to_checkpoint_id(&agent_id, &checkpoint_id)
406+
.await
324407
} else if let Some(prompt) = prompt {
325408
acp_manager.restore_to_prompt(&agent_id, &prompt).await
326409
} else {

0 commit comments

Comments
 (0)