Skip to content

Commit a67f4cd

Browse files
committed
feat(acp-nats-agent): add server-side ACP agent framework
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent c8e1839 commit a67f4cd

7 files changed

Lines changed: 932 additions & 1 deletion

File tree

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[package]
2+
name = "acp-nats-agent"
3+
version = "0.0.1"
4+
edition = "2024"
5+
6+
[lints]
7+
workspace = true
8+
9+
[dependencies]
10+
acp-nats = { workspace = true }
11+
agent-client-protocol = { workspace = true, features = [
12+
"unstable_auth_methods",
13+
"unstable_boolean_config",
14+
"unstable_cancel_request",
15+
"unstable_message_id",
16+
"unstable_session_close",
17+
"unstable_session_fork",
18+
"unstable_session_model",
19+
"unstable_session_resume",
20+
"unstable_session_usage",
21+
] }
22+
async-nats = { workspace = true }
23+
async-trait = { workspace = true }
24+
bytes = { workspace = true }
25+
futures = { workspace = true }
26+
serde = { workspace = true }
27+
serde_json = { workspace = true }
28+
tokio = { workspace = true, features = ["rt", "macros", "sync"] }
29+
tracing = { workspace = true }
30+
trogon-nats = { workspace = true }
31+
trogon-std = { workspace = true }
32+
33+
[dev-dependencies]
34+
tokio = { workspace = true, features = ["test-util"] }
35+
trogon-nats = { workspace = true, features = ["test-support"] }
36+
trogon-std = { workspace = true, features = ["test-support"] }
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
use crate::dispatch::{AgentMethod, parse_agent_subject};
2+
use acp_nats::acp_prefix::AcpPrefix;
3+
use acp_nats::client_proxy::{NatsClientProxy, NatsClientProxyConfig};
4+
use acp_nats::session_id::AcpSessionId;
5+
use agent_client_protocol::{
6+
Agent, AuthenticateRequest, CancelNotification, CloseSessionRequest, ExtRequest,
7+
ForkSessionRequest, InitializeRequest, ListSessionsRequest, LoadSessionRequest,
8+
NewSessionRequest, PromptRequest, ResumeSessionRequest, SetSessionConfigOptionRequest,
9+
SetSessionModeRequest, SetSessionModelRequest,
10+
};
11+
use async_nats::Message;
12+
use bytes::Bytes;
13+
use futures::future::LocalBoxFuture;
14+
use futures::StreamExt;
15+
use std::rc::Rc;
16+
use std::time::Duration;
17+
use tracing::{error, info, warn};
18+
use trogon_nats::{FlushClient, PublishClient, RequestClient, SubscribeClient};
19+
20+
pub struct AgentSideNatsConnectionConfig {
21+
pub acp_prefix: AcpPrefix,
22+
pub operation_timeout: Duration,
23+
}
24+
25+
impl AgentSideNatsConnectionConfig {
26+
pub fn new(acp_prefix: AcpPrefix) -> Self {
27+
Self {
28+
acp_prefix,
29+
operation_timeout: Duration::from_secs(30),
30+
}
31+
}
32+
33+
pub fn with_operation_timeout(mut self, timeout: Duration) -> Self {
34+
self.operation_timeout = timeout;
35+
self
36+
}
37+
}
38+
39+
pub struct AgentSideNatsConnection<N> {
40+
nats: N,
41+
config: AgentSideNatsConnectionConfig,
42+
}
43+
44+
impl<N> AgentSideNatsConnection<N>
45+
where
46+
N: SubscribeClient + RequestClient + PublishClient + FlushClient + Clone + 'static,
47+
{
48+
/// Creates a new agent-side NATS connection.
49+
///
50+
/// Mirrors the ACP SDK's `AgentSideConnection::new` pattern:
51+
/// - Takes an `Agent` impl to handle incoming requests
52+
/// - Takes a NATS client instead of byte streams
53+
/// - Takes a `spawn` function for concurrency control
54+
/// - Returns `(Self, io_task)` where `Self` implements `Client`-creation per session
55+
///
56+
/// The returned future subscribes to NATS agent subjects, dispatches incoming
57+
/// messages to the agent via the spawn function, and runs until the subscriptions end.
58+
pub fn new(
59+
agent: impl Agent + 'static,
60+
nats: N,
61+
config: AgentSideNatsConnectionConfig,
62+
spawn: impl Fn(LocalBoxFuture<'static, ()>) + 'static,
63+
) -> (Self, impl std::future::Future<Output = ()>) {
64+
let nats_for_serve = nats.clone();
65+
let prefix = config.acp_prefix.as_str().to_string();
66+
67+
let io_task = async move {
68+
serve(agent, nats_for_serve, &prefix, spawn).await;
69+
};
70+
71+
let conn = Self { nats, config };
72+
(conn, io_task)
73+
}
74+
75+
pub fn client_for_session(&self, session_id: AcpSessionId) -> NatsClientProxy<N> {
76+
NatsClientProxy::new(
77+
self.nats.clone(),
78+
NatsClientProxyConfig {
79+
session_id,
80+
prefix: self.config.acp_prefix.clone(),
81+
timeout: self.config.operation_timeout,
82+
},
83+
)
84+
}
85+
}
86+
87+
async fn serve<N, A>(
88+
agent: A,
89+
nats: N,
90+
prefix: &str,
91+
spawn: impl Fn(LocalBoxFuture<'static, ()>) + 'static,
92+
) where
93+
N: SubscribeClient + PublishClient + Clone + 'static,
94+
A: Agent + 'static,
95+
{
96+
let global_wildcard = acp_nats::nats::agent::wildcards::all(prefix);
97+
let session_wildcard = acp_nats::nats::agent::wildcards::all_sessions(prefix);
98+
99+
info!(
100+
global = %global_wildcard,
101+
session = %session_wildcard,
102+
"Starting agent-side NATS connection"
103+
);
104+
105+
let global_sub = match nats.subscribe(global_wildcard).await {
106+
Ok(sub) => sub,
107+
Err(e) => {
108+
error!(error = %e, "Failed to subscribe to global agent subjects");
109+
return;
110+
}
111+
};
112+
113+
let session_sub = match nats.subscribe(session_wildcard).await {
114+
Ok(sub) => sub,
115+
Err(e) => {
116+
error!(error = %e, "Failed to subscribe to session agent subjects");
117+
return;
118+
}
119+
};
120+
121+
let mut merged = futures::stream::select(global_sub, session_sub);
122+
let agent = Rc::new(agent);
123+
let nats = Rc::new(nats);
124+
125+
while let Some(msg) = merged.next().await {
126+
let agent = agent.clone();
127+
let nats = nats.clone();
128+
spawn(Box::pin(async move {
129+
dispatch_message(msg, agent.as_ref(), nats.as_ref()).await;
130+
}));
131+
}
132+
133+
info!("Agent-side NATS connection ended");
134+
}
135+
136+
async fn dispatch_message<N: PublishClient, A: Agent>(msg: Message, agent: &A, nats: &N) {
137+
let subject = msg.subject.as_str();
138+
139+
let parsed = match parse_agent_subject(subject) {
140+
Some(p) => p,
141+
None => {
142+
warn!(subject, "Failed to parse agent subject");
143+
return;
144+
}
145+
};
146+
147+
let result = match parsed.method {
148+
AgentMethod::Initialize => {
149+
handle_request(&msg, nats, |req: InitializeRequest| agent.initialize(req)).await
150+
}
151+
AgentMethod::Authenticate => {
152+
handle_request(&msg, nats, |req: AuthenticateRequest| {
153+
agent.authenticate(req)
154+
})
155+
.await
156+
}
157+
AgentMethod::SessionNew => {
158+
handle_request(&msg, nats, |req: NewSessionRequest| agent.new_session(req)).await
159+
}
160+
AgentMethod::SessionList => {
161+
handle_request(&msg, nats, |req: ListSessionsRequest| {
162+
agent.list_sessions(req)
163+
})
164+
.await
165+
}
166+
AgentMethod::SessionLoad => {
167+
handle_request(&msg, nats, |req: LoadSessionRequest| {
168+
agent.load_session(req)
169+
})
170+
.await
171+
}
172+
AgentMethod::SessionPrompt => {
173+
handle_request(&msg, nats, |req: PromptRequest| agent.prompt(req)).await
174+
}
175+
AgentMethod::SessionCancel => {
176+
handle_notification(&msg, |req: CancelNotification| agent.cancel(req)).await
177+
}
178+
AgentMethod::SessionSetMode => {
179+
handle_request(&msg, nats, |req: SetSessionModeRequest| {
180+
agent.set_session_mode(req)
181+
})
182+
.await
183+
}
184+
AgentMethod::SessionSetConfigOption => {
185+
handle_request(&msg, nats, |req: SetSessionConfigOptionRequest| {
186+
agent.set_session_config_option(req)
187+
})
188+
.await
189+
}
190+
AgentMethod::SessionSetModel => {
191+
handle_request(&msg, nats, |req: SetSessionModelRequest| {
192+
agent.set_session_model(req)
193+
})
194+
.await
195+
}
196+
AgentMethod::SessionFork => {
197+
handle_request(&msg, nats, |req: ForkSessionRequest| {
198+
agent.fork_session(req)
199+
})
200+
.await
201+
}
202+
AgentMethod::SessionResume => {
203+
handle_request(&msg, nats, |req: ResumeSessionRequest| {
204+
agent.resume_session(req)
205+
})
206+
.await
207+
}
208+
AgentMethod::SessionClose => {
209+
handle_request(&msg, nats, |req: CloseSessionRequest| {
210+
agent.close_session(req)
211+
})
212+
.await
213+
}
214+
AgentMethod::Ext(_) => {
215+
handle_request(&msg, nats, |req: ExtRequest| agent.ext_method(req)).await
216+
}
217+
};
218+
219+
if let Err(e) = result {
220+
warn!(
221+
subject,
222+
session_id = parsed.session_id.as_ref().map(|s| s.as_str()).unwrap_or("-"),
223+
error = %e,
224+
"Error handling agent request"
225+
);
226+
}
227+
}
228+
229+
async fn handle_request<N, Resp, ReqT, F>(
230+
msg: &Message,
231+
nats: &N,
232+
handler: impl FnOnce(ReqT) -> F,
233+
) -> std::result::Result<(), String>
234+
where
235+
N: PublishClient,
236+
ReqT: serde::de::DeserializeOwned,
237+
F: std::future::Future<Output = agent_client_protocol::Result<Resp>>,
238+
Resp: serde::Serialize,
239+
{
240+
let reply_to = msg
241+
.reply
242+
.as_ref()
243+
.ok_or_else(|| "no reply subject".to_string())?;
244+
245+
let request: ReqT =
246+
serde_json::from_slice(&msg.payload).map_err(|e| format!("deserialize request: {}", e))?;
247+
248+
let response = handler(request).await;
249+
250+
let response_bytes: Bytes = match response {
251+
Ok(resp) => serde_json::to_vec(&resp)
252+
.map_err(|e| format!("serialize response: {}", e))?
253+
.into(),
254+
Err(err) => serde_json::to_vec(&err)
255+
.map_err(|e| format!("serialize error: {}", e))?
256+
.into(),
257+
};
258+
259+
nats.publish_with_headers(
260+
reply_to.to_string(),
261+
async_nats::HeaderMap::new(),
262+
response_bytes,
263+
)
264+
.await
265+
.map_err(|e| format!("respond: {}", e))
266+
}
267+
268+
async fn handle_notification<ReqT, F>(
269+
msg: &Message,
270+
handler: impl FnOnce(ReqT) -> F,
271+
) -> std::result::Result<(), String>
272+
where
273+
ReqT: serde::de::DeserializeOwned,
274+
F: std::future::Future<Output = agent_client_protocol::Result<()>>,
275+
{
276+
let request: ReqT = serde_json::from_slice(&msg.payload)
277+
.map_err(|e| format!("deserialize notification: {}", e))?;
278+
279+
handler(request)
280+
.await
281+
.map_err(|e| format!("notification handler: {}", e))
282+
}

0 commit comments

Comments
 (0)