Skip to content

Commit 7b9c5af

Browse files
committed
refactor(trogon-nats): zero-wrapping JetStream trait alignment with async_nats
Every production trait impl is a direct passthrough to async_nats: - No newtypes, no wrappers, no BoxStream, no error mapping - Traits implemented directly on async_nats types (Stream, Consumer, Message) - All associated types are async_nats types - Removed NatsJsMessage, NatsJetStreamStream, NatsJetStreamConsumer newtypes - Removed JetStreamError entirely - Fixed swapped StreamError/MessagesError naming to match async_nats - Split JetStreamConsumerFactory into JetStreamGetStream + JetStreamCreateConsumer - Renamed js_publish_with_headers to publish_with_headers Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent b6d29b3 commit 7b9c5af

18 files changed

Lines changed: 334 additions & 232 deletions

rsworkspace/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ where
112112
impl std::future::Future<Output = Result<(), ConnectionError>>,
113113
)
114114
where
115-
J: JetStreamConsumerFactory + 'static,
116-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
115+
J: JetStreamGetStream + 'static,
116+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
117117
{
118118
let nats_for_serve = nats.clone();
119119
let nats_for_js = nats.clone();
@@ -422,7 +422,8 @@ where
422422
}
423423

424424
use trogon_nats::jetstream::{
425-
JetStreamConsumer as _, JetStreamConsumerFactory, JsAckWith, JsDispatchMessage,
425+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JsAckWith,
426+
JsDispatchMessage,
426427
};
427428

428429
async fn handle_request_with_keepalive<N, Resp, ReqT, F, M>(
@@ -484,17 +485,22 @@ async fn serve_js<N, J, A>(
484485
) -> Result<(), ConnectionError>
485486
where
486487
N: PublishClient + FlushClient + Clone + 'static,
487-
J: JetStreamConsumerFactory + 'static,
488-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
488+
J: JetStreamGetStream + 'static,
489+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
489490
A: Agent + 'static,
490491
{
491492
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix);
492493
let config = acp_nats::jetstream::consumers::commands_observer();
493494

494495
info!(stream = %stream_name, "Starting JetStream consumer for COMMANDS stream");
495496

496-
let consumer = js
497-
.create_consumer(&stream_name, config)
497+
let stream = js
498+
.get_stream(&stream_name)
499+
.await
500+
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
501+
502+
let consumer = stream
503+
.create_consumer(config)
498504
.await
499505
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
500506

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
24+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
2525
use trogon_std::time::GetElapsed;
2626

2727
use super::{
@@ -150,10 +150,10 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
150150
impl<
151151
N: RequestClient + PublishClient + FlushClient,
152152
C: GetElapsed,
153-
J: JetStreamPublisher + JetStreamConsumerFactory,
153+
J: JetStreamPublisher + JetStreamGetStream,
154154
> Bridge<N, C, J>
155155
where
156-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
156+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
157157
{
158158
pub(crate) async fn session_request<Req, Res>(
159159
&self,
@@ -198,10 +198,10 @@ where
198198
impl<
199199
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
200200
C: GetElapsed,
201-
J: JetStreamPublisher + JetStreamConsumerFactory,
201+
J: JetStreamPublisher + JetStreamGetStream,
202202
> Agent for Bridge<N, C, J>
203203
where
204-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
204+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
205205
{
206206
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
207207
initialize::handle(self, args).await

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: CloseSessionRequest,
2121
) -> Result<CloseSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
55
use tracing::{Span, info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: ForkSessionRequest,
2121
) -> Result<ForkSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::time::Duration;
77
use tokio::time::timeout;
88
use trogon_nats::REQ_ID_HEADER;
99
use trogon_nats::jetstream::{
10-
JetStreamConsumer as _, JetStreamConsumerFactory, JetStreamPublisher, JsAck as _,
11-
JsAckWith as _, JsMessageRef as _, JsRequestMessage,
10+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JetStreamPublisher,
11+
JsAck as _, JsAckWith as _, JsMessageRef as _, JsRequestMessage,
1212
};
1313
use trogon_std::JsonSerialize;
1414

@@ -27,8 +27,8 @@ pub async fn js_request<J, Req, Res, S>(
2727
operation_timeout: Duration,
2828
) -> agent_client_protocol::Result<Res>
2929
where
30-
J: JetStreamPublisher + JetStreamConsumerFactory,
31-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
30+
J: JetStreamPublisher + JetStreamGetStream,
31+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
3232
Req: serde::Serialize,
3333
Res: DeserializeOwned,
3434
S: JsonSerialize,
@@ -37,22 +37,22 @@ where
3737
// runner responds before we start consuming. DeliverAll replays from stream start.
3838
let responses_stream = streams::responses_stream_name(prefix);
3939
let resp_config = consumers::response_consumer(prefix, session_id, req_id);
40-
let resp_consumer: J::Consumer = js
41-
.create_consumer(&responses_stream, resp_config)
40+
let stream = js
41+
.get_stream(&responses_stream)
4242
.await
43-
.map_err(|e| {
44-
Error::new(
45-
ErrorCode::InternalError.into(),
46-
format!("create response consumer: {e}"),
47-
)
48-
})?;
49-
let mut resp_messages: <J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Messages =
50-
resp_consumer.messages().await.map_err(|e| {
51-
Error::new(
52-
ErrorCode::InternalError.into(),
53-
format!("response messages: {e}"),
54-
)
55-
})?;
43+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("get stream: {e}")))?;
44+
let resp_consumer = stream.create_consumer(resp_config).await.map_err(|e| {
45+
Error::new(
46+
ErrorCode::InternalError.into(),
47+
format!("create response consumer: {e}"),
48+
)
49+
})?;
50+
let mut resp_messages = resp_consumer.messages().await.map_err(|e| {
51+
Error::new(
52+
ErrorCode::InternalError.into(),
53+
format!("response messages: {e}"),
54+
)
55+
})?;
5656

5757
let payload_bytes = serializer
5858
.to_vec(request)
@@ -62,9 +62,11 @@ where
6262
headers.insert(REQ_ID_HEADER, req_id);
6363
headers.insert(SESSION_ID_HEADER, session_id);
6464

65-
js.js_publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
65+
js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
66+
.await
67+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
6668
.await
67-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
69+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
6870

6971
match timeout(operation_timeout, resp_messages.next()).await {
7072
Ok(Some(Ok(js_msg))) => {

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: LoadSessionRequest,
2121
) -> Result<LoadSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

0 commit comments

Comments
 (0)