Skip to content

Commit d2ba75a

Browse files
committed
refactor(trogon-nats): zero-wrapping JetStream trait alignment with async_nats
Every production trait impl is a direct passthrough to async_nats: - No newtypes, no wrappers, no BoxStream, no error mapping - Traits implemented directly on async_nats types (Stream, Consumer, Message) - All associated types are async_nats types - Removed NatsJsMessage, NatsJetStreamStream, NatsJetStreamConsumer newtypes - Removed JetStreamError entirely Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent b6d29b3 commit d2ba75a

File tree

17 files changed

+271
-232
lines changed

17 files changed

+271
-232
lines changed

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ where
112112
impl std::future::Future<Output = Result<(), ConnectionError>>,
113113
)
114114
where
115-
J: JetStreamConsumerFactory + 'static,
116-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
115+
J: JetStreamGetStream + 'static,
116+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
117117
{
118118
let nats_for_serve = nats.clone();
119119
let nats_for_js = nats.clone();
@@ -422,7 +422,8 @@ where
422422
}
423423

424424
use trogon_nats::jetstream::{
425-
JetStreamConsumer as _, JetStreamConsumerFactory, JsAckWith, JsDispatchMessage,
425+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JsAckWith,
426+
JsDispatchMessage,
426427
};
427428

428429
async fn handle_request_with_keepalive<N, Resp, ReqT, F, M>(
@@ -484,17 +485,22 @@ async fn serve_js<N, J, A>(
484485
) -> Result<(), ConnectionError>
485486
where
486487
N: PublishClient + FlushClient + Clone + 'static,
487-
J: JetStreamConsumerFactory + 'static,
488-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
488+
J: JetStreamGetStream + 'static,
489+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
489490
A: Agent + 'static,
490491
{
491492
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix);
492493
let config = acp_nats::jetstream::consumers::commands_observer();
493494

494495
info!(stream = %stream_name, "Starting JetStream consumer for COMMANDS stream");
495496

496-
let consumer = js
497-
.create_consumer(&stream_name, config)
497+
let stream = js
498+
.get_stream(&stream_name)
499+
.await
500+
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
501+
502+
let consumer = stream
503+
.create_consumer(config)
498504
.await
499505
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
500506

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
24+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
2525
use trogon_std::time::GetElapsed;
2626

2727
use super::{
@@ -150,10 +150,10 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
150150
impl<
151151
N: RequestClient + PublishClient + FlushClient,
152152
C: GetElapsed,
153-
J: JetStreamPublisher + JetStreamConsumerFactory,
153+
J: JetStreamPublisher + JetStreamGetStream,
154154
> Bridge<N, C, J>
155155
where
156-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
156+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
157157
{
158158
pub(crate) async fn session_request<Req, Res>(
159159
&self,
@@ -198,10 +198,10 @@ where
198198
impl<
199199
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
200200
C: GetElapsed,
201-
J: JetStreamPublisher + JetStreamConsumerFactory,
201+
J: JetStreamPublisher + JetStreamGetStream,
202202
> Agent for Bridge<N, C, J>
203203
where
204-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
204+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
205205
{
206206
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
207207
initialize::handle(self, args).await

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: CloseSessionRequest,
2121
) -> Result<CloseSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
55
use tracing::{Span, info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: ForkSessionRequest,
2121
) -> Result<ForkSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::time::Duration;
77
use tokio::time::timeout;
88
use trogon_nats::REQ_ID_HEADER;
99
use trogon_nats::jetstream::{
10-
JetStreamConsumer as _, JetStreamConsumerFactory, JetStreamPublisher, JsAck as _,
11-
JsAckWith as _, JsMessageRef as _, JsRequestMessage,
10+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JetStreamPublisher,
11+
JsAck as _, JsAckWith as _, JsMessageRef as _, JsRequestMessage,
1212
};
1313
use trogon_std::JsonSerialize;
1414

@@ -27,8 +27,8 @@ pub async fn js_request<J, Req, Res, S>(
2727
operation_timeout: Duration,
2828
) -> agent_client_protocol::Result<Res>
2929
where
30-
J: JetStreamPublisher + JetStreamConsumerFactory,
31-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
30+
J: JetStreamPublisher + JetStreamGetStream,
31+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
3232
Req: serde::Serialize,
3333
Res: DeserializeOwned,
3434
S: JsonSerialize,
@@ -37,22 +37,22 @@ where
3737
// runner responds before we start consuming. DeliverAll replays from stream start.
3838
let responses_stream = streams::responses_stream_name(prefix);
3939
let resp_config = consumers::response_consumer(prefix, session_id, req_id);
40-
let resp_consumer: J::Consumer = js
41-
.create_consumer(&responses_stream, resp_config)
40+
let stream = js
41+
.get_stream(&responses_stream)
4242
.await
43-
.map_err(|e| {
44-
Error::new(
45-
ErrorCode::InternalError.into(),
46-
format!("create response consumer: {e}"),
47-
)
48-
})?;
49-
let mut resp_messages: <J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Messages =
50-
resp_consumer.messages().await.map_err(|e| {
51-
Error::new(
52-
ErrorCode::InternalError.into(),
53-
format!("response messages: {e}"),
54-
)
55-
})?;
43+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("get stream: {e}")))?;
44+
let resp_consumer = stream.create_consumer(resp_config).await.map_err(|e| {
45+
Error::new(
46+
ErrorCode::InternalError.into(),
47+
format!("create response consumer: {e}"),
48+
)
49+
})?;
50+
let mut resp_messages = resp_consumer.messages().await.map_err(|e| {
51+
Error::new(
52+
ErrorCode::InternalError.into(),
53+
format!("response messages: {e}"),
54+
)
55+
})?;
5656

5757
let payload_bytes = serializer
5858
.to_vec(request)
@@ -62,9 +62,11 @@ where
6262
headers.insert(REQ_ID_HEADER, req_id);
6363
headers.insert(SESSION_ID_HEADER, session_id);
6464

65-
js.js_publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
65+
js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
66+
.await
67+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
6668
.await
67-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
69+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
6870

6971
match timeout(operation_timeout, resp_messages.next()).await {
7072
Ok(Some(Ok(js_msg))) => {

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: LoadSessionRequest,
2121
) -> Result<LoadSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use futures::StreamExt;
77
use tokio::time::timeout;
88
use tracing::{instrument, warn};
99
use trogon_nats::jetstream::{
10-
JetStreamConsumer as _, JetStreamConsumerFactory, JetStreamPublisher, JsAck as _,
11-
JsAckWith as _, JsMessageRef as _, JsRequestMessage,
10+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JetStreamPublisher,
11+
JsAck as _, JsAckWith as _, JsMessageRef as _, JsRequestMessage,
1212
};
1313
use trogon_std::JsonSerialize;
1414

@@ -33,8 +33,8 @@ pub async fn handle<N, C, J, S>(
3333
where
3434
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
3535
C: trogon_std::time::GetElapsed,
36-
J: JetStreamPublisher + JetStreamConsumerFactory,
37-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
36+
J: JetStreamPublisher + JetStreamGetStream,
37+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
3838
S: JsonSerialize,
3939
{
4040
let start = bridge.clock.now();
@@ -195,50 +195,60 @@ async fn handle_js<N, C, J, S>(
195195
where
196196
N: SubscribeClient,
197197
C: trogon_std::time::GetElapsed,
198-
J: JetStreamPublisher + JetStreamConsumerFactory,
199-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
198+
J: JetStreamPublisher + JetStreamGetStream,
199+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
200200
S: JsonSerialize,
201201
{
202202
// Create consumers BEFORE publishing — same principle as subscribe-before-publish.
203203
// JetStream consumers with DeliverAll replay from stream start, so they'll see the
204204
// response even if the runner responds before we start consuming.
205205
let notifications_stream = streams::notifications_stream_name(prefix);
206206
let notif_config = consumers::prompt_notifications_consumer(prefix, sid, req_id);
207-
let notif_consumer: J::Consumer = js
208-
.create_consumer(&notifications_stream, notif_config)
207+
let notif_stream = js.get_stream(&notifications_stream).await.map_err(|e| {
208+
Error::new(
209+
ErrorCode::InternalError.into(),
210+
format!("get notifications stream: {e}"),
211+
)
212+
})?;
213+
let notif_consumer = notif_stream
214+
.create_consumer(notif_config)
209215
.await
210216
.map_err(|e| {
211217
Error::new(
212218
ErrorCode::InternalError.into(),
213219
format!("create notification consumer: {e}"),
214220
)
215221
})?;
216-
let mut notif_messages: <J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Messages =
217-
notif_consumer.messages().await.map_err(|e| {
218-
Error::new(
219-
ErrorCode::InternalError.into(),
220-
format!("notification messages: {e}"),
221-
)
222-
})?;
222+
let mut notif_messages = notif_consumer.messages().await.map_err(|e| {
223+
Error::new(
224+
ErrorCode::InternalError.into(),
225+
format!("notification messages: {e}"),
226+
)
227+
})?;
223228

224229
let responses_stream = streams::responses_stream_name(prefix);
225230
let resp_config = consumers::prompt_response_consumer(prefix, sid, req_id);
226-
let resp_consumer: J::Consumer = js
227-
.create_consumer(&responses_stream, resp_config)
231+
let resp_stream = js.get_stream(&responses_stream).await.map_err(|e| {
232+
Error::new(
233+
ErrorCode::InternalError.into(),
234+
format!("get responses stream: {e}"),
235+
)
236+
})?;
237+
let resp_consumer = resp_stream
238+
.create_consumer(resp_config)
228239
.await
229240
.map_err(|e| {
230241
Error::new(
231242
ErrorCode::InternalError.into(),
232243
format!("create response consumer: {e}"),
233244
)
234245
})?;
235-
let mut resp_messages: <J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Messages =
236-
resp_consumer.messages().await.map_err(|e| {
237-
Error::new(
238-
ErrorCode::InternalError.into(),
239-
format!("response messages: {e}"),
240-
)
241-
})?;
246+
let mut resp_messages = resp_consumer.messages().await.map_err(|e| {
247+
Error::new(
248+
ErrorCode::InternalError.into(),
249+
format!("response messages: {e}"),
250+
)
251+
})?;
242252

243253
// Cancel still uses core NATS — it's a fire-and-forget signal, not persisted.
244254
let mut cancel_sub = bridge
@@ -262,9 +272,11 @@ where
262272
headers.insert(SESSION_ID_HEADER, sid);
263273

264274
let prompt_subject = session::agent::prompt(prefix, sid);
265-
js.js_publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
275+
js.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
276+
.await
277+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
266278
.await
267-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
279+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
268280

269281
let op_timeout = bridge.config.prompt_timeout();
270282

rsworkspace/crates/acp-nats/src/agent/resume_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use agent_client_protocol::{
55
Error, ErrorCode, Result, ResumeSessionRequest, ResumeSessionResponse,
66
};
77
use tracing::{info, instrument};
8-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
8+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
99
use trogon_std::time::GetElapsed;
1010

1111
#[instrument(
@@ -16,13 +16,13 @@ use trogon_std::time::GetElapsed;
1616
pub async fn handle<
1717
N: RequestClient + PublishClient + FlushClient,
1818
C: GetElapsed,
19-
J: JetStreamPublisher + JetStreamConsumerFactory,
19+
J: JetStreamPublisher + JetStreamGetStream,
2020
>(
2121
bridge: &Bridge<N, C, J>,
2222
args: ResumeSessionRequest,
2323
) -> Result<ResumeSessionResponse>
2424
where
25-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
25+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2626
{
2727
let start = bridge.clock.now();
2828

rsworkspace/crates/acp-nats/src/agent/set_session_config_option.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use agent_client_protocol::{
55
Error, ErrorCode, Result, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
66
};
77
use tracing::{info, instrument};
8-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
8+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
99
use trogon_std::time::GetElapsed;
1010

1111
#[instrument(
@@ -16,13 +16,13 @@ use trogon_std::time::GetElapsed;
1616
pub async fn handle<
1717
N: RequestClient + PublishClient + FlushClient,
1818
C: GetElapsed,
19-
J: JetStreamPublisher + JetStreamConsumerFactory,
19+
J: JetStreamPublisher + JetStreamGetStream,
2020
>(
2121
bridge: &Bridge<N, C, J>,
2222
args: SetSessionConfigOptionRequest,
2323
) -> Result<SetSessionConfigOptionResponse>
2424
where
25-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
25+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2626
{
2727
let start = bridge.clock.now();
2828

0 commit comments

Comments
 (0)