Skip to content

Commit 3afd587

Browse files
committed
refactor(trogon-nats): zero-wrapping JetStream trait alignment with async_nats
- publish_with_headers returns async_nats PublishAckFuture directly via IntoFuture - get_or_create_stream returns async_nats Stream directly via associated type - Both use async_nats error types directly — no mapping - Renamed js_publish_with_headers to publish_with_headers - Removed JsPublishAckFuture wrapper and ReadyAckFuture - Mocks use std::future::Ready, no custom future types Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent b6d29b3 commit 3afd587

7 files changed

Lines changed: 60 additions & 41 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ where
6262
headers.insert(REQ_ID_HEADER, req_id);
6363
headers.insert(SESSION_ID_HEADER, session_id);
6464

65-
js.js_publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
65+
js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
6666
.await
67-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
67+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
68+
.await
69+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
6870

6971
match timeout(operation_timeout, resp_messages.next()).await {
7072
Ok(Some(Ok(js_msg))) => {

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,11 @@ where
262262
headers.insert(SESSION_ID_HEADER, sid);
263263

264264
let prompt_subject = session::agent::prompt(prefix, sid);
265-
js.js_publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
265+
js.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
266266
.await
267-
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?;
267+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
268+
.await
269+
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js ack: {e}")))?;
268270

269271
let op_timeout = bridge.config.prompt_timeout();
270272

rsworkspace/crates/acp-nats/src/agent/test_support.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,17 @@ impl MockJs {
4141

4242
impl trogon_nats::jetstream::JetStreamPublisher for MockJs {
4343
type PublishError = trogon_nats::mocks::MockError;
44+
type AckFuture =
45+
std::future::Ready<Result<async_nats::jetstream::publish::PublishAck, Self::PublishError>>;
4446

45-
async fn js_publish_with_headers<S: async_nats::subject::ToSubject + Send>(
47+
async fn publish_with_headers<S: async_nats::subject::ToSubject + Send>(
4648
&self,
4749
subject: S,
4850
headers: async_nats::HeaderMap,
4951
payload: bytes::Bytes,
50-
) -> Result<async_nats::jetstream::publish::PublishAck, Self::PublishError> {
52+
) -> Result<Self::AckFuture, Self::PublishError> {
5153
self.publisher
52-
.js_publish_with_headers(subject, headers, payload)
54+
.publish_with_headers(subject, headers, payload)
5355
.await
5456
}
5557
}

rsworkspace/crates/trogon-nats/src/jetstream/client.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use async_nats::HeaderMap;
22
use async_nats::jetstream;
33
use async_nats::jetstream::AckKind;
44
use async_nats::jetstream::consumer::pull;
5-
use async_nats::jetstream::publish::PublishAck;
65
use async_nats::jetstream::stream;
76
use async_nats::subject::ToSubject;
87
use bytes::Bytes;
@@ -40,35 +39,33 @@ impl std::fmt::Display for JetStreamError {
4039
impl std::error::Error for JetStreamError {}
4140

4241
impl JetStreamContext for NatsJetStreamClient {
43-
type Error = JetStreamError;
42+
type Error = async_nats::jetstream::context::CreateStreamError;
43+
type Stream = jetstream::stream::Stream;
4444

4545
async fn get_or_create_stream<S: Into<stream::Config> + Send>(
4646
&self,
4747
config: S,
48-
) -> Result<(), JetStreamError> {
49-
self.context
50-
.get_or_create_stream(config)
51-
.await
52-
.map(|_| ())
53-
.map_err(|e| JetStreamError(e.to_string()))
48+
) -> Result<jetstream::stream::Stream, async_nats::jetstream::context::CreateStreamError> {
49+
self.context.get_or_create_stream(config).await
5450
}
5551
}
5652

53+
pub type PublishError = async_nats::jetstream::context::PublishError;
54+
pub type PublishAckFuture = async_nats::jetstream::context::PublishAckFuture;
55+
5756
impl JetStreamPublisher for NatsJetStreamClient {
58-
type PublishError = JetStreamError;
57+
type PublishError = PublishError;
58+
type AckFuture = PublishAckFuture;
5959

60-
async fn js_publish_with_headers<S: ToSubject + Send>(
60+
async fn publish_with_headers<S: ToSubject + Send>(
6161
&self,
6262
subject: S,
6363
headers: HeaderMap,
6464
payload: Bytes,
65-
) -> Result<PublishAck, JetStreamError> {
65+
) -> Result<PublishAckFuture, PublishError> {
6666
self.context
6767
.publish_with_headers(subject, headers, payload)
6868
.await
69-
.map_err(|e| JetStreamError(e.to_string()))?
70-
.await
71-
.map_err(|e| JetStreamError(e.to_string()))
7269
}
7370
}
7471

rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ impl Default for MockJetStreamContext {
171171

172172
impl JetStreamContext for MockJetStreamContext {
173173
type Error = MockError;
174+
type Stream = ();
174175

175176
async fn get_or_create_stream<S: Into<stream::Config> + Send>(
176177
&self,
@@ -254,13 +255,14 @@ impl Default for MockJetStreamPublisher {
254255

255256
impl JetStreamPublisher for MockJetStreamPublisher {
256257
type PublishError = MockError;
258+
type AckFuture = std::future::Ready<Result<PublishAck, MockError>>;
257259

258-
async fn js_publish_with_headers<S: ToSubject + Send>(
260+
async fn publish_with_headers<S: ToSubject + Send>(
259261
&self,
260262
subject: S,
261263
headers: HeaderMap,
262264
payload: Bytes,
263-
) -> Result<PublishAck, MockError> {
265+
) -> Result<Self::AckFuture, MockError> {
264266
let subject = subject.to_subject().to_string();
265267
let should_fail = {
266268
let mut count = self.publish_fail_count.lock().unwrap();
@@ -288,13 +290,13 @@ impl JetStreamPublisher for MockJetStreamPublisher {
288290
current
289291
};
290292

291-
Ok(PublishAck {
293+
Ok(std::future::ready(Ok(PublishAck {
292294
stream: "mock-stream".to_string(),
293295
sequence: seq,
294296
domain: String::new(),
295297
duplicate: false,
296298
value: None,
297-
})
299+
})))
298300
}
299301
}
300302

@@ -478,12 +480,14 @@ mod tests {
478480
async fn mock_publisher_records_publishes() {
479481
let pub_mock = MockJetStreamPublisher::new();
480482
let ack = pub_mock
481-
.js_publish_with_headers(
483+
.publish_with_headers(
482484
"test.subject".to_string(),
483485
HeaderMap::new(),
484486
Bytes::from("hello"),
485487
)
486488
.await
489+
.unwrap()
490+
.await
487491
.unwrap();
488492
assert_eq!(ack.sequence, 1);
489493
assert_eq!(pub_mock.published_subjects(), vec!["test.subject"]);
@@ -494,11 +498,15 @@ mod tests {
494498
async fn mock_publisher_increments_sequence() {
495499
let pub_mock = MockJetStreamPublisher::new();
496500
let ack1 = pub_mock
497-
.js_publish_with_headers("a".to_string(), HeaderMap::new(), Bytes::new())
501+
.publish_with_headers("a".to_string(), HeaderMap::new(), Bytes::new())
502+
.await
503+
.unwrap()
498504
.await
499505
.unwrap();
500506
let ack2 = pub_mock
501-
.js_publish_with_headers("b".to_string(), HeaderMap::new(), Bytes::new())
507+
.publish_with_headers("b".to_string(), HeaderMap::new(), Bytes::new())
508+
.await
509+
.unwrap()
502510
.await
503511
.unwrap();
504512
assert_eq!(ack1.sequence, 1);
@@ -510,12 +518,12 @@ mod tests {
510518
let pub_mock = MockJetStreamPublisher::new();
511519
pub_mock.fail_next_js_publish();
512520
let result = pub_mock
513-
.js_publish_with_headers("test".to_string(), HeaderMap::new(), Bytes::new())
521+
.publish_with_headers("test".to_string(), HeaderMap::new(), Bytes::new())
514522
.await;
515523
assert!(result.is_err());
516524

517525
let result = pub_mock
518-
.js_publish_with_headers("test".to_string(), HeaderMap::new(), Bytes::new())
526+
.publish_with_headers("test".to_string(), HeaderMap::new(), Bytes::new())
519527
.await;
520528
assert!(result.is_ok());
521529
}
@@ -589,19 +597,21 @@ mod tests {
589597
pub_mock.fail_js_publish_count(2);
590598
assert!(
591599
pub_mock
592-
.js_publish_with_headers("a".to_string(), HeaderMap::new(), Bytes::new())
600+
.publish_with_headers("a".to_string(), HeaderMap::new(), Bytes::new())
593601
.await
594602
.is_err()
595603
);
596604
assert!(
597605
pub_mock
598-
.js_publish_with_headers("b".to_string(), HeaderMap::new(), Bytes::new())
606+
.publish_with_headers("b".to_string(), HeaderMap::new(), Bytes::new())
599607
.await
600608
.is_err()
601609
);
602610
assert!(
603611
pub_mock
604-
.js_publish_with_headers("c".to_string(), HeaderMap::new(), Bytes::new())
612+
.publish_with_headers("c".to_string(), HeaderMap::new(), Bytes::new())
613+
.await
614+
.unwrap()
605615
.await
606616
.is_ok()
607617
);

rsworkspace/crates/trogon-nats/src/jetstream/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ pub mod traits;
77
pub mod mocks;
88

99
#[cfg(not(coverage))]
10-
pub use client::{JetStreamError, NatsJetStreamClient, NatsJetStreamConsumer, NatsJsMessage};
10+
pub use client::{
11+
JetStreamError, NatsJetStreamClient, NatsJetStreamConsumer, NatsJsMessage, PublishAckFuture,
12+
PublishError,
13+
};
1114
pub use message::{
1215
JsAck, JsAckWith, JsDispatchMessage, JsDoubleAck, JsDoubleAckWith, JsMessageRef,
1316
JsRequestMessage,

rsworkspace/crates/trogon-nats/src/jetstream/traits.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::error::Error;
2-
use std::future::Future;
2+
use std::future::{Future, IntoFuture};
33

44
use async_nats::HeaderMap;
55
use async_nats::jetstream::consumer::pull;
@@ -11,22 +11,24 @@ use futures::Stream;
1111

1212
pub trait JetStreamContext: Send + Sync + Clone + 'static {
1313
type Error: Error + Send + Sync;
14+
type Stream: Send;
1415

1516
fn get_or_create_stream<S: Into<stream::Config> + Send>(
1617
&self,
1718
config: S,
18-
) -> impl Future<Output = Result<(), Self::Error>> + Send;
19+
) -> impl Future<Output = Result<Self::Stream, Self::Error>> + Send;
1920
}
2021

2122
pub trait JetStreamPublisher: Send + Sync + Clone + 'static {
2223
type PublishError: Error + Send + Sync;
24+
type AckFuture: IntoFuture<Output = Result<PublishAck, Self::PublishError>> + Send;
2325

24-
fn js_publish_with_headers<S: ToSubject + Send>(
26+
fn publish_with_headers<S: ToSubject + Send>(
2527
&self,
2628
subject: S,
2729
headers: HeaderMap,
2830
payload: Bytes,
29-
) -> impl Future<Output = Result<PublishAck, Self::PublishError>> + Send;
31+
) -> impl Future<Output = Result<Self::AckFuture, Self::PublishError>> + Send;
3032
}
3133

3234
pub trait JetStreamConsumerFactory: Send + Sync + Clone + 'static {
@@ -64,13 +66,14 @@ impl Error for NoJetStream {}
6466

6567
impl JetStreamPublisher for () {
6668
type PublishError = NoJetStream;
69+
type AckFuture = std::future::Ready<Result<PublishAck, NoJetStream>>;
6770

68-
async fn js_publish_with_headers<S: ToSubject + Send>(
71+
async fn publish_with_headers<S: ToSubject + Send>(
6972
&self,
7073
_subject: S,
7174
_headers: HeaderMap,
7275
_payload: Bytes,
73-
) -> Result<PublishAck, NoJetStream> {
76+
) -> Result<Self::AckFuture, NoJetStream> {
7477
Err(NoJetStream)
7578
}
7679
}
@@ -194,7 +197,7 @@ mod tests {
194197

195198
#[tokio::test]
196199
async fn unit_publisher_returns_err() {
197-
let result = ().js_publish_with_headers("s", HeaderMap::new(), Bytes::new()).await;
200+
let result = ().publish_with_headers("s", HeaderMap::new(), Bytes::new()).await;
198201
assert!(result.is_err());
199202
}
200203

0 commit comments

Comments
 (0)