@@ -9,7 +9,7 @@ use databricks_zerobus_ingest_sdk::{ConnectorFactory, ProxyConnector, ZerobusSdk
99use futures:: future:: BoxFuture ;
1010use std:: sync:: Arc ;
1111use tokio:: sync:: { Mutex , OnceCell , RwLock } ;
12- use tower:: Service ;
12+ use tower:: { Layer , Service } ;
1313use tracing:: warn;
1414use vector_lib:: codecs:: encoding:: {
1515 BatchEncoder , BatchOutput , BatchSerializerConfig , ProtoBatchSerializerConfig ,
@@ -71,14 +71,40 @@ pub struct ZerobusRequest {
7171}
7272
7373/// Response type for the Zerobus service.
74+ ///
75+ /// Carries the final `EventStatus` so the driver can mark finalizers correctly:
76+ /// `Delivered` on success, `Errored` when the retry budget was exhausted on a
77+ /// transient failure (asking the source / disk buffer to replay), and `Err`
78+ /// from `Service::call` reserved for permanent failures (driver maps to
79+ /// `Rejected`).
7480#[ derive( Debug ) ]
7581pub struct ZerobusResponse {
7682 pub events_byte_size : GroupedCountByteSize ,
83+ pub status : vector_lib:: event:: EventStatus ,
84+ }
85+
86+ impl ZerobusResponse {
87+ const fn delivered ( events_byte_size : GroupedCountByteSize ) -> Self {
88+ Self {
89+ events_byte_size,
90+ status : vector_lib:: event:: EventStatus :: Delivered ,
91+ }
92+ }
93+
94+ /// Synthesize a response signalling a transient failure that exhausted the
95+ /// retry budget. Carries a telemetry-aware zero `events_byte_size` because
96+ /// the driver only consumes `events_sent()` on the `Delivered` path.
97+ fn errored ( ) -> Self {
98+ Self {
99+ events_byte_size : vector_lib:: config:: telemetry ( ) . create_request_count_byte_size ( ) ,
100+ status : vector_lib:: event:: EventStatus :: Errored ,
101+ }
102+ }
77103}
78104
79105impl DriverResponse for ZerobusResponse {
80106 fn event_status ( & self ) -> vector_lib:: event:: EventStatus {
81- vector_lib :: event :: EventStatus :: Delivered
107+ self . status
82108 }
83109
84110 fn events_sent ( & self ) -> & GroupedCountByteSize {
@@ -441,7 +467,7 @@ impl ZerobusService {
441467 } ;
442468
443469 match result {
444- Ok ( ( ) ) => Ok ( ZerobusResponse { events_byte_size } ) ,
470+ Ok ( ( ) ) => Ok ( ZerobusResponse :: delivered ( events_byte_size) ) ,
445471 Err ( e) => {
446472 if e. is_retryable ( ) {
447473 // Clear the slot so the next attempt creates a fresh stream,
@@ -559,6 +585,80 @@ impl RetryLogic for ZerobusRetryLogic {
559585 }
560586}
561587
/// Tower layer that turns a retryable error which survived the retry budget
/// into a successful `ZerobusResponse` with `EventStatus::Errored`.
///
/// It is meant to sit outside the retry layer. Given the retry layer's result:
/// - `Ok(resp)` is returned untouched.
/// - `Err(e)` with `e.is_retryable()` becomes `Ok(ZerobusResponse::errored())`,
///   so the driver marks finalizers `Errored` (transient — the source / disk
///   buffer may replay) instead of `Rejected` (permanent drop).
/// - A permanent `Err(e)` is propagated, and the driver maps it to `Rejected`.
///
/// The layer exists because the driver maps every `Err` from `Service::call`
/// to `EventStatus::Rejected`; without it, transient-but-exhausted failures
/// would be dropped as though they were permanent.
#[derive(Clone, Debug, Default)]
pub struct RetryableErrorAsErroredLayer;
603+
604+ impl < S > Layer < S > for RetryableErrorAsErroredLayer {
605+ type Service = RetryableErrorAsErrored < S > ;
606+
607+ fn layer ( & self , inner : S ) -> Self :: Service {
608+ RetryableErrorAsErrored { inner }
609+ }
610+ }
611+
/// Service produced by `RetryableErrorAsErroredLayer`.
///
/// Holds the wrapped service whose errors are inspected and, when retryable,
/// converted into an `Errored` response.
#[derive(Clone, Debug)]
pub struct RetryableErrorAsErrored<S> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
}
616+
617+ impl < S > Service < ZerobusRequest > for RetryableErrorAsErrored < S >
618+ where
619+ S : Service < ZerobusRequest , Response = ZerobusResponse , Error = crate :: Error > ,
620+ S :: Future : Send + ' static ,
621+ {
622+ type Response = ZerobusResponse ;
623+ type Error = crate :: Error ;
624+ type Future = BoxFuture < ' static , Result < Self :: Response , Self :: Error > > ;
625+
626+ fn poll_ready (
627+ & mut self ,
628+ cx : & mut std:: task:: Context < ' _ > ,
629+ ) -> std:: task:: Poll < Result < ( ) , Self :: Error > > {
630+ self . inner . poll_ready ( cx)
631+ }
632+
633+ fn call ( & mut self , req : ZerobusRequest ) -> Self :: Future {
634+ let fut = self . inner . call ( req) ;
635+ Box :: pin ( async move {
636+ match fut. await {
637+ Ok ( resp) => Ok ( resp) ,
638+ Err ( e) => {
639+ // The Tower stack boxes errors above us (retry, timeout,
640+ // adaptive-concurrency). Downcast to inspect retryability;
641+ // anything that isn't a `ZerobusSinkError` (e.g. a timeout
642+ // `Elapsed`) is conservatively treated as transient.
643+ let retryable = match e. downcast_ref :: < ZerobusSinkError > ( ) {
644+ Some ( zb) => zb. is_retryable ( ) ,
645+ None => true ,
646+ } ;
647+ if retryable {
648+ warn ! (
649+ message = "Zerobus retry budget exhausted on transient error; signaling Errored so source or buffer may replay." ,
650+ error = %e,
651+ ) ;
652+ Ok ( ZerobusResponse :: errored ( ) )
653+ } else {
654+ Err ( e)
655+ }
656+ }
657+ }
658+ } )
659+ }
660+ }
661+
562662#[ cfg( test) ]
563663mod tests {
564664 use super :: * ;
@@ -809,4 +909,102 @@ mod tests {
809909 // And the slot was cleared so the next ingest creates a fresh stream.
810910 assert ! ( !service. has_active_stream( ) . await ) ;
811911 }
912+
913+ fn dummy_request ( ) -> ZerobusRequest {
914+ ZerobusRequest {
915+ events : Arc :: new ( vec ! [ ] ) ,
916+ metadata : RequestMetadata :: default ( ) ,
917+ finalizers : EventFinalizers :: default ( ) ,
918+ }
919+ }
920+
921+ #[ tokio:: test]
922+ async fn retryable_err_after_exhaustion_becomes_ok_errored ( ) {
923+ use tower:: ServiceExt ;
924+ let inner = tower:: service_fn ( |_req : ZerobusRequest | async move {
925+ let err: crate :: Error = Box :: new ( ZerobusSinkError :: SchemaError {
926+ message : "UC 503" . to_string ( ) ,
927+ retryable : true ,
928+ } ) ;
929+ Err :: < ZerobusResponse , _ > ( err)
930+ } ) ;
931+ let mut svc = RetryableErrorAsErrored { inner } ;
932+ let resp = svc
933+ . ready ( )
934+ . await
935+ . unwrap ( )
936+ . call ( dummy_request ( ) )
937+ . await
938+ . unwrap ( ) ;
939+ assert_eq ! ( resp. status, vector_lib:: event:: EventStatus :: Errored ) ;
940+ }
941+
942+ #[ tokio:: test]
943+ async fn non_retryable_err_propagates ( ) {
944+ use tower:: ServiceExt ;
945+ let inner = tower:: service_fn ( |_req : ZerobusRequest | async move {
946+ let err: crate :: Error = Box :: new ( ZerobusSinkError :: EncodingError {
947+ message : "bad" . to_string ( ) ,
948+ } ) ;
949+ Err :: < ZerobusResponse , _ > ( err)
950+ } ) ;
951+ let mut svc = RetryableErrorAsErrored { inner } ;
952+ let err = svc
953+ . ready ( )
954+ . await
955+ . unwrap ( )
956+ . call ( dummy_request ( ) )
957+ . await
958+ . unwrap_err ( ) ;
959+ let zb = err. downcast_ref :: < ZerobusSinkError > ( ) . unwrap ( ) ;
960+ assert ! ( matches!( zb, ZerobusSinkError :: EncodingError { .. } ) ) ;
961+ }
962+
963+ #[ tokio:: test]
964+ async fn unknown_err_treated_as_transient ( ) {
965+ use tower:: ServiceExt ;
966+ // Simulate a Tower-layer error that isn't a ZerobusSinkError (e.g.
967+ // timeout `Elapsed`): conservatively becomes Errored, not Rejected.
968+ #[ derive( Debug ) ]
969+ struct Other ;
970+ impl std:: fmt:: Display for Other {
971+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
972+ write ! ( f, "other" )
973+ }
974+ }
975+ impl std:: error:: Error for Other { }
976+
977+ let inner = tower:: service_fn ( |_req : ZerobusRequest | async move {
978+ let err: crate :: Error = Box :: new ( Other ) ;
979+ Err :: < ZerobusResponse , _ > ( err)
980+ } ) ;
981+ let mut svc = RetryableErrorAsErrored { inner } ;
982+ let resp = svc
983+ . ready ( )
984+ . await
985+ . unwrap ( )
986+ . call ( dummy_request ( ) )
987+ . await
988+ . unwrap ( ) ;
989+ assert_eq ! ( resp. status, vector_lib:: event:: EventStatus :: Errored ) ;
990+ }
991+
992+ #[ tokio:: test]
993+ async fn ok_response_passes_through ( ) {
994+ use tower:: ServiceExt ;
995+ let inner = tower:: service_fn ( |_req : ZerobusRequest | async move {
996+ Ok :: < _ , crate :: Error > ( ZerobusResponse :: delivered (
997+ GroupedCountByteSize :: new_untagged ( ) ,
998+ ) )
999+ } ) ;
1000+ let mut svc = RetryableErrorAsErrored { inner } ;
1001+ let resp = svc
1002+ . ready ( )
1003+ . await
1004+ . unwrap ( )
1005+ . call ( dummy_request ( ) )
1006+ . await
1007+ . unwrap ( ) ;
1008+ assert_eq ! ( resp. status, vector_lib:: event:: EventStatus :: Delivered ) ;
1009+ }
8121010}
0 commit comments