Skip to content

Commit 43f9048

Browse files
authored
refactor(acp-nats, acp-nats-agent): narrow JetStream scope and add ExtStreamPolicy (#66)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 71f69c6 commit 43f9048

File tree

6 files changed

+82
-147
lines changed

6 files changed

+82
-147
lines changed

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 0 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use futures::future::LocalBoxFuture;
1616
use std::rc::Rc;
1717
use std::time::Duration;
1818
use tracing::{info, warn};
19-
#[cfg(not(coverage))]
20-
use trogon_nats::jetstream::JsMessage;
2119
use trogon_nats::{FlushClient, PublishClient, RequestClient, SubscribeClient};
2220

2321
pub enum ConnectionError {
@@ -315,137 +313,6 @@ where
315313
.map_err(DispatchError::NotificationHandler)
316314
}
317315

318-
/// JetStream-aware dispatch: receives a `JsMessage`, dispatches to the agent,
319-
/// and signals ack/term based on the outcome.
320-
///
321-
/// - Unknown subject → `term()` (no redelivery)
322-
/// - Deserialize failure → reply error + `term()` (bad payload won't fix itself)
323-
/// - Handler success → reply + `ack()`
324-
/// - Handler error → reply error + `ack()` (application-level error, not transient)
325-
/// - Notification handler → `ack()`
326-
#[cfg(not(coverage))]
327-
#[allow(dead_code)] // Will be used when JetStream serve path is wired up
328-
async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent>(
329-
js_msg: &JsMessage,
330-
agent: &A,
331-
nats: &N,
332-
) {
333-
let msg = js_msg.message();
334-
let subject = msg.subject.as_str();
335-
336-
let parsed = match parse_agent_subject(subject) {
337-
Some(p) => p,
338-
None => {
339-
if let Err(e) = js_msg.term().await {
340-
warn!(error = %e, subject, "Failed to term unknown subject");
341-
}
342-
return;
343-
}
344-
};
345-
346-
let result = match parsed.method {
347-
AgentMethod::Initialize => {
348-
handle_request(msg, nats, |req: InitializeRequest| agent.initialize(req)).await
349-
}
350-
AgentMethod::Authenticate => {
351-
handle_request(msg, nats, |req: AuthenticateRequest| {
352-
agent.authenticate(req)
353-
})
354-
.await
355-
}
356-
AgentMethod::SessionNew => {
357-
handle_request(msg, nats, |req: NewSessionRequest| agent.new_session(req)).await
358-
}
359-
AgentMethod::SessionList => {
360-
handle_request(msg, nats, |req: ListSessionsRequest| {
361-
agent.list_sessions(req)
362-
})
363-
.await
364-
}
365-
AgentMethod::SessionLoad => {
366-
handle_request(msg, nats, |req: LoadSessionRequest| agent.load_session(req)).await
367-
}
368-
AgentMethod::SessionPrompt => {
369-
handle_request(msg, nats, |req: PromptRequest| agent.prompt(req)).await
370-
}
371-
AgentMethod::SessionCancel => {
372-
handle_notification(msg, |req: CancelNotification| agent.cancel(req)).await
373-
}
374-
AgentMethod::SessionSetMode => {
375-
handle_request(msg, nats, |req: SetSessionModeRequest| {
376-
agent.set_session_mode(req)
377-
})
378-
.await
379-
}
380-
AgentMethod::SessionSetConfigOption => {
381-
handle_request(msg, nats, |req: SetSessionConfigOptionRequest| {
382-
agent.set_session_config_option(req)
383-
})
384-
.await
385-
}
386-
AgentMethod::SessionSetModel => {
387-
handle_request(msg, nats, |req: SetSessionModelRequest| {
388-
agent.set_session_model(req)
389-
})
390-
.await
391-
}
392-
AgentMethod::SessionFork => {
393-
handle_request(msg, nats, |req: ForkSessionRequest| agent.fork_session(req)).await
394-
}
395-
AgentMethod::SessionResume => {
396-
handle_request(msg, nats, |req: ResumeSessionRequest| {
397-
agent.resume_session(req)
398-
})
399-
.await
400-
}
401-
AgentMethod::SessionClose => {
402-
handle_request(msg, nats, |req: CloseSessionRequest| {
403-
agent.close_session(req)
404-
})
405-
.await
406-
}
407-
AgentMethod::Ext(_) => {
408-
if msg.reply.is_some() {
409-
handle_request(msg, nats, |req: ExtRequest| agent.ext_method(req)).await
410-
} else {
411-
handle_notification(msg, |req: ExtNotification| agent.ext_notification(req)).await
412-
}
413-
}
414-
};
415-
416-
match &result {
417-
Ok(()) => {
418-
if let Err(e) = js_msg.ack().await {
419-
warn!(subject, error = %e, "Failed to ack JetStream message");
420-
}
421-
}
422-
Err(DispatchError::DeserializeRequest(_) | DispatchError::DeserializeNotification(_)) => {
423-
if let Err(e) = js_msg.term().await {
424-
warn!(subject, error = %e, "Failed to term bad payload");
425-
}
426-
}
427-
Err(DispatchError::NoReplySubject) => {
428-
if let Err(e) = js_msg.term().await {
429-
warn!(subject, error = %e, "Failed to term missing reply subject");
430-
}
431-
}
432-
Err(_) => {
433-
if let Err(e) = js_msg.ack().await {
434-
warn!(subject, error = %e, "Failed to ack after handler error");
435-
}
436-
}
437-
}
438-
439-
if let Err(e) = result {
440-
let sid = parsed
441-
.session_id
442-
.as_ref()
443-
.map(|s| s.as_str())
444-
.unwrap_or("-");
445-
warn!(subject, session_id = sid, error = %e, "Error handling agent request");
446-
}
447-
}
448-
449316
#[cfg(test)]
450317
mod tests {
451318
use super::*;

rsworkspace/crates/acp-nats/src/constants.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,3 @@ pub const CONTENT_TYPE_PLAIN: &str = "text/plain";
3232

3333
pub const SESSION_ID_HEADER: &str = "X-Session-Id";
3434
pub const CAUSATION_ID_HEADER: &str = "X-Causation-Id";
35-
pub const ENV_JETSTREAM_ENABLED: &str = "ACP_JETSTREAM_ENABLED";

rsworkspace/crates/acp-nats/src/jetstream/consumers.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ pub fn prompt_response_consumer(prefix: &str, session_id: &str, req_id: &str) ->
2121
}
2222
}
2323

24-
/// Consumer for the COMMANDS stream that receives all commands.
24+
/// Observer consumer for the COMMANDS stream.
2525
///
26-
/// No filter needed — the COMMANDS stream already contains only command subjects.
27-
/// The stream-level subject list acts as the filter.
28-
pub fn runner_commands_consumer() -> Config {
26+
/// Acks messages for audit persistence. No filter needed — the stream-level
27+
/// subject list already scopes to session-scoped commands only.
28+
pub fn commands_observer() -> Config {
2929
Config {
3030
deliver_policy: DeliverPolicy::All,
3131
ack_policy: AckPolicy::Explicit,
@@ -65,15 +65,15 @@ mod tests {
6565
}
6666

6767
#[test]
68-
fn runner_commands_consumer_delivers_all() {
69-
let config = runner_commands_consumer();
68+
fn commands_observer_delivers_all() {
69+
let config = commands_observer();
7070
assert_eq!(config.deliver_policy, DeliverPolicy::All);
7171
assert_eq!(config.ack_policy, AckPolicy::Explicit);
7272
}
7373

7474
#[test]
75-
fn runner_commands_consumer_no_filter() {
76-
let config = runner_commands_consumer();
75+
fn commands_observer_no_filter() {
76+
let config = commands_observer();
7777
assert_eq!(config.filter_subject, String::new());
7878
}
7979

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use crate::ext_method_name::ExtMethodName;
2+
3+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4+
pub enum ExtPersistence {
5+
Stream,
6+
Ephemeral,
7+
}
8+
9+
pub trait ExtStreamPolicy: Send + Sync {
10+
fn persistence(&self, method: &ExtMethodName) -> ExtPersistence;
11+
}
12+
13+
pub struct DefaultExtStreamPolicy;
14+
15+
impl ExtStreamPolicy for DefaultExtStreamPolicy {
16+
fn persistence(&self, _method: &ExtMethodName) -> ExtPersistence {
17+
ExtPersistence::Stream
18+
}
19+
}
20+
21+
#[cfg(test)]
22+
mod tests {
23+
use super::*;
24+
25+
#[test]
26+
fn default_policy_persists_everything() {
27+
let policy = DefaultExtStreamPolicy;
28+
let method = ExtMethodName::new("tool_call").unwrap();
29+
assert_eq!(policy.persistence(&method), ExtPersistence::Stream);
30+
}
31+
32+
#[test]
33+
fn custom_policy() {
34+
struct SelectivePolicy;
35+
impl ExtStreamPolicy for SelectivePolicy {
36+
fn persistence(&self, method: &ExtMethodName) -> ExtPersistence {
37+
match method.as_str() {
38+
"heartbeat" | "ping" => ExtPersistence::Ephemeral,
39+
_ => ExtPersistence::Stream,
40+
}
41+
}
42+
}
43+
44+
let policy = SelectivePolicy;
45+
assert_eq!(
46+
policy.persistence(&ExtMethodName::new("heartbeat").unwrap()),
47+
ExtPersistence::Ephemeral
48+
);
49+
assert_eq!(
50+
policy.persistence(&ExtMethodName::new("ping").unwrap()),
51+
ExtPersistence::Ephemeral
52+
);
53+
assert_eq!(
54+
policy.persistence(&ExtMethodName::new("tool_call").unwrap()),
55+
ExtPersistence::Stream
56+
);
57+
}
58+
59+
#[test]
60+
fn ext_persistence_debug() {
61+
assert_eq!(format!("{:?}", ExtPersistence::Stream), "Stream");
62+
assert_eq!(format!("{:?}", ExtPersistence::Ephemeral), "Ephemeral");
63+
}
64+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod consumers;
2+
pub mod ext_policy;
23
pub mod provision;
34
pub mod streams;

rsworkspace/crates/acp-nats/src/jetstream/streams.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub fn commands_config(prefix: &str) -> Config {
1313
Config {
1414
name: stream_name(prefix, "COMMANDS"),
1515
subjects: vec![
16-
format!("{prefix}.agent.>"),
1716
format!("{prefix}.session.*.agent.prompt"),
1817
format!("{prefix}.session.*.agent.cancel"),
1918
format!("{prefix}.session.*.agent.load"),
@@ -23,7 +22,6 @@ pub fn commands_config(prefix: &str) -> Config {
2322
format!("{prefix}.session.*.agent.fork"),
2423
format!("{prefix}.session.*.agent.resume"),
2524
format!("{prefix}.session.*.agent.close"),
26-
format!("{prefix}.session.*.agent.ext.>"),
2725
],
2826
storage: StorageType::File,
2927
retention: RetentionPolicy::Limits,
@@ -101,9 +99,9 @@ mod tests {
10199
}
102100

103101
#[test]
104-
fn commands_subjects_include_global_and_session() {
102+
fn commands_subjects_are_session_scoped_only() {
105103
let config = commands_config("acp");
106-
assert!(config.subjects.contains(&"acp.agent.>".to_string()));
104+
assert!(!config.subjects.contains(&"acp.agent.>".to_string()));
107105
assert!(
108106
config
109107
.subjects
@@ -117,10 +115,16 @@ mod tests {
117115
assert!(
118116
config
119117
.subjects
120-
.contains(&"acp.session.*.agent.ext.>".to_string())
118+
.contains(&"acp.session.*.agent.close".to_string())
121119
);
122120
}
123121

122+
#[test]
123+
fn commands_excludes_ext_subjects() {
124+
let config = commands_config("acp");
125+
assert!(!config.subjects.iter().any(|s| s.contains("ext")));
126+
}
127+
124128
#[test]
125129
fn responses_subjects() {
126130
let config = responses_config("acp");

0 commit comments

Comments
 (0)