Skip to content

Commit 21afc43

Browse files
committed
refactor(acp-nats): split AgentMethod into GlobalAgentMethod and SessionAgentMethod
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 43f9048 commit 21afc43

3 files changed

Lines changed: 238 additions & 131 deletions

File tree

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use acp_nats::acp_prefix::AcpPrefix;
22
use acp_nats::client_proxy::NatsClientProxy;
3-
use acp_nats::nats::{AgentMethod, parse_agent_subject};
3+
use acp_nats::nats::{
4+
GlobalAgentMethod, ParsedAgentSubject, SessionAgentMethod, parse_agent_subject,
5+
};
46
use acp_nats::session_id::AcpSessionId;
57
use agent_client_protocol::{
68
Agent, AuthenticateRequest, CancelNotification, CloseSessionRequest, ExtNotification,
@@ -163,89 +165,114 @@ async fn dispatch_message<N: PublishClient + FlushClient, A: Agent>(
163165
None => return,
164166
};
165167

166-
let result = match parsed.method {
167-
AgentMethod::Initialize => {
168-
handle_request(&msg, nats, |req: InitializeRequest| agent.initialize(req)).await
168+
let (result, session_id) = match parsed {
169+
ParsedAgentSubject::Global(method) => {
170+
let r = dispatch_global(method, &msg, agent, nats).await;
171+
(r, None)
172+
}
173+
ParsedAgentSubject::Session { session_id, method } => {
174+
let r = dispatch_session(method, &msg, agent, nats).await;
175+
(r, Some(session_id))
169176
}
170-
AgentMethod::Authenticate => {
171-
handle_request(&msg, nats, |req: AuthenticateRequest| {
177+
};
178+
179+
if let Err(e) = result {
180+
let sid = session_id.as_ref().map(|s| s.as_str()).unwrap_or("-");
181+
warn!(subject, session_id = sid, error = %e, "Error handling agent request");
182+
}
183+
}
184+
185+
async fn dispatch_global<N: PublishClient + FlushClient, A: Agent>(
186+
method: GlobalAgentMethod,
187+
msg: &Message,
188+
agent: &A,
189+
nats: &N,
190+
) -> Result<(), DispatchError> {
191+
match method {
192+
GlobalAgentMethod::Initialize => {
193+
handle_request(msg, nats, |req: InitializeRequest| agent.initialize(req)).await
194+
}
195+
GlobalAgentMethod::Authenticate => {
196+
handle_request(msg, nats, |req: AuthenticateRequest| {
172197
agent.authenticate(req)
173198
})
174199
.await
175200
}
176-
AgentMethod::SessionNew => {
177-
handle_request(&msg, nats, |req: NewSessionRequest| agent.new_session(req)).await
201+
GlobalAgentMethod::SessionNew => {
202+
handle_request(msg, nats, |req: NewSessionRequest| agent.new_session(req)).await
178203
}
179-
AgentMethod::SessionList => {
180-
handle_request(&msg, nats, |req: ListSessionsRequest| {
204+
GlobalAgentMethod::SessionList => {
205+
handle_request(msg, nats, |req: ListSessionsRequest| {
181206
agent.list_sessions(req)
182207
})
183208
.await
184209
}
185-
AgentMethod::SessionLoad => {
186-
handle_request(&msg, nats, |req: LoadSessionRequest| {
187-
agent.load_session(req)
188-
})
189-
.await
210+
GlobalAgentMethod::Ext(_) => {
211+
if msg.reply.is_some() {
212+
handle_request(msg, nats, |req: ExtRequest| agent.ext_method(req)).await
213+
} else {
214+
handle_notification(msg, |req: ExtNotification| agent.ext_notification(req)).await
215+
}
216+
}
217+
}
218+
}
219+
220+
async fn dispatch_session<N: PublishClient + FlushClient, A: Agent>(
221+
method: SessionAgentMethod,
222+
msg: &Message,
223+
agent: &A,
224+
nats: &N,
225+
) -> Result<(), DispatchError> {
226+
match method {
227+
SessionAgentMethod::Load => {
228+
handle_request(msg, nats, |req: LoadSessionRequest| agent.load_session(req)).await
190229
}
191-
AgentMethod::SessionPrompt => {
192-
handle_request(&msg, nats, |req: PromptRequest| agent.prompt(req)).await
230+
SessionAgentMethod::Prompt => {
231+
handle_request(msg, nats, |req: PromptRequest| agent.prompt(req)).await
193232
}
194-
AgentMethod::SessionCancel => {
195-
handle_notification(&msg, |req: CancelNotification| agent.cancel(req)).await
233+
SessionAgentMethod::Cancel => {
234+
handle_notification(msg, |req: CancelNotification| agent.cancel(req)).await
196235
}
197-
AgentMethod::SessionSetMode => {
198-
handle_request(&msg, nats, |req: SetSessionModeRequest| {
236+
SessionAgentMethod::SetMode => {
237+
handle_request(msg, nats, |req: SetSessionModeRequest| {
199238
agent.set_session_mode(req)
200239
})
201240
.await
202241
}
203-
AgentMethod::SessionSetConfigOption => {
204-
handle_request(&msg, nats, |req: SetSessionConfigOptionRequest| {
242+
SessionAgentMethod::SetConfigOption => {
243+
handle_request(msg, nats, |req: SetSessionConfigOptionRequest| {
205244
agent.set_session_config_option(req)
206245
})
207246
.await
208247
}
209-
AgentMethod::SessionSetModel => {
210-
handle_request(&msg, nats, |req: SetSessionModelRequest| {
248+
SessionAgentMethod::SetModel => {
249+
handle_request(msg, nats, |req: SetSessionModelRequest| {
211250
agent.set_session_model(req)
212251
})
213252
.await
214253
}
215-
AgentMethod::SessionFork => {
216-
handle_request(&msg, nats, |req: ForkSessionRequest| {
217-
agent.fork_session(req)
218-
})
219-
.await
254+
SessionAgentMethod::Fork => {
255+
handle_request(msg, nats, |req: ForkSessionRequest| agent.fork_session(req)).await
220256
}
221-
AgentMethod::SessionResume => {
222-
handle_request(&msg, nats, |req: ResumeSessionRequest| {
257+
SessionAgentMethod::Resume => {
258+
handle_request(msg, nats, |req: ResumeSessionRequest| {
223259
agent.resume_session(req)
224260
})
225261
.await
226262
}
227-
AgentMethod::SessionClose => {
228-
handle_request(&msg, nats, |req: CloseSessionRequest| {
263+
SessionAgentMethod::Close => {
264+
handle_request(msg, nats, |req: CloseSessionRequest| {
229265
agent.close_session(req)
230266
})
231267
.await
232268
}
233-
AgentMethod::Ext(_) => {
269+
SessionAgentMethod::Ext(_) => {
234270
if msg.reply.is_some() {
235-
handle_request(&msg, nats, |req: ExtRequest| agent.ext_method(req)).await
271+
handle_request(msg, nats, |req: ExtRequest| agent.ext_method(req)).await
236272
} else {
237-
handle_notification(&msg, |req: ExtNotification| agent.ext_notification(req)).await
273+
handle_notification(msg, |req: ExtNotification| agent.ext_notification(req)).await
238274
}
239275
}
240-
};
241-
242-
if let Err(e) = result {
243-
let sid = parsed
244-
.session_id
245-
.as_ref()
246-
.map(|s| s.as_str())
247-
.unwrap_or("-");
248-
warn!(subject, session_id = sid, error = %e, "Error handling agent request");
249276
}
250277
}
251278

rsworkspace/crates/acp-nats/src/nats/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ pub(crate) mod token;
55

66
pub use extensions::ExtSessionReady;
77
pub use parsing::{
8-
AgentMethod, ClientMethod, ParsedAgentSubject, ParsedClientSubject, parse_agent_subject,
9-
parse_client_subject,
8+
ClientMethod, GlobalAgentMethod, ParsedAgentSubject, ParsedClientSubject, SessionAgentMethod,
9+
parse_agent_subject, parse_client_subject,
1010
};
1111
pub use subjects::{agent, session};
1212
pub use trogon_nats::{

0 commit comments

Comments
 (0)