Skip to content

Commit b6d29b3

Browse files
authored
refactor(trogon-nats): align JetStream trait signatures with async_nats (#73)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3713f05 commit b6d29b3

4 files changed

Lines changed: 26 additions & 15 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/test_support.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ impl MockJs {
4242
impl trogon_nats::jetstream::JetStreamPublisher for MockJs {
4343
type PublishError = trogon_nats::mocks::MockError;
4444

45-
async fn js_publish_with_headers(
45+
async fn js_publish_with_headers<S: async_nats::subject::ToSubject + Send>(
4646
&self,
47-
subject: String,
47+
subject: S,
4848
headers: async_nats::HeaderMap,
4949
payload: bytes::Bytes,
5050
) -> Result<async_nats::jetstream::publish::PublishAck, Self::PublishError> {

rsworkspace/crates/trogon-nats/src/jetstream/client.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use async_nats::jetstream::AckKind;
44
use async_nats::jetstream::consumer::pull;
55
use async_nats::jetstream::publish::PublishAck;
66
use async_nats::jetstream::stream;
7+
use async_nats::subject::ToSubject;
78
use bytes::Bytes;
89
use futures::StreamExt;
910

@@ -41,7 +42,10 @@ impl std::error::Error for JetStreamError {}
4142
impl JetStreamContext for NatsJetStreamClient {
4243
type Error = JetStreamError;
4344

44-
async fn get_or_create_stream(&self, config: stream::Config) -> Result<(), JetStreamError> {
45+
async fn get_or_create_stream<S: Into<stream::Config> + Send>(
46+
&self,
47+
config: S,
48+
) -> Result<(), JetStreamError> {
4549
self.context
4650
.get_or_create_stream(config)
4751
.await
@@ -53,9 +57,9 @@ impl JetStreamContext for NatsJetStreamClient {
5357
impl JetStreamPublisher for NatsJetStreamClient {
5458
type PublishError = JetStreamError;
5559

56-
async fn js_publish_with_headers(
60+
async fn js_publish_with_headers<S: ToSubject + Send>(
5761
&self,
58-
subject: String,
62+
subject: S,
5963
headers: HeaderMap,
6064
payload: Bytes,
6165
) -> Result<PublishAck, JetStreamError> {

rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use async_nats::jetstream::AckKind;
66
use async_nats::jetstream::consumer::pull;
77
use async_nats::jetstream::publish::PublishAck;
88
use async_nats::jetstream::stream;
9+
use async_nats::subject::ToSubject;
910
use bytes::Bytes;
1011
use futures::channel::mpsc;
1112
use futures::stream::BoxStream;
@@ -171,7 +172,11 @@ impl Default for MockJetStreamContext {
171172
impl JetStreamContext for MockJetStreamContext {
172173
type Error = MockError;
173174

174-
async fn get_or_create_stream(&self, config: stream::Config) -> Result<(), MockError> {
175+
async fn get_or_create_stream<S: Into<stream::Config> + Send>(
176+
&self,
177+
config: S,
178+
) -> Result<(), MockError> {
179+
let config = config.into();
175180
let should_fail = {
176181
let mut flag = self.should_fail.lock().unwrap();
177182
if *flag {
@@ -250,12 +255,13 @@ impl Default for MockJetStreamPublisher {
250255
impl JetStreamPublisher for MockJetStreamPublisher {
251256
type PublishError = MockError;
252257

253-
async fn js_publish_with_headers(
258+
async fn js_publish_with_headers<S: ToSubject + Send>(
254259
&self,
255-
subject: String,
260+
subject: S,
256261
headers: HeaderMap,
257262
payload: Bytes,
258263
) -> Result<PublishAck, MockError> {
264+
let subject = subject.to_subject().to_string();
259265
let should_fail = {
260266
let mut count = self.publish_fail_count.lock().unwrap();
261267
if *count > 0 {

rsworkspace/crates/trogon-nats/src/jetstream/traits.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,25 @@ use async_nats::HeaderMap;
55
use async_nats::jetstream::consumer::pull;
66
use async_nats::jetstream::publish::PublishAck;
77
use async_nats::jetstream::stream;
8+
use async_nats::subject::ToSubject;
89
use bytes::Bytes;
910
use futures::Stream;
1011

1112
pub trait JetStreamContext: Send + Sync + Clone + 'static {
1213
type Error: Error + Send + Sync;
1314

14-
fn get_or_create_stream(
15+
fn get_or_create_stream<S: Into<stream::Config> + Send>(
1516
&self,
16-
config: stream::Config,
17+
config: S,
1718
) -> impl Future<Output = Result<(), Self::Error>> + Send;
1819
}
1920

2021
pub trait JetStreamPublisher: Send + Sync + Clone + 'static {
2122
type PublishError: Error + Send + Sync;
2223

23-
fn js_publish_with_headers(
24+
fn js_publish_with_headers<S: ToSubject + Send>(
2425
&self,
25-
subject: String,
26+
subject: S,
2627
headers: HeaderMap,
2728
payload: Bytes,
2829
) -> impl Future<Output = Result<PublishAck, Self::PublishError>> + Send;
@@ -64,9 +65,9 @@ impl Error for NoJetStream {}
6465
impl JetStreamPublisher for () {
6566
type PublishError = NoJetStream;
6667

67-
async fn js_publish_with_headers(
68+
async fn js_publish_with_headers<S: ToSubject + Send>(
6869
&self,
69-
_subject: String,
70+
_subject: S,
7071
_headers: HeaderMap,
7172
_payload: Bytes,
7273
) -> Result<PublishAck, NoJetStream> {
@@ -193,7 +194,7 @@ mod tests {
193194

194195
#[tokio::test]
195196
async fn unit_publisher_returns_err() {
196-
let result = ().js_publish_with_headers("s".into(), HeaderMap::new(), Bytes::new()).await;
197+
let result = ().js_publish_with_headers("s", HeaderMap::new(), Bytes::new()).await;
197198
assert!(result.is_err());
198199
}
199200

0 commit comments

Comments
 (0)