Skip to content

Commit 9d1cc90

Browse files
committed
refactor: decouple ClaimCheckPublisher from JetStreamPublisher trait
Split serve() to accept JetStreamContext and ClaimCheckPublisher as separate parameters instead of requiring a single type that implements both traits. ClaimCheckPublisher is now a plain struct with a publish_event method — no trait implementations, no type erasure. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 224895d commit 9d1cc90

11 files changed

Lines changed: 429 additions & 369 deletions

File tree

rsworkspace/crates/trogon-nats/src/jetstream/claim_check.rs

Lines changed: 94 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
use std::fmt;
2-
use std::future::IntoFuture;
3-
use std::pin::Pin;
2+
use std::time::Duration;
43

54
use async_nats::HeaderMap;
6-
use async_nats::jetstream::publish::PublishAck;
7-
use async_nats::subject::ToSubject;
85
use bytes::Bytes;
9-
10-
use async_nats::jetstream::stream;
6+
use tracing::error;
117

128
use super::object_store::{ObjectStoreGet, ObjectStorePut};
13-
use super::traits::{JetStreamContext, JetStreamPublisher};
9+
use super::publish::PublishOutcome;
10+
use super::traits::JetStreamPublisher;
1411

1512
pub const HEADER_CLAIM_CHECK: &str = "Trogon-Claim-Check";
1613
pub const HEADER_CLAIM_BUCKET: &str = "Trogon-Claim-Bucket";
@@ -89,42 +86,18 @@ impl<E: fmt::Display + fmt::Debug + Send + Sync + 'static> std::error::Error
8986
{
9087
}
9188

92-
#[derive(Debug)]
93-
pub enum ClaimCheckPublishError<PE: fmt::Display, SE: fmt::Display> {
94-
PublishFailed(PE),
95-
StoreFailed(SE),
96-
}
97-
98-
impl<PE: fmt::Display, SE: fmt::Display> fmt::Display for ClaimCheckPublishError<PE, SE> {
99-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100-
match self {
101-
Self::PublishFailed(e) => write!(f, "publish failed: {e}"),
102-
Self::StoreFailed(e) => {
103-
write!(f, "claim check object store put failed: {e}")
104-
}
105-
}
106-
}
107-
}
108-
109-
impl<
110-
PE: fmt::Display + fmt::Debug + Send + Sync + 'static,
111-
SE: fmt::Display + fmt::Debug + Send + Sync + 'static,
112-
> std::error::Error for ClaimCheckPublishError<PE, SE>
113-
{
114-
}
115-
11689
#[derive(Clone, Debug)]
11790
pub struct ClaimCheckPublisher<P, S> {
118-
inner: P,
91+
publisher: P,
11992
store: S,
12093
bucket_name: String,
12194
max_payload: MaxPayload,
12295
}
12396

12497
impl<P, S> ClaimCheckPublisher<P, S> {
125-
pub fn new(inner: P, store: S, bucket_name: String, max_payload: MaxPayload) -> Self {
98+
pub fn new(publisher: P, store: S, bucket_name: String, max_payload: MaxPayload) -> Self {
12699
Self {
127-
inner,
100+
publisher,
128101
store,
129102
bucket_name,
130103
max_payload,
@@ -137,105 +110,48 @@ fn claim_object_key(subject: &str) -> String {
137110
format!("{subject}/{id}")
138111
}
139112

140-
pub struct ClaimCheckAckFuture<PE, SE>
141-
where
142-
PE: fmt::Display + Send,
143-
SE: fmt::Display + Send,
144-
{
145-
inner: Pin<Box<dyn std::future::Future<Output = Result<PublishAck, PE>> + Send>>,
146-
_se: std::marker::PhantomData<SE>,
147-
}
148-
149-
impl<PE, SE> IntoFuture for ClaimCheckAckFuture<PE, SE>
150-
where
151-
PE: fmt::Display + Send + 'static,
152-
SE: fmt::Display + Send + 'static,
153-
{
154-
type Output = Result<PublishAck, ClaimCheckPublishError<PE, SE>>;
155-
type IntoFuture = Pin<
156-
Box<
157-
dyn std::future::Future<Output = Result<PublishAck, ClaimCheckPublishError<PE, SE>>>
158-
+ Send,
159-
>,
160-
>;
161-
162-
fn into_future(self) -> Self::IntoFuture {
163-
Box::pin(async move {
164-
self.inner
165-
.await
166-
.map_err(ClaimCheckPublishError::PublishFailed)
167-
})
168-
}
169-
}
170-
171-
impl<P, S> JetStreamPublisher for ClaimCheckPublisher<P, S>
172-
where
173-
P: JetStreamPublisher,
174-
S: ObjectStorePut,
175-
{
176-
type PublishError = ClaimCheckPublishError<P::PublishError, S::Error>;
177-
type AckFuture = ClaimCheckAckFuture<P::PublishError, S::Error>;
178-
179-
async fn publish_with_headers<Sub: ToSubject + Send>(
113+
impl<P: JetStreamPublisher, S: ObjectStorePut> ClaimCheckPublisher<P, S> {
114+
pub async fn publish_event(
180115
&self,
181-
subject: Sub,
116+
subject: String,
182117
headers: HeaderMap,
183118
payload: Bytes,
184-
) -> Result<Self::AckFuture, Self::PublishError> {
119+
ack_timeout: Duration,
120+
) -> PublishOutcome<P::PublishError> {
185121
if payload.len() <= self.max_payload.threshold() {
186-
let ack = self
187-
.inner
188-
.publish_with_headers(subject, headers, payload)
189-
.await
190-
.map_err(ClaimCheckPublishError::PublishFailed)?;
191-
return Ok(ClaimCheckAckFuture {
192-
inner: Box::pin(ack.into_future()),
193-
_se: std::marker::PhantomData,
194-
});
122+
return super::publish::publish_event(
123+
&self.publisher,
124+
subject,
125+
headers,
126+
payload,
127+
ack_timeout,
128+
)
129+
.await;
195130
}
196131

197-
let subject_str = subject.to_subject();
198-
let key = claim_object_key(&subject_str);
132+
let key = claim_object_key(&subject);
199133

200134
// Store-then-publish: if publish fails, the object becomes orphaned.
201135
// Cleanup relies on the object store bucket's TTL — see #101.
202136
let mut cursor = std::io::Cursor::new(payload);
203-
self.store
204-
.put(&key, &mut cursor)
205-
.await
206-
.map_err(ClaimCheckPublishError::StoreFailed)?;
137+
if let Err(e) = self.store.put(&key, &mut cursor).await {
138+
error!(error = %e, "claim check: failed to store payload in object store");
139+
return PublishOutcome::StoreFailed;
140+
}
207141

208142
let mut claim_headers = headers;
209143
claim_headers.insert(HEADER_CLAIM_CHECK, CLAIM_CHECK_VERSION);
210144
claim_headers.insert(HEADER_CLAIM_BUCKET, self.bucket_name.as_str());
211145
claim_headers.insert(HEADER_CLAIM_KEY, key.as_str());
212146

213-
let ack = self
214-
.inner
215-
.publish_with_headers(subject_str, claim_headers, Bytes::new())
216-
.await
217-
.map_err(ClaimCheckPublishError::PublishFailed)?;
218-
219-
Ok(ClaimCheckAckFuture {
220-
inner: Box::pin(ack.into_future()),
221-
_se: std::marker::PhantomData,
222-
})
223-
}
224-
}
225-
226-
impl<P, S> JetStreamContext for ClaimCheckPublisher<P, S>
227-
where
228-
P: JetStreamContext,
229-
S: Send + Sync + Clone + 'static,
230-
{
231-
type Error = P::Error;
232-
type Stream = P::Stream;
233-
234-
fn get_or_create_stream<C: Into<stream::Config> + Send>(
235-
&self,
236-
config: C,
237-
) -> impl std::future::Future<Output = Result<Self::Stream, Self::Error>> + Send {
238-
self.inner.get_or_create_stream(config)
147+
super::publish::publish_event(
148+
&self.publisher,
149+
subject,
150+
claim_headers,
151+
Bytes::new(),
152+
ack_timeout,
153+
)
154+
.await
239155
}
240156
}
241157

@@ -289,58 +205,57 @@ mod integration_tests {
289205
use super::*;
290206
use crate::jetstream::mocks::MockJetStreamPublisher;
291207
use crate::jetstream::mocks::MockObjectStore;
292-
use crate::jetstream::publish::publish_event;
293208

294209
#[tokio::test]
295210
async fn small_payload_publishes_directly() {
296-
let inner = MockJetStreamPublisher::new();
211+
let publisher = MockJetStreamPublisher::new();
297212
let store = MockObjectStore::new();
298-
let publisher = ClaimCheckPublisher::new(
299-
inner.clone(),
213+
let cc = ClaimCheckPublisher::new(
214+
publisher.clone(),
300215
store.clone(),
301216
"test-bucket".to_string(),
302217
MaxPayload::from_server_limit(1024 + PROTOCOL_OVERHEAD),
303218
);
304219

305-
let outcome = publish_event(
306-
&publisher,
307-
"test.subject".to_string(),
308-
HeaderMap::new(),
309-
Bytes::from(vec![0u8; 512]),
310-
Duration::from_secs(5),
311-
)
312-
.await;
220+
let outcome = cc
221+
.publish_event(
222+
"test.subject".to_string(),
223+
HeaderMap::new(),
224+
Bytes::from(vec![0u8; 512]),
225+
Duration::from_secs(5),
226+
)
227+
.await;
313228

314229
assert!(outcome.is_ok());
315-
let messages = inner.published_messages();
230+
let messages = publisher.published_messages();
316231
assert_eq!(messages.len(), 1);
317232
assert_eq!(messages[0].payload.len(), 512);
318233
assert!(store.stored_objects().is_empty());
319234
}
320235

321236
#[tokio::test]
322237
async fn large_payload_stores_in_object_store_and_publishes_claim() {
323-
let inner = MockJetStreamPublisher::new();
238+
let publisher = MockJetStreamPublisher::new();
324239
let store = MockObjectStore::new();
325-
let publisher = ClaimCheckPublisher::new(
326-
inner.clone(),
240+
let cc = ClaimCheckPublisher::new(
241+
publisher.clone(),
327242
store.clone(),
328243
"test-bucket".to_string(),
329244
MaxPayload::from_server_limit(1024 + PROTOCOL_OVERHEAD),
330245
);
331246

332-
let outcome = publish_event(
333-
&publisher,
334-
"test.subject".to_string(),
335-
HeaderMap::new(),
336-
Bytes::from(vec![0u8; 2048]),
337-
Duration::from_secs(5),
338-
)
339-
.await;
247+
let outcome = cc
248+
.publish_event(
249+
"test.subject".to_string(),
250+
HeaderMap::new(),
251+
Bytes::from(vec![0u8; 2048]),
252+
Duration::from_secs(5),
253+
)
254+
.await;
340255

341256
assert!(outcome.is_ok());
342257

343-
let messages = inner.published_messages();
258+
let messages = publisher.published_messages();
344259
assert_eq!(messages.len(), 1);
345260
assert!(messages[0].payload.is_empty());
346261
assert_eq!(
@@ -375,60 +290,60 @@ mod integration_tests {
375290

376291
#[tokio::test]
377292
async fn payload_at_exact_threshold_publishes_directly() {
378-
let inner = MockJetStreamPublisher::new();
293+
let publisher = MockJetStreamPublisher::new();
379294
let store = MockObjectStore::new();
380-
let publisher = ClaimCheckPublisher::new(
381-
inner.clone(),
295+
let cc = ClaimCheckPublisher::new(
296+
publisher.clone(),
382297
store.clone(),
383298
"test-bucket".to_string(),
384299
MaxPayload::from_server_limit(1024 + PROTOCOL_OVERHEAD),
385300
);
386301

387-
let outcome = publish_event(
388-
&publisher,
389-
"test.subject".to_string(),
390-
HeaderMap::new(),
391-
Bytes::from(vec![0u8; 1024]),
392-
Duration::from_secs(5),
393-
)
394-
.await;
302+
let outcome = cc
303+
.publish_event(
304+
"test.subject".to_string(),
305+
HeaderMap::new(),
306+
Bytes::from(vec![0u8; 1024]),
307+
Duration::from_secs(5),
308+
)
309+
.await;
395310

396311
assert!(outcome.is_ok());
397-
assert_eq!(inner.published_messages()[0].payload.len(), 1024);
312+
assert_eq!(publisher.published_messages()[0].payload.len(), 1024);
398313
assert!(store.stored_objects().is_empty());
399314
}
400315

401316
#[tokio::test]
402-
async fn object_store_failure_returns_publish_failed() {
403-
let inner = MockJetStreamPublisher::new();
317+
async fn object_store_failure_returns_store_failed() {
318+
let publisher = MockJetStreamPublisher::new();
404319
let store = MockObjectStore::new();
405320
store.fail_next_put();
406-
let publisher = ClaimCheckPublisher::new(
407-
inner.clone(),
321+
let cc = ClaimCheckPublisher::new(
322+
publisher.clone(),
408323
store,
409324
"test-bucket".to_string(),
410325
MaxPayload::from_server_limit(1024 + PROTOCOL_OVERHEAD),
411326
);
412327

413-
let outcome = publish_event(
414-
&publisher,
415-
"test.subject".to_string(),
416-
HeaderMap::new(),
417-
Bytes::from(vec![0u8; 2048]),
418-
Duration::from_secs(5),
419-
)
420-
.await;
328+
let outcome = cc
329+
.publish_event(
330+
"test.subject".to_string(),
331+
HeaderMap::new(),
332+
Bytes::from(vec![0u8; 2048]),
333+
Duration::from_secs(5),
334+
)
335+
.await;
421336

422337
assert!(!outcome.is_ok());
423-
assert!(inner.published_messages().is_empty());
338+
assert!(publisher.published_messages().is_empty());
424339
}
425340

426341
#[tokio::test]
427342
async fn large_payload_preserves_original_headers() {
428-
let inner = MockJetStreamPublisher::new();
343+
let publisher = MockJetStreamPublisher::new();
429344
let store = MockObjectStore::new();
430-
let publisher = ClaimCheckPublisher::new(
431-
inner.clone(),
345+
let cc = ClaimCheckPublisher::new(
346+
publisher.clone(),
432347
store,
433348
"test-bucket".to_string(),
434349
MaxPayload::from_server_limit(1024 + PROTOCOL_OVERHEAD),
@@ -437,17 +352,17 @@ mod integration_tests {
437352
let mut headers = HeaderMap::new();
438353
headers.insert("X-Custom", "value");
439354

440-
let outcome = publish_event(
441-
&publisher,
442-
"test.subject".to_string(),
443-
headers,
444-
Bytes::from(vec![0u8; 2048]),
445-
Duration::from_secs(5),
446-
)
447-
.await;
355+
let outcome = cc
356+
.publish_event(
357+
"test.subject".to_string(),
358+
headers,
359+
Bytes::from(vec![0u8; 2048]),
360+
Duration::from_secs(5),
361+
)
362+
.await;
448363

449364
assert!(outcome.is_ok());
450-
let msg = &inner.published_messages()[0];
365+
let msg = &publisher.published_messages()[0];
451366
assert_eq!(msg.headers.get("X-Custom").unwrap().as_str(), "value");
452367
assert!(msg.headers.get(HEADER_CLAIM_CHECK).is_some());
453368
}

0 commit comments

Comments
 (0)