|
| 1 | +use std::fmt; |
| 2 | +use std::time::Duration; |
| 3 | + |
| 4 | +use bytes::Bytes; |
| 5 | +use tracing::error; |
| 6 | + |
| 7 | +use crate::jetstream::JetStreamPublisher; |
| 8 | + |
| 9 | +#[derive(Debug)] |
| 10 | +pub enum PublishOutcome<E: fmt::Display> { |
| 11 | + Published, |
| 12 | + PublishFailed(E), |
| 13 | + AckFailed(E), |
| 14 | + AckTimedOut(Duration), |
| 15 | +} |
| 16 | + |
| 17 | +impl<E: fmt::Display> PublishOutcome<E> { |
| 18 | + pub fn is_ok(&self) -> bool { |
| 19 | + matches!(self, PublishOutcome::Published) |
| 20 | + } |
| 21 | + |
| 22 | + pub fn log_on_error(&self, source_name: &str) { |
| 23 | + match self { |
| 24 | + PublishOutcome::Published => {} |
| 25 | + PublishOutcome::PublishFailed(e) => { |
| 26 | + error!(error = %e, source = source_name, "Failed to publish event to NATS"); |
| 27 | + } |
| 28 | + PublishOutcome::AckFailed(e) => { |
| 29 | + error!(error = %e, source = source_name, "NATS ack failed"); |
| 30 | + } |
| 31 | + PublishOutcome::AckTimedOut(timeout) => { |
| 32 | + error!(?timeout, source = source_name, "NATS ack timed out"); |
| 33 | + } |
| 34 | + } |
| 35 | + } |
| 36 | +} |
| 37 | + |
| 38 | +pub async fn publish_event<P: JetStreamPublisher>( |
| 39 | + js: &P, |
| 40 | + subject: String, |
| 41 | + headers: async_nats::HeaderMap, |
| 42 | + body: Bytes, |
| 43 | + ack_timeout: Duration, |
| 44 | +) -> PublishOutcome<P::PublishError> { |
| 45 | + let ack_future = match js.publish_with_headers(subject, headers, body).await { |
| 46 | + Ok(f) => f, |
| 47 | + Err(e) => return PublishOutcome::PublishFailed(e), |
| 48 | + }; |
| 49 | + |
| 50 | + match tokio::time::timeout(ack_timeout, ack_future).await { |
| 51 | + Ok(Ok(_)) => PublishOutcome::Published, |
| 52 | + Ok(Err(e)) => PublishOutcome::AckFailed(e), |
| 53 | + Err(_) => PublishOutcome::AckTimedOut(ack_timeout), |
| 54 | + } |
| 55 | +} |
| 56 | + |
| 57 | +#[cfg(test)] |
| 58 | +mod tests { |
| 59 | + use super::*; |
| 60 | + |
| 61 | + #[cfg(feature = "test-support")] |
| 62 | + mod with_mocks { |
| 63 | + use super::*; |
| 64 | + use crate::jetstream::MockJetStreamPublisher; |
| 65 | + |
| 66 | + #[tokio::test] |
| 67 | + async fn publish_event_returns_published_on_success() { |
| 68 | + let publisher = MockJetStreamPublisher::new(); |
| 69 | + let result = publish_event( |
| 70 | + &publisher, |
| 71 | + "test.subject".to_string(), |
| 72 | + async_nats::HeaderMap::new(), |
| 73 | + Bytes::from_static(b"payload"), |
| 74 | + Duration::from_secs(10), |
| 75 | + ) |
| 76 | + .await; |
| 77 | + assert!(result.is_ok()); |
| 78 | + } |
| 79 | + |
| 80 | + #[tokio::test] |
| 81 | + async fn publish_event_returns_publish_failed_on_error() { |
| 82 | + let publisher = MockJetStreamPublisher::new(); |
| 83 | + publisher.fail_next_js_publish(); |
| 84 | + let result = publish_event( |
| 85 | + &publisher, |
| 86 | + "test.subject".to_string(), |
| 87 | + async_nats::HeaderMap::new(), |
| 88 | + Bytes::from_static(b"payload"), |
| 89 | + Duration::from_secs(10), |
| 90 | + ) |
| 91 | + .await; |
| 92 | + assert!(matches!(result, PublishOutcome::PublishFailed(_))); |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + #[test] |
| 97 | + fn log_on_error_does_nothing_for_published() { |
| 98 | + let outcome: PublishOutcome<String> = PublishOutcome::Published; |
| 99 | + outcome.log_on_error("test"); |
| 100 | + } |
| 101 | +} |
0 commit comments