Skip to content

Commit c0eeef1

Browse files
committed
refactor(acp-nats): typed subjects enforce correct transport at compile time
Introduce six subject newtypes: RequestSubject, PublishSubject, JetStreamSubject, AgentResponseSubject, SubscribeSubject, and ClientRequestSubject. Every subject builder returns the type matching its transport. Wrapper functions only accept the correct type. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent b6d29b3 commit c0eeef1

22 files changed

Lines changed: 713 additions & 325 deletions

rsworkspace/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use agent_client_protocol::{
1212
};
1313
use async_nats::Message;
1414
use async_nats::jetstream::AckKind;
15+
use async_nats::subject::ToSubject;
1516
#[cfg(test)]
1617
use bytes::Bytes;
1718
use futures::StreamExt;
@@ -112,8 +113,8 @@ where
112113
impl std::future::Future<Output = Result<(), ConnectionError>>,
113114
)
114115
where
115-
J: JetStreamConsumerFactory + 'static,
116-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
116+
J: JetStreamGetStream + 'static,
117+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
117118
{
118119
let nats_for_serve = nats.clone();
119120
let nats_for_js = nats.clone();
@@ -422,7 +423,8 @@ where
422423
}
423424

424425
use trogon_nats::jetstream::{
425-
JetStreamConsumer as _, JetStreamConsumerFactory, JsAckWith, JsDispatchMessage,
426+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JsAckWith,
427+
JsDispatchMessage,
426428
};
427429

428430
async fn handle_request_with_keepalive<N, Resp, ReqT, F, M>(
@@ -484,17 +486,22 @@ async fn serve_js<N, J, A>(
484486
) -> Result<(), ConnectionError>
485487
where
486488
N: PublishClient + FlushClient + Clone + 'static,
487-
J: JetStreamConsumerFactory + 'static,
488-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
489+
J: JetStreamGetStream + 'static,
490+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
489491
A: Agent + 'static,
490492
{
491493
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix);
492494
let config = acp_nats::jetstream::consumers::commands_observer();
493495

494496
info!(stream = %stream_name, "Starting JetStream consumer for COMMANDS stream");
495497

496-
let consumer = js
497-
.create_consumer(&stream_name, config)
498+
let stream = js
499+
.get_stream(&stream_name)
500+
.await
501+
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
502+
503+
let consumer = stream
504+
.create_consumer(config)
498505
.await
499506
.map_err(|e| ConnectionError::JetStream(Box::new(e)))?;
500507

@@ -581,7 +588,7 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
581588
let inner = js_msg.message();
582589
let msg = Message {
583590
subject: subject.as_str().into(),
584-
reply: reply_subject.as_deref().map(|s| s.into()),
591+
reply: reply_subject.map(|s| s.to_subject()),
585592
payload: inner.payload.clone(),
586593
headers: inner.headers.clone(),
587594
status: None,

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use opentelemetry::metrics::Meter;
2121
use tokio::sync::mpsc;
2222
use tokio::task::JoinHandle;
2323
use tracing::{info, warn};
24-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
24+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
2525
use trogon_std::time::GetElapsed;
2626

2727
use super::{
@@ -150,14 +150,14 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
150150
impl<
151151
N: RequestClient + PublishClient + FlushClient,
152152
C: GetElapsed,
153-
J: JetStreamPublisher + JetStreamConsumerFactory,
153+
J: JetStreamPublisher + JetStreamGetStream,
154154
> Bridge<N, C, J>
155155
where
156-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
156+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
157157
{
158158
pub(crate) async fn session_request<Req, Res>(
159159
&self,
160-
subject: &str,
160+
subject: &crate::nats::JetStreamSubject,
161161
args: &Req,
162162
session_id: &str,
163163
) -> Result<Res>
@@ -172,7 +172,7 @@ where
172172
let req_id = uuid::Uuid::new_v4().to_string();
173173
js_request::js_request::<J, _, Res, _>(
174174
js,
175-
subject,
175+
&subject.0,
176176
args,
177177
&trogon_std::StdJsonSerialize,
178178
self.config.acp_prefix(),
@@ -182,9 +182,9 @@ where
182182
)
183183
.await
184184
}
185-
None => nats::request_with_timeout::<N, Req, Res>(
185+
None => trogon_nats::request_with_timeout::<N, Req, Res>(
186186
self.nats(),
187-
subject,
187+
&subject.0,
188188
args,
189189
self.config.operation_timeout,
190190
)
@@ -198,10 +198,10 @@ where
198198
impl<
199199
N: RequestClient + PublishClient + SubscribeClient + FlushClient,
200200
C: GetElapsed,
201-
J: JetStreamPublisher + JetStreamConsumerFactory,
201+
J: JetStreamPublisher + JetStreamGetStream,
202202
> Agent for Bridge<N, C, J>
203203
where
204-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
204+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
205205
{
206206
async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse> {
207207
initialize::handle(self, args).await

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: CloseSessionRequest,
2121
) -> Result<CloseSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
41+
let subject = agent::ext_notify(bridge.config.acp_prefix(), method_name.as_str());
4242

4343
let publish_result = nats::publish(
4444
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
55
use tracing::{Span, info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: ForkSessionRequest,
2121
) -> Result<ForkSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::time::Duration;
77
use tokio::time::timeout;
88
use trogon_nats::REQ_ID_HEADER;
99
use trogon_nats::jetstream::{
10-
JetStreamConsumer as _, JetStreamConsumerFactory, JetStreamPublisher, JsAck as _,
11-
JsAckWith as _, JsMessageRef as _, JsRequestMessage,
10+
JetStreamConsumer as _, JetStreamCreateConsumer as _, JetStreamGetStream, JetStreamPublisher,
11+
JsAck as _, JsAckWith as _, JsMessageRef as _, JsRequestMessage,
1212
};
1313
use trogon_std::JsonSerialize;
1414

@@ -27,8 +27,8 @@ pub async fn js_request<J, Req, Res, S>(
2727
operation_timeout: Duration,
2828
) -> agent_client_protocol::Result<Res>
2929
where
30-
J: JetStreamPublisher + JetStreamConsumerFactory,
31-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
30+
J: JetStreamPublisher + JetStreamGetStream,
31+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
3232
Req: serde::Serialize,
3333
Res: DeserializeOwned,
3434
S: JsonSerialize,
@@ -37,22 +37,22 @@ where
3737
// runner responds before we start consuming. DeliverAll replays from stream start.
3838
let responses_stream = streams::responses_stream_name(prefix);
3939
let resp_config = consumers::response_consumer(prefix, session_id, req_id);
40-
let resp_consumer: J::Consumer = js
41-
.create_consumer(&responses_stream, resp_config)
40+
let stream = js
41+
.get_stream(&responses_stream)
4242
.await
43-
.map_err(|e| {
44-
Error::new(
45-
ErrorCode::InternalError.into(),
46-
format!("create response consumer: {e}"),
47-
)
48-
})?;
49-
let mut resp_messages: <J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Messages =
50-
resp_consumer.messages().await.map_err(|e| {
51-
Error::new(
52-
ErrorCode::InternalError.into(),
53-
format!("response messages: {e}"),
54-
)
55-
})?;
43+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("get stream: {e}")))?;
44+
let resp_consumer = stream.create_consumer(resp_config).await.map_err(|e| {
45+
Error::new(
46+
ErrorCode::InternalError.into(),
47+
format!("create response consumer: {e}"),
48+
)
49+
})?;
50+
let mut resp_messages = resp_consumer.messages().await.map_err(|e| {
51+
Error::new(
52+
ErrorCode::InternalError.into(),
53+
format!("response messages: {e}"),
54+
)
55+
})?;
5656

5757
let payload_bytes = serializer
5858
.to_vec(request)
@@ -62,9 +62,11 @@ where
6262
headers.insert(REQ_ID_HEADER, req_id);
6363
headers.insert(SESSION_ID_HEADER, session_id);
6464

65-
js.js_publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
65+
js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
66+
.await
67+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
6668
.await
67-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
69+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
6870

6971
match timeout(operation_timeout, resp_messages.next()).await {
7072
Ok(Some(Ok(js_msg))) => {

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::nats::{FlushClient, PublishClient, RequestClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
55
use tracing::{info, instrument};
6-
use trogon_nats::jetstream::{JetStreamConsumerFactory, JetStreamPublisher, JsRequestMessage};
6+
use trogon_nats::jetstream::{JetStreamGetStream, JetStreamPublisher, JsRequestMessage};
77
use trogon_std::time::GetElapsed;
88

99
#[instrument(
@@ -14,13 +14,13 @@ use trogon_std::time::GetElapsed;
1414
pub async fn handle<
1515
N: RequestClient + PublishClient + FlushClient,
1616
C: GetElapsed,
17-
J: JetStreamPublisher + JetStreamConsumerFactory,
17+
J: JetStreamPublisher + JetStreamGetStream,
1818
>(
1919
bridge: &Bridge<N, C, J>,
2020
args: LoadSessionRequest,
2121
) -> Result<LoadSessionResponse>
2222
where
23-
<J::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
23+
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsRequestMessage,
2424
{
2525
let start = bridge.clock.now();
2626

0 commit comments

Comments
 (0)