Skip to content

Commit 28cce3a

Browse files
committed
feat(acp-nats, acp-nats-agent): wire JetStream into bridge and agent
Wire Layer 3 of JetStream integration — the actual message flow. JsMessage is now a zero-cost trait abstraction: NatsJsMessage wraps real jetstream::Message for production, MockJsMessage records signals for testing. All dispatch and prompt code is generic over the trait. Bridge prompt handler branches on bridge.js(): when JetStream is available, publishes to COMMANDS stream and consumes from NOTIFICATIONS/RESPONSES streams instead of subscribe-before-publish. Session.ready skips the 100ms sleep when JetStream is available. Agent library gains serve_js() alongside serve() with full ack/nak/term signal handling. Runs in parallel via with_jetstream(). Binary crates (stdio, ws) create NatsJetStreamClient and pass to Bridge::with_jetstream(). () implements JetStream traits for backward compatibility. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 43f9048 commit 28cce3a

File tree

23 files changed

+2056
-230
lines changed

23 files changed

+2056
-230
lines changed

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 478 additions & 0 deletions
Large diffs are not rendered by default.

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24-
#[cfg(not(coverage))]
25-
#[allow(unused_imports)]
2624
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
2725
use trogon_std::time::GetElapsed;
2826

@@ -36,7 +34,6 @@ use crate::constants::SESSION_READY_DELAY;
3634

3735
pub struct Bridge<N, C: GetElapsed, J = ()> {
3836
pub(crate) nats: N,
39-
#[allow(dead_code)] // Used in prompt.rs JetStream path
4037
pub(crate) js: Option<J>,
4138
pub(crate) clock: C,
4239
pub(crate) config: Config,
@@ -68,7 +65,6 @@ impl<N, C: GetElapsed> Bridge<N, C> {
6865
}
6966

7067
impl<N, C: GetElapsed, J> Bridge<N, C, J> {
71-
#[cfg(not(coverage))]
7268
pub fn with_jetstream(
7369
nats: N,
7470
js: J,
@@ -93,8 +89,6 @@ impl<N, C: GetElapsed, J> Bridge<N, C, J> {
9389
&self.nats
9490
}
9591

96-
#[cfg(not(coverage))]
97-
#[allow(dead_code)]
9892
pub(crate) fn js(&self) -> Option<&J> {
9993
self.js.as_ref()
10094
}
@@ -116,8 +110,9 @@ impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J>
116110
let nats = self.nats.clone();
117111
let prefix = self.config.acp_prefix().to_string();
118112
let metrics = self.metrics.clone();
113+
let has_jetstream = self.js.is_some();
119114
let handle = tokio::spawn(async move {
120-
publish_session_ready(&nats, &prefix, &session_id, &metrics).await;
115+
publish_session_ready(&nats, &prefix, &session_id, &metrics, has_jetstream).await;
121116
});
122117
self.spawn_background(handle);
123118
}
@@ -128,8 +123,11 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
128123
prefix: &str,
129124
session_id: &SessionId,
130125
metrics: &Metrics,
126+
has_jetstream: bool,
131127
) {
132-
tokio::time::sleep(SESSION_READY_DELAY).await;
128+
if !has_jetstream {
129+
tokio::time::sleep(SESSION_READY_DELAY).await;
130+
}
133131

134132
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
135133
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
@@ -154,8 +152,11 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
154152
}
155153

156154
#[async_trait::async_trait(?Send)]
157-
impl<N: RequestClient + PublishClient + SubscribeClient + FlushClient, C: GetElapsed, J> Agent
158-
for Bridge<N, C, J>
155+
impl<
156+
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
157+
C: GetElapsed,
158+
J: JetStreamPublisher + JetStreamConsumerFactory,
159+
> Agent for Bridge<N, C, J>
159160
{
160161
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
161162
initialize::handle(self, args).await

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
use super::Bridge;
2+
use super::js_request;
23
use crate::error::map_nats_error;
34
use crate::nats::{self, RequestClient, session};
45
use crate::session_id::AcpSessionId;
56
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
67
use tracing::{info, instrument};
8+
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
79
use trogon_std::time::GetElapsed;
810

911
#[instrument(
1012
name = "acp.session.close",
1113
skip(bridge, args),
1214
fields(session_id = %args.session_id)
1315
)]
14-
pub async fn handle<N: RequestClient, C: GetElapsed, J>(
16+
pub async fn handle<
17+
N: RequestClient,
18+
C: GetElapsed,
19+
J: JetStreamPublisher + JetStreamConsumerFactory,
20+
>(
1521
bridge: &Bridge<N, C, J>,
1622
args: CloseSessionRequest,
1723
) -> Result<CloseSessionResponse> {
@@ -28,17 +34,33 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2834
format!("Invalid session ID: {}", e),
2935
)
3036
})?;
31-
let nats = bridge.nats();
32-
let subject = session::agent::close(bridge.config.acp_prefix(), session_id.as_str());
33-
34-
let result = nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
35-
nats,
36-
&subject,
37-
&args,
38-
bridge.config.operation_timeout,
39-
)
40-
.await
41-
.map_err(map_nats_error);
37+
let prefix = bridge.config.acp_prefix();
38+
let subject = session::agent::close(prefix, session_id.as_str());
39+
40+
let result = match bridge.js() {
41+
Some(js) => {
42+
let req_id = uuid::Uuid::new_v4().to_string();
43+
js_request::js_request::<J, _, CloseSessionResponse, _>(
44+
js,
45+
&subject,
46+
&args,
47+
&trogon_std::StdJsonSerialize,
48+
prefix,
49+
session_id.as_str(),
50+
&req_id,
51+
bridge.config.operation_timeout,
52+
)
53+
.await
54+
}
55+
None => nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
56+
bridge.nats(),
57+
&subject,
58+
&args,
59+
bridge.config.operation_timeout,
60+
)
61+
.await
62+
.map_err(map_nats_error),
63+
};
4264

4365
bridge.metrics.record_request(
4466
"close_session",

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
use super::Bridge;
2+
use super::js_request;
23
use crate::error::map_nats_error;
34
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
45
use crate::session_id::AcpSessionId;
56
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
67
use tracing::{Span, info, instrument};
8+
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
79
use trogon_std::time::GetElapsed;
810

911
#[instrument(
1012
name = "acp.session.fork",
1113
skip(bridge, args),
1214
fields(session_id = %args.session_id, new_session_id = tracing::field::Empty)
1315
)]
14-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed, J>(
16+
pub async fn handle<
17+
N: RequestClient + PublishClient + FlushClient,
18+
C: GetElapsed,
19+
J: JetStreamPublisher + JetStreamConsumerFactory,
20+
>(
1521
bridge: &Bridge<N, C, J>,
1622
args: ForkSessionRequest,
1723
) -> Result<ForkSessionResponse> {
@@ -28,17 +34,33 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2834
format!("Invalid session ID: {}", e),
2935
)
3036
})?;
31-
let nats = bridge.nats();
32-
let subject = session::agent::fork(bridge.config.acp_prefix(), session_id.as_str());
33-
34-
let result = nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
35-
nats,
36-
&subject,
37-
&args,
38-
bridge.config.operation_timeout,
39-
)
40-
.await
41-
.map_err(map_nats_error);
37+
let prefix = bridge.config.acp_prefix();
38+
let subject = session::agent::fork(prefix, session_id.as_str());
39+
40+
let result = match bridge.js() {
41+
Some(js) => {
42+
let req_id = uuid::Uuid::new_v4().to_string();
43+
js_request::js_request::<J, _, ForkSessionResponse, _>(
44+
js,
45+
&subject,
46+
&args,
47+
&trogon_std::StdJsonSerialize,
48+
prefix,
49+
session_id.as_str(),
50+
&req_id,
51+
bridge.config.operation_timeout,
52+
)
53+
.await
54+
}
55+
None => nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
56+
bridge.nats(),
57+
&subject,
58+
&args,
59+
bridge.config.operation_timeout,
60+
)
61+
.await
62+
.map_err(map_nats_error),
63+
};
4264

4365
if let Ok(ref response) = result {
4466
Span::current().record("new_session_id", response.session_id.to_string().as_str());

0 commit comments

Comments
 (0)