Skip to content

Commit 71f69c6

Browse files
authored
feat(trogon-nats, acp-nats): add JetStream integration foundation (#65)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent ef26c8c commit 71f69c6

33 files changed

Lines changed: 1477 additions & 37 deletions

rsworkspace/Cargo.lock

Lines changed: 75 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-nats-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ agent-client-protocol = { workspace = true, features = [
1919
"unstable_session_resume",
2020
"unstable_session_usage",
2121
] }
22-
async-nats = { workspace = true }
22+
async-nats = { workspace = true, features = ["jetstream"] }
2323
async-trait = { workspace = true }
2424
bytes = { workspace = true }
2525
futures = { workspace = true }

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use futures::future::LocalBoxFuture;
1616
use std::rc::Rc;
1717
use std::time::Duration;
1818
use tracing::{info, warn};
19+
#[cfg(not(coverage))]
20+
use trogon_nats::jetstream::JsMessage;
1921
use trogon_nats::{FlushClient, PublishClient, RequestClient, SubscribeClient};
2022

2123
pub enum ConnectionError {
@@ -313,6 +315,137 @@ where
313315
.map_err(DispatchError::NotificationHandler)
314316
}
315317

318+
/// JetStream-aware dispatch: receives a `JsMessage`, dispatches to the agent,
319+
/// and signals ack/term based on the outcome.
320+
///
321+
/// - Unknown subject → `term()` (no redelivery)
322+
/// - Deserialize failure → reply error + `term()` (bad payload won't fix itself)
323+
/// - Handler success → reply + `ack()`
324+
/// - Handler error → reply error + `ack()` (application-level error, not transient)
325+
/// - Notification handler → `ack()`
326+
#[cfg(not(coverage))]
327+
#[allow(dead_code)] // Will be used when JetStream serve path is wired up
328+
async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent>(
329+
js_msg: &JsMessage,
330+
agent: &A,
331+
nats: &N,
332+
) {
333+
let msg = js_msg.message();
334+
let subject = msg.subject.as_str();
335+
336+
let parsed = match parse_agent_subject(subject) {
337+
Some(p) => p,
338+
None => {
339+
if let Err(e) = js_msg.term().await {
340+
warn!(error = %e, subject, "Failed to term unknown subject");
341+
}
342+
return;
343+
}
344+
};
345+
346+
let result = match parsed.method {
347+
AgentMethod::Initialize => {
348+
handle_request(msg, nats, |req: InitializeRequest| agent.initialize(req)).await
349+
}
350+
AgentMethod::Authenticate => {
351+
handle_request(msg, nats, |req: AuthenticateRequest| {
352+
agent.authenticate(req)
353+
})
354+
.await
355+
}
356+
AgentMethod::SessionNew => {
357+
handle_request(msg, nats, |req: NewSessionRequest| agent.new_session(req)).await
358+
}
359+
AgentMethod::SessionList => {
360+
handle_request(msg, nats, |req: ListSessionsRequest| {
361+
agent.list_sessions(req)
362+
})
363+
.await
364+
}
365+
AgentMethod::SessionLoad => {
366+
handle_request(msg, nats, |req: LoadSessionRequest| agent.load_session(req)).await
367+
}
368+
AgentMethod::SessionPrompt => {
369+
handle_request(msg, nats, |req: PromptRequest| agent.prompt(req)).await
370+
}
371+
AgentMethod::SessionCancel => {
372+
handle_notification(msg, |req: CancelNotification| agent.cancel(req)).await
373+
}
374+
AgentMethod::SessionSetMode => {
375+
handle_request(msg, nats, |req: SetSessionModeRequest| {
376+
agent.set_session_mode(req)
377+
})
378+
.await
379+
}
380+
AgentMethod::SessionSetConfigOption => {
381+
handle_request(msg, nats, |req: SetSessionConfigOptionRequest| {
382+
agent.set_session_config_option(req)
383+
})
384+
.await
385+
}
386+
AgentMethod::SessionSetModel => {
387+
handle_request(msg, nats, |req: SetSessionModelRequest| {
388+
agent.set_session_model(req)
389+
})
390+
.await
391+
}
392+
AgentMethod::SessionFork => {
393+
handle_request(msg, nats, |req: ForkSessionRequest| agent.fork_session(req)).await
394+
}
395+
AgentMethod::SessionResume => {
396+
handle_request(msg, nats, |req: ResumeSessionRequest| {
397+
agent.resume_session(req)
398+
})
399+
.await
400+
}
401+
AgentMethod::SessionClose => {
402+
handle_request(msg, nats, |req: CloseSessionRequest| {
403+
agent.close_session(req)
404+
})
405+
.await
406+
}
407+
AgentMethod::Ext(_) => {
408+
if msg.reply.is_some() {
409+
handle_request(msg, nats, |req: ExtRequest| agent.ext_method(req)).await
410+
} else {
411+
handle_notification(msg, |req: ExtNotification| agent.ext_notification(req)).await
412+
}
413+
}
414+
};
415+
416+
match &result {
417+
Ok(()) => {
418+
if let Err(e) = js_msg.ack().await {
419+
warn!(subject, error = %e, "Failed to ack JetStream message");
420+
}
421+
}
422+
Err(DispatchError::DeserializeRequest(_) | DispatchError::DeserializeNotification(_)) => {
423+
if let Err(e) = js_msg.term().await {
424+
warn!(subject, error = %e, "Failed to term bad payload");
425+
}
426+
}
427+
Err(DispatchError::NoReplySubject) => {
428+
if let Err(e) = js_msg.term().await {
429+
warn!(subject, error = %e, "Failed to term missing reply subject");
430+
}
431+
}
432+
Err(_) => {
433+
if let Err(e) = js_msg.ack().await {
434+
warn!(subject, error = %e, "Failed to ack after handler error");
435+
}
436+
}
437+
}
438+
439+
if let Err(e) = result {
440+
let sid = parsed
441+
.session_id
442+
.as_ref()
443+
.map(|s| s.as_str())
444+
.unwrap_or("-");
445+
warn!(subject, session_id = sid, error = %e, "Error handling agent request");
446+
}
447+
}
448+
316449
#[cfg(test)]
317450
mod tests {
318451
use super::*;

rsworkspace/crates/acp-nats/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ agent-client-protocol = { workspace = true, features = [
1919
"unstable_session_usage",
2020
] }
2121
opentelemetry = { workspace = true }
22-
async-nats = { workspace = true }
22+
async-nats = { workspace = true, features = ["jetstream"] }
2323
async-trait = { workspace = true }
2424
bytes = { workspace = true }
2525
futures = { workspace = true }

rsworkspace/crates/acp-nats/src/agent/authenticate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use trogon_std::time::GetElapsed;
1010
skip(bridge, args),
1111
fields(method_id = %args.method_id)
1212
)]
13-
pub async fn handle<N: RequestClient, C: GetElapsed>(
14-
bridge: &Bridge<N, C>,
13+
pub async fn handle<N: RequestClient, C: GetElapsed, J>(
14+
bridge: &Bridge<N, C, J>,
1515
args: AuthenticateRequest,
1616
) -> Result<AuthenticateResponse> {
1717
let start = bridge.clock.now();

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24+
#[cfg(not(coverage))]
25+
#[allow(unused_imports)]
26+
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
2427
use trogon_std::time::GetElapsed;
2528

2629
use super::{
@@ -31,8 +34,10 @@ use super::{
3134

3235
use crate::constants::SESSION_READY_DELAY;
3336

34-
pub struct Bridge<N, C: GetElapsed> {
37+
pub struct Bridge<N, C: GetElapsed, J = ()> {
3538
pub(crate) nats: N,
39+
#[allow(dead_code)] // Used in prompt.rs JetStream path
40+
pub(crate) js: Option<J>,
3641
pub(crate) clock: C,
3742
pub(crate) config: Config,
3843
pub(crate) metrics: Metrics,
@@ -51,6 +56,30 @@ impl<N, C: GetElapsed> Bridge<N, C> {
5156
) -> Self {
5257
Self {
5358
nats,
59+
js: None,
60+
clock,
61+
config,
62+
metrics: Metrics::new(meter),
63+
notification_sender,
64+
pending_session_prompt_responses: PendingSessionPromptResponseWaiters::new(),
65+
background_tasks: RefCell::new(Vec::new()),
66+
}
67+
}
68+
}
69+
70+
impl<N, C: GetElapsed, J> Bridge<N, C, J> {
71+
#[cfg(not(coverage))]
72+
pub fn with_jetstream(
73+
nats: N,
74+
js: J,
75+
clock: C,
76+
meter: &Meter,
77+
config: Config,
78+
notification_sender: mpsc::Sender<SessionNotification>,
79+
) -> Self {
80+
Self {
81+
nats,
82+
js: Some(js),
5483
clock,
5584
config,
5685
metrics: Metrics::new(meter),
@@ -64,6 +93,12 @@ impl<N, C: GetElapsed> Bridge<N, C> {
6493
&self.nats
6594
}
6695

96+
#[cfg(not(coverage))]
97+
#[allow(dead_code)]
98+
pub(crate) fn js(&self) -> Option<&J> {
99+
self.js.as_ref()
100+
}
101+
67102
pub(crate) fn spawn_background(&self, task: JoinHandle<()>) {
68103
self.background_tasks.borrow_mut().push(task);
69104
}
@@ -76,7 +111,7 @@ impl<N, C: GetElapsed> Bridge<N, C> {
76111
}
77112
}
78113

79-
impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed> Bridge<N, C> {
114+
impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J> Bridge<N, C, J> {
80115
pub(crate) fn schedule_session_ready(&self, session_id: SessionId) {
81116
let nats = self.nats.clone();
82117
let prefix = self.config.acp_prefix().to_string();
@@ -119,8 +154,8 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
119154
}
120155

121156
#[async_trait::async_trait(?Send)]
122-
impl<N: RequestClient + PublishClient + SubscribeClient + FlushClient, C: GetElapsed> Agent
123-
for Bridge<N, C>
157+
impl<N: RequestClient + PublishClient + SubscribeClient + FlushClient, C: GetElapsed, J> Agent
158+
for Bridge<N, C, J>
124159
{
125160
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
126161
initialize::handle(self, args).await

0 commit comments

Comments
 (0)