1+ use std:: fmt;
12use std:: time:: Duration ;
23
34use crate :: config:: GithubConfig ;
45use crate :: signature;
6+ use async_nats:: jetstream:: context:: CreateStreamError ;
57use axum:: {
68 Router ,
79 body:: Bytes ,
@@ -16,6 +18,85 @@ use tower_http::limit::RequestBodyLimitLayer;
1618use tracing:: { info, instrument, warn} ;
1719use trogon_nats:: jetstream:: { JetStreamContext , JetStreamPublisher , NatsJetStreamClient } ;
1820
21+ #[ derive( Debug ) ]
22+ pub enum ServeError {
23+ Provision ( CreateStreamError ) ,
24+ Io ( std:: io:: Error ) ,
25+ }
26+
27+ impl fmt:: Display for ServeError {
28+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
29+ match self {
30+ ServeError :: Provision ( e) => write ! ( f, "stream provisioning failed: {e}" ) ,
31+ ServeError :: Io ( e) => write ! ( f, "server IO error: {e}" ) ,
32+ }
33+ }
34+ }
35+
36+ impl std:: error:: Error for ServeError {
37+ fn source ( & self ) -> Option < & ( dyn std:: error:: Error + ' static ) > {
38+ match self {
39+ ServeError :: Provision ( e) => Some ( e) ,
40+ ServeError :: Io ( e) => Some ( e) ,
41+ }
42+ }
43+ }
44+
45+ impl From < std:: io:: Error > for ServeError {
46+ fn from ( e : std:: io:: Error ) -> Self {
47+ ServeError :: Io ( e)
48+ }
49+ }
50+
51+ enum PublishOutcome < E : fmt:: Display > {
52+ Published ,
53+ PublishFailed ( E ) ,
54+ AckFailed ( E ) ,
55+ AckTimedOut ( Duration ) ,
56+ }
57+
58+ impl < E : fmt:: Display > PublishOutcome < E > {
59+ fn into_status ( self ) -> StatusCode {
60+ match self {
61+ PublishOutcome :: Published => {
62+ info ! ( "Published GitHub event to NATS" ) ;
63+ StatusCode :: OK
64+ }
65+ PublishOutcome :: PublishFailed ( e) => {
66+ warn ! ( error = %e, "Failed to publish GitHub event to NATS" ) ;
67+ StatusCode :: INTERNAL_SERVER_ERROR
68+ }
69+ PublishOutcome :: AckFailed ( e) => {
70+ warn ! ( error = %e, "NATS ack failed" ) ;
71+ StatusCode :: INTERNAL_SERVER_ERROR
72+ }
73+ PublishOutcome :: AckTimedOut ( timeout) => {
74+ warn ! ( ?timeout, "NATS ack timed out" ) ;
75+ StatusCode :: INTERNAL_SERVER_ERROR
76+ }
77+ }
78+ }
79+ }
80+
81+ async fn publish_event < P : JetStreamPublisher > (
82+ js : & P ,
83+ subject : String ,
84+ headers : async_nats:: HeaderMap ,
85+ body : Bytes ,
86+ ack_timeout : Duration ,
87+ ) -> PublishOutcome < P :: PublishError > {
88+ let ack_future = match js. publish_with_headers ( subject, headers, body) . await {
89+ Ok ( f) => f,
90+ Err ( e) => return PublishOutcome :: PublishFailed ( e) ,
91+ } ;
92+
93+ match tokio:: time:: timeout ( ack_timeout, ack_future) . await {
94+ Ok ( Ok ( _) ) => PublishOutcome :: Published ,
95+ Ok ( Err ( e) ) => PublishOutcome :: AckFailed ( e) ,
96+ Err ( _) => PublishOutcome :: AckTimedOut ( ack_timeout) ,
97+ }
98+ }
99+
19100#[ derive( Clone ) ]
20101struct AppState < P : JetStreamPublisher > {
21102 js : P ,
@@ -61,12 +142,9 @@ pub fn router(config: &GithubConfig, nats: async_nats::Client) -> Router {
61142 . with_state ( state)
62143}
63144
64- pub async fn serve (
65- config : GithubConfig ,
66- nats : async_nats:: Client ,
67- ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
145+ pub async fn serve ( config : GithubConfig , nats : async_nats:: Client ) -> Result < ( ) , ServeError > {
68146 let js = NatsJetStreamClient :: new ( async_nats:: jetstream:: new ( nats. clone ( ) ) ) ;
69- provision ( & js, & config) . await ?;
147+ provision ( & js, & config) . await . map_err ( ServeError :: Provision ) ?;
70148
71149 let app = router ( & config, nats) ;
72150
@@ -174,32 +252,9 @@ async fn handle_webhook<P: JetStreamPublisher>(
174252 nats_headers. insert ( "X-GitHub-Event" , event. as_str ( ) ) ;
175253 nats_headers. insert ( "X-GitHub-Delivery" , delivery. as_str ( ) ) ;
176254
177- match state
178- . js
179- . publish_with_headers ( subject. clone ( ) , nats_headers, body)
255+ publish_event ( & state. js , subject, nats_headers, body, state. nats_ack_timeout )
180256 . await
181- {
182- Ok ( ack_future) => {
183- match tokio:: time:: timeout ( state. nats_ack_timeout , ack_future) . await {
184- Ok ( Ok ( _) ) => {
185- info ! ( "Published GitHub event to NATS" ) ;
186- StatusCode :: OK
187- }
188- Ok ( Err ( e) ) => {
189- warn ! ( error = %e, "NATS ack failed" ) ;
190- StatusCode :: INTERNAL_SERVER_ERROR
191- }
192- Err ( _) => {
193- warn ! ( timeout = ?state. nats_ack_timeout, "NATS ack timed out" ) ;
194- StatusCode :: INTERNAL_SERVER_ERROR
195- }
196- }
197- }
198- Err ( e) => {
199- warn ! ( error = %e, "Failed to publish GitHub event to NATS" ) ;
200- StatusCode :: INTERNAL_SERVER_ERROR
201- }
202- }
257+ . into_status ( )
203258}
204259
205260#[ cfg( test) ]
0 commit comments