Skip to content

Commit 99fda4a

Browse files
feat(acp-runner): add set_session_model handler to RpcServer
Subscribe to {prefix}.*.agent.session.set_model and persist the model override to SessionState in KV, matching the existing set_session_mode pattern.
1 parent f6f973a commit 99fda4a

4 files changed

Lines changed: 44 additions & 35 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, RequestClient, agent};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
66
use tracing::{Span, info, instrument};
@@ -11,7 +11,7 @@ use trogon_std::time::GetElapsed;
1111
skip(bridge, args),
1212
fields(session_id = %args.session_id, new_session_id = tracing::field::Empty)
1313
)]
14-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
14+
pub async fn handle<N: RequestClient, C: GetElapsed>(
1515
bridge: &Bridge<N, C>,
1616
args: ForkSessionRequest,
1717
) -> Result<ForkSessionResponse> {
@@ -44,7 +44,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
4444
Span::current().record("new_session_id", response.session_id.to_string().as_str());
4545
info!(new_session_id = %response.session_id, "Session forked");
4646

47-
bridge.schedule_session_ready(response.session_id.clone());
4847
}
4948

5049
bridge.metrics.record_request(

rsworkspace/crates/acp-nats/src/agent/resume_session.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, RequestClient, agent};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{
66
Error, ErrorCode, Result, ResumeSessionRequest, ResumeSessionResponse,
@@ -13,7 +13,7 @@ use trogon_std::time::GetElapsed;
1313
skip(bridge, args),
1414
fields(session_id = %args.session_id)
1515
)]
16-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
16+
pub async fn handle<N: RequestClient, C: GetElapsed>(
1717
bridge: &Bridge<N, C>,
1818
args: ResumeSessionRequest,
1919
) -> Result<ResumeSessionResponse> {
@@ -42,10 +42,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
4242
.await
4343
.map_err(map_nats_error);
4444

45-
if result.is_ok() {
46-
bridge.schedule_session_ready(args.session_id.clone());
47-
}
48-
4945
bridge.metrics.record_request(
5046
"resume_session",
5147
bridge.clock.elapsed(start).as_secs_f64(),

rsworkspace/crates/acp-nats/src/nats/subjects.rs

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ pub mod agent {
4747
format!("{}.{}.agent.session.resume", prefix, session_id)
4848
}
4949

50+
pub fn session_close(prefix: &str, session_id: &str) -> String {
51+
format!("{}.{}.agent.session.close", prefix, session_id)
52+
}
53+
5054
pub fn ext_session_ready(prefix: &str, session_id: &str) -> String {
5155
format!("{}.{}.agent.ext.session.ready", prefix, session_id)
5256
}
@@ -70,30 +74,6 @@ pub mod agent {
7074
)
7175
}
7276

73-
pub fn session_list(prefix: &str) -> String {
74-
format!("{}.agent.session.list", prefix)
75-
}
76-
77-
pub fn session_set_config_option(prefix: &str, session_id: &str) -> String {
78-
format!("{}.{}.agent.session.set_config_option", prefix, session_id)
79-
}
80-
81-
pub fn session_set_model(prefix: &str, session_id: &str) -> String {
82-
format!("{}.{}.agent.session.set_model", prefix, session_id)
83-
}
84-
85-
pub fn session_fork(prefix: &str, session_id: &str) -> String {
86-
format!("{}.{}.agent.session.fork", prefix, session_id)
87-
}
88-
89-
pub fn session_resume(prefix: &str, session_id: &str) -> String {
90-
format!("{}.{}.agent.session.resume", prefix, session_id)
91-
}
92-
93-
pub fn session_close(prefix: &str, session_id: &str) -> String {
94-
format!("{}.{}.agent.session.close", prefix, session_id)
95-
}
96-
9777
/// Alias for `session_prompt` — used by the runner crate.
9878
pub fn prompt(prefix: &str, session_id: &str) -> String {
9979
session_prompt(prefix, session_id)

rsworkspace/crates/trogon-acp-runner/src/rpc_server.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! NATS request and deserialises the reply. This module is the other side of
55
//! those requests, implementing the actual agent logic for:
66
//! initialize · authenticate · new_session · load_session
7-
//! set_session_mode · set_session_config_option
7+
//! set_session_mode · set_session_model · set_session_config_option
88
//! list_sessions · fork_session · resume_session
99
//!
1010
//! `prompt` / `cancel` are handled by `runner.rs` via the streaming pub/sub
@@ -19,7 +19,7 @@ use agent_client_protocol::{
1919
ResumeSessionRequest, ResumeSessionResponse, SessionCapabilities, SessionForkCapabilities,
2020
SessionInfo, SessionListCapabilities, SessionResumeCapabilities,
2121
SetSessionConfigOptionRequest, SetSessionConfigOptionResponse, SetSessionModeRequest,
22-
SetSessionModeResponse,
22+
SetSessionModeResponse, SetSessionModelRequest, SetSessionModelResponse,
2323
};
2424
use acp_nats::nats::{ExtSessionReady, agent as subjects};
2525
use futures_util::StreamExt;
@@ -108,6 +108,10 @@ impl RpcServer {
108108
.nats
109109
.subscribe(format!("{}.*.agent.session.set_mode", prefix))
110110
.await?;
111+
let mut set_model_sub = self
112+
.nats
113+
.subscribe(format!("{}.*.agent.session.set_model", prefix))
114+
.await?;
111115
let mut set_config_sub = self
112116
.nats
113117
.subscribe(format!("{}.*.agent.session.set_config_option", prefix))
@@ -146,6 +150,10 @@ impl RpcServer {
146150
let Some(msg) = msg else { break; };
147151
self.handle_set_session_mode(msg).await;
148152
}
153+
msg = set_model_sub.next() => {
154+
let Some(msg) = msg else { break; };
155+
self.handle_set_session_model(msg).await;
156+
}
149157
msg = set_config_sub.next() => {
150158
let Some(msg) = msg else { break; };
151159
self.handle_set_session_config_option(msg).await;
@@ -271,6 +279,32 @@ impl RpcServer {
271279
self.reply(&msg, &SetSessionModeResponse::new()).await;
272280
}
273281

282+
async fn handle_set_session_model(&self, msg: async_nats::Message) {
283+
let request: SetSessionModelRequest = match serde_json::from_slice(&msg.payload) {
284+
Ok(r) => r,
285+
Err(e) => {
286+
warn!(error = %e, "rpc: bad set_session_model payload");
287+
return;
288+
}
289+
};
290+
291+
let session_id = request.session_id.to_string();
292+
match self.store.load(&session_id).await {
293+
Ok(mut state) => {
294+
state.model = Some(request.model_id.to_string());
295+
state.updated_at = now_iso8601();
296+
if let Err(e) = self.store.save(&session_id, &state).await {
297+
warn!(session_id = %session_id, error = %e, "rpc: failed to persist model update");
298+
}
299+
}
300+
Err(e) => {
301+
warn!(session_id = %session_id, error = %e, "rpc: failed to load session for model update");
302+
}
303+
}
304+
305+
self.reply(&msg, &SetSessionModelResponse::new()).await;
306+
}
307+
274308
async fn handle_set_session_config_option(&self, msg: async_nats::Message) {
275309
let _request: SetSessionConfigOptionRequest = match serde_json::from_slice(&msg.payload) {
276310
Ok(r) => r,

0 commit comments

Comments
 (0)