Skip to content

Commit 4f94c5e

Browse files
committed
feat(acp-nats, acp-nats-agent): wire JetStream into bridge and agent
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 43f9048 commit 4f94c5e

25 files changed

Lines changed: 3335 additions & 304 deletions

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 1047 additions & 29 deletions
Large diffs are not rendered by default.

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24-
#[cfg(not(coverage))]
25-
#[allow(unused_imports)]
2624
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
2725
use trogon_std::time::GetElapsed;
2826

@@ -36,7 +34,6 @@ use crate::constants::SESSION_READY_DELAY;
3634

3735
pub struct Bridge<N, C: GetElapsed, J = ()> {
3836
pub(crate) nats: N,
39-
#[allow(dead_code)] // Used in prompt.rs JetStream path
4037
pub(crate) js: Option<J>,
4138
pub(crate) clock: C,
4239
pub(crate) config: Config,
@@ -68,7 +65,6 @@ impl<N, C: GetElapsed> Bridge<N, C> {
6865
}
6966

7067
impl<N, C: GetElapsed, J> Bridge<N, C, J> {
71-
#[cfg(not(coverage))]
7268
pub fn with_jetstream(
7369
nats: N,
7470
js: J,
@@ -93,8 +89,6 @@ impl<N, C: GetElapsed, J> Bridge<N, C, J> {
9389
&self.nats
9490
}
9591

96-
#[cfg(not(coverage))]
97-
#[allow(dead_code)]
9892
pub(crate) fn js(&self) -> Option<&J> {
9993
self.js.as_ref()
10094
}
@@ -154,8 +148,11 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
154148
}
155149

156150
#[async_trait::async_trait(?Send)]
157-
impl<N: RequestClient + PublishClient + SubscribeClient + FlushClient, C: GetElapsed, J> Agent
158-
for Bridge<N, C, J>
151+
impl<
152+
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
153+
C: GetElapsed,
154+
J: JetStreamPublisher + JetStreamConsumerFactory,
155+
> Agent for Bridge<N, C, J>
159156
{
160157
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
161158
initialize::handle(self, args).await

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
use super::Bridge;
1+
use super::{Bridge, js_request};
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, RequestClient, session};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
66
use tracing::{info, instrument};
7+
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
78
use trogon_std::time::GetElapsed;
89

910
#[instrument(
1011
name = "acp.session.close",
1112
skip(bridge, args),
1213
fields(session_id = %args.session_id)
1314
)]
14-
pub async fn handle<N: RequestClient, C: GetElapsed, J>(
15+
pub async fn handle<
16+
N: RequestClient + PublishClient + FlushClient,
17+
C: GetElapsed,
18+
J: JetStreamPublisher + JetStreamConsumerFactory,
19+
>(
1520
bridge: &Bridge<N, C, J>,
1621
args: CloseSessionRequest,
1722
) -> Result<CloseSessionResponse> {
@@ -28,17 +33,33 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2833
format!("Invalid session ID: {}", e),
2934
)
3035
})?;
31-
let nats = bridge.nats();
32-
let subject = session::agent::close(bridge.config.acp_prefix(), session_id.as_str());
33-
34-
let result = nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
35-
nats,
36-
&subject,
37-
&args,
38-
bridge.config.operation_timeout,
39-
)
40-
.await
41-
.map_err(map_nats_error);
36+
let prefix = bridge.config.acp_prefix();
37+
let subject = session::agent::close(prefix, session_id.as_str());
38+
39+
let result = match bridge.js() {
40+
Some(js) => {
41+
let req_id = uuid::Uuid::new_v4().to_string();
42+
js_request::js_request::<J, _, CloseSessionResponse, _>(
43+
js,
44+
&subject,
45+
&args,
46+
&trogon_std::StdJsonSerialize,
47+
prefix,
48+
session_id.as_str(),
49+
&req_id,
50+
bridge.config.operation_timeout,
51+
)
52+
.await
53+
}
54+
None => nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
55+
bridge.nats(),
56+
&subject,
57+
&args,
58+
bridge.config.operation_timeout,
59+
)
60+
.await
61+
.map_err(map_nats_error),
62+
};
4263

4364
bridge.metrics.record_request(
4465
"close_session",

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
use super::Bridge;
1+
use super::{Bridge, js_request};
22
use crate::error::map_nats_error;
33
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
66
use tracing::{Span, info, instrument};
7+
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher};
78
use trogon_std::time::GetElapsed;
89

910
#[instrument(
1011
name = "acp.session.fork",
1112
skip(bridge, args),
1213
fields(session_id = %args.session_id, new_session_id = tracing::field::Empty)
1314
)]
14-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed, J>(
15+
pub async fn handle<
16+
N: RequestClient + PublishClient + FlushClient,
17+
C: GetElapsed,
18+
J: JetStreamPublisher + JetStreamConsumerFactory,
19+
>(
1520
bridge: &Bridge<N, C, J>,
1621
args: ForkSessionRequest,
1722
) -> Result<ForkSessionResponse> {
@@ -28,17 +33,33 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2833
format!("Invalid session ID: {}", e),
2934
)
3035
})?;
31-
let nats = bridge.nats();
32-
let subject = session::agent::fork(bridge.config.acp_prefix(), session_id.as_str());
33-
34-
let result = nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
35-
nats,
36-
&subject,
37-
&args,
38-
bridge.config.operation_timeout,
39-
)
40-
.await
41-
.map_err(map_nats_error);
36+
let prefix = bridge.config.acp_prefix();
37+
let subject = session::agent::fork(prefix, session_id.as_str());
38+
39+
let result = match bridge.js() {
40+
Some(js) => {
41+
let req_id = uuid::Uuid::new_v4().to_string();
42+
js_request::js_request::<J, _, ForkSessionResponse, _>(
43+
js,
44+
&subject,
45+
&args,
46+
&trogon_std::StdJsonSerialize,
47+
prefix,
48+
session_id.as_str(),
49+
&req_id,
50+
bridge.config.operation_timeout,
51+
)
52+
.await
53+
}
54+
None => nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
55+
bridge.nats(),
56+
&subject,
57+
&args,
58+
bridge.config.operation_timeout,
59+
)
60+
.await
61+
.map_err(map_nats_error),
62+
};
4263

4364
if let Ok(ref response) = result {
4465
Span::current().record("new_session_id", response.session_id.to_string().as_str());

0 commit comments

Comments
 (0)