Skip to content

Commit dbfcbfb

Browse files
committed
feat(slack): add trogon-source-slack webhook receiver
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 6fdf052 commit dbfcbfb

File tree

12 files changed

+1442
-52
lines changed

12 files changed

+1442
-52
lines changed

rsworkspace/Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-telemetry/src/service_name.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub enum ServiceName {
66
AcpNatsStdio,
77
AcpNatsWs,
88
TrogonSourceGithub,
9+
TrogonSourceSlack,
910
}
1011

1112
impl ServiceName {
@@ -14,6 +15,7 @@ impl ServiceName {
1415
Self::AcpNatsStdio => "acp-nats-stdio",
1516
Self::AcpNatsWs => "acp-nats-ws",
1617
Self::TrogonSourceGithub => "trogon-source-github",
18+
Self::TrogonSourceSlack => "trogon-source-slack",
1719
}
1820
}
1921
}
@@ -32,7 +34,14 @@ mod tests {
3234
fn as_str_returns_expected_values() {
3335
assert_eq!(ServiceName::AcpNatsStdio.as_str(), "acp-nats-stdio");
3436
assert_eq!(ServiceName::AcpNatsWs.as_str(), "acp-nats-ws");
35-
assert_eq!(ServiceName::TrogonSourceGithub.as_str(), "trogon-source-github");
37+
assert_eq!(
38+
ServiceName::TrogonSourceGithub.as_str(),
39+
"trogon-source-github"
40+
);
41+
assert_eq!(
42+
ServiceName::TrogonSourceSlack.as_str(),
43+
"trogon-source-slack"
44+
);
3645
}
3746

3847
#[test]
@@ -43,5 +52,9 @@ mod tests {
4352
format!("{}", ServiceName::TrogonSourceGithub),
4453
"trogon-source-github"
4554
);
55+
assert_eq!(
56+
format!("{}", ServiceName::TrogonSourceSlack),
57+
"trogon-source-slack"
58+
);
4659
}
4760
}

rsworkspace/crates/trogon-nats/src/jetstream/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#[cfg(not(coverage))]
22
pub mod client;
33
pub mod message;
4+
pub mod publish;
45
pub mod traits;
56

67
#[cfg(feature = "test-support")]
@@ -15,6 +16,7 @@ pub use message::{
1516
JsAck, JsAckWith, JsDispatchMessage, JsDoubleAck, JsDoubleAckWith, JsMessageRef,
1617
JsRequestMessage,
1718
};
19+
pub use publish::{PublishOutcome, publish_event};
1820
pub use traits::{
1921
JetStreamConsumer, JetStreamContext, JetStreamCreateConsumer, JetStreamGetStream,
2022
JetStreamPublisher, JsMessageOf,
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::fmt;
2+
use std::time::Duration;
3+
4+
use bytes::Bytes;
5+
use tracing::error;
6+
7+
use crate::jetstream::JetStreamPublisher;
8+
9+
#[derive(Debug)]
10+
pub enum PublishOutcome<E: fmt::Display> {
11+
Published,
12+
PublishFailed(E),
13+
AckFailed(E),
14+
AckTimedOut(Duration),
15+
}
16+
17+
impl<E: fmt::Display> PublishOutcome<E> {
18+
pub fn is_ok(&self) -> bool {
19+
matches!(self, PublishOutcome::Published)
20+
}
21+
22+
pub fn log_on_error(&self, source_name: &str) {
23+
match self {
24+
PublishOutcome::Published => {}
25+
PublishOutcome::PublishFailed(e) => {
26+
error!(error = %e, source = source_name, "Failed to publish event to NATS");
27+
}
28+
PublishOutcome::AckFailed(e) => {
29+
error!(error = %e, source = source_name, "NATS ack failed");
30+
}
31+
PublishOutcome::AckTimedOut(timeout) => {
32+
error!(?timeout, source = source_name, "NATS ack timed out");
33+
}
34+
}
35+
}
36+
}
37+
38+
pub async fn publish_event<P: JetStreamPublisher>(
39+
js: &P,
40+
subject: String,
41+
headers: async_nats::HeaderMap,
42+
body: Bytes,
43+
ack_timeout: Duration,
44+
) -> PublishOutcome<P::PublishError> {
45+
let ack_future = match js.publish_with_headers(subject, headers, body).await {
46+
Ok(f) => f,
47+
Err(e) => return PublishOutcome::PublishFailed(e),
48+
};
49+
50+
match tokio::time::timeout(ack_timeout, ack_future).await {
51+
Ok(Ok(_)) => PublishOutcome::Published,
52+
Ok(Err(e)) => PublishOutcome::AckFailed(e),
53+
Err(_) => PublishOutcome::AckTimedOut(ack_timeout),
54+
}
55+
}
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use super::*;
60+
61+
#[cfg(feature = "test-support")]
62+
mod with_mocks {
63+
use super::*;
64+
use crate::jetstream::MockJetStreamPublisher;
65+
66+
#[tokio::test]
67+
async fn publish_event_returns_published_on_success() {
68+
let publisher = MockJetStreamPublisher::new();
69+
let result = publish_event(
70+
&publisher,
71+
"test.subject".to_string(),
72+
async_nats::HeaderMap::new(),
73+
Bytes::from_static(b"payload"),
74+
Duration::from_secs(10),
75+
)
76+
.await;
77+
assert!(result.is_ok());
78+
}
79+
80+
#[tokio::test]
81+
async fn publish_event_returns_publish_failed_on_error() {
82+
let publisher = MockJetStreamPublisher::new();
83+
publisher.fail_next_js_publish();
84+
let result = publish_event(
85+
&publisher,
86+
"test.subject".to_string(),
87+
async_nats::HeaderMap::new(),
88+
Bytes::from_static(b"payload"),
89+
Duration::from_secs(10),
90+
)
91+
.await;
92+
assert!(matches!(result, PublishOutcome::PublishFailed(_)));
93+
}
94+
}
95+
96+
#[test]
97+
fn log_on_error_does_nothing_for_published() {
98+
let outcome: PublishOutcome<String> = PublishOutcome::Published;
99+
outcome.log_on_error("test");
100+
}
101+
}

rsworkspace/crates/trogon-source-github/src/server.rs

Lines changed: 13 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use axum::{
1515
use std::future::Future;
1616
use std::pin::Pin;
1717
use tower_http::limit::RequestBodyLimitLayer;
18-
use tracing::{error, info, instrument, warn};
19-
use trogon_nats::jetstream::{JetStreamContext, JetStreamPublisher};
18+
use tracing::{info, instrument, warn};
19+
use trogon_nats::jetstream::{JetStreamContext, JetStreamPublisher, PublishOutcome, publish_event};
2020

2121
#[cfg(not(coverage))]
2222
#[derive(Debug)]
@@ -53,52 +53,13 @@ impl From<std::io::Error> for ServeError {
5353
}
5454
}
5555

56-
enum PublishOutcome<E: fmt::Display> {
57-
Published,
58-
PublishFailed(E),
59-
AckFailed(E),
60-
AckTimedOut(Duration),
61-
}
62-
63-
impl<E: fmt::Display> PublishOutcome<E> {
64-
fn into_status(self) -> StatusCode {
65-
match self {
66-
PublishOutcome::Published => {
67-
info!("Published GitHub event to NATS");
68-
StatusCode::OK
69-
}
70-
PublishOutcome::PublishFailed(e) => {
71-
error!(error = %e, "Failed to publish GitHub event to NATS");
72-
StatusCode::INTERNAL_SERVER_ERROR
73-
}
74-
PublishOutcome::AckFailed(e) => {
75-
error!(error = %e, "NATS ack failed");
76-
StatusCode::INTERNAL_SERVER_ERROR
77-
}
78-
PublishOutcome::AckTimedOut(timeout) => {
79-
error!(?timeout, "NATS ack timed out");
80-
StatusCode::INTERNAL_SERVER_ERROR
81-
}
82-
}
83-
}
84-
}
85-
86-
async fn publish_event<P: JetStreamPublisher>(
87-
js: &P,
88-
subject: String,
89-
headers: async_nats::HeaderMap,
90-
body: Bytes,
91-
ack_timeout: Duration,
92-
) -> PublishOutcome<P::PublishError> {
93-
let ack_future = match js.publish_with_headers(subject, headers, body).await {
94-
Ok(f) => f,
95-
Err(e) => return PublishOutcome::PublishFailed(e),
96-
};
97-
98-
match tokio::time::timeout(ack_timeout, ack_future).await {
99-
Ok(Ok(_)) => PublishOutcome::Published,
100-
Ok(Err(e)) => PublishOutcome::AckFailed(e),
101-
Err(_) => PublishOutcome::AckTimedOut(ack_timeout),
56+
fn outcome_to_status<E: fmt::Display>(outcome: PublishOutcome<E>) -> StatusCode {
57+
if outcome.is_ok() {
58+
info!("Published GitHub event to NATS");
59+
StatusCode::OK
60+
} else {
61+
outcome.log_on_error("github");
62+
StatusCode::INTERNAL_SERVER_ERROR
10263
}
10364
}
10465

@@ -238,15 +199,16 @@ async fn handle_webhook_inner<P: JetStreamPublisher>(
238199
nats_headers.insert(NATS_HEADER_EVENT, event.as_str());
239200
nats_headers.insert(NATS_HEADER_DELIVERY, delivery.as_str());
240201

241-
publish_event(
202+
let outcome = publish_event(
242203
&state.js,
243204
subject,
244205
nats_headers,
245206
body,
246207
state.nats_ack_timeout,
247208
)
248-
.await
249-
.into_status()
209+
.await;
210+
211+
outcome_to_status(outcome)
250212
}
251213

252214
#[cfg(test)]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
[package]
2+
name = "trogon-source-slack"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[lints]
7+
workspace = true
8+
9+
[[bin]]
10+
name = "trogon-source-slack"
11+
path = "src/main.rs"
12+
13+
[dependencies]
14+
acp-telemetry = { workspace = true }
15+
async-nats = { workspace = true, features = ["jetstream"] }
16+
axum = { workspace = true }
17+
bytes = { workspace = true }
18+
bytesize = "2.3.1"
19+
hex = "0.4"
20+
hmac = "0.12"
21+
sha2 = "0.10"
22+
tokio = { workspace = true, features = ["full"] }
23+
tower-http = { workspace = true, features = ["limit"] }
24+
serde_json = { workspace = true }
25+
tracing = { workspace = true }
26+
trogon-nats = { workspace = true }
27+
trogon-std = { workspace = true }
28+
29+
[dev-dependencies]
30+
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
31+
tower = "0.5"
32+
tracing-subscriber = { workspace = true }
33+
trogon-nats = { workspace = true, features = ["test-support"] }
34+
trogon-std = { workspace = true, features = ["test-support"] }

0 commit comments

Comments
 (0)