@@ -5,7 +5,7 @@ use crate::sinks::util::retries::RetryLogic;
55use databricks_zerobus_ingest_sdk:: { ConnectorFactory , ProxyConnector , ZerobusSdk , ZerobusStream } ;
66use futures:: future:: BoxFuture ;
77use std:: sync:: Arc ;
8- use tokio:: sync:: Mutex ;
8+ use tokio:: sync:: { Mutex , RwLock } ;
99use tower:: Service ;
1010use tracing:: warn;
1111use vector_lib:: finalization:: { EventFinalizers , Finalizable } ;
@@ -112,21 +112,44 @@ pub enum StreamMode {
112112}
113113
114114/// The active stream.
115+ ///
116+ /// The SDK's `ZerobusStream::close()` requires `&mut self`, but ingests need
117+ /// shared access to call `&self` methods concurrently. We resolve this with an
118+ /// `RwLock`: ingests hold a read guard across `ingest_records_offset`, and
119+ /// `close()` takes the write guard, pulls the stream out of the `Option`, and
120+ /// awaits its SDK-level close on the owned value. Any holder of an `Arc` can
121+ /// invoke `close()`, so the graceful path always runs — there is no
122+ /// `try_unwrap`/`get_mut` race.
115123enum ActiveStream {
116- Proto ( Box < ZerobusStream > ) ,
124+ Proto ( RwLock < Option < Box < ZerobusStream > > > ) ,
117125 /// Test-only variant that returns a pre-configured error on ingest.
118126 #[ cfg( test) ]
119127 Mock ( MockStream ) ,
120128}
121129
122130impl ActiveStream {
131+ fn proto ( stream : ZerobusStream ) -> Self {
132+ ActiveStream :: Proto ( RwLock :: new ( Some ( Box :: new ( stream) ) ) )
133+ }
134+
123135 /// Gracefully flush and close the underlying SDK stream.
124136 ///
125- /// Safe to call before the value is dropped — the SDK's own `Drop`
126- /// implementation is a no-op on already-closed streams.
127- async fn close ( & mut self ) {
137+ /// Waits for any in-flight ingests (read-lock holders) to complete, then
138+ /// pulls the stream out of the slot and runs the SDK's awaitable `close()`
139+ /// on the owned value. The write lock is released before that close is
140+ /// awaited, so later ingests on this handle fail fast with `StreamClosed`
140+ /// rather than blocking.
141+ ///
142+ /// Idempotent: a second call after the stream has been taken is a no-op.
143+ /// The SDK's own `Drop` is also a no-op once close has run.
144+ async fn close ( & self ) {
128145 let result = match self {
129- ActiveStream :: Proto ( s) => s. close ( ) . await ,
146+ ActiveStream :: Proto ( lock) => {
147+ let taken = lock. write ( ) . await . take ( ) ;
148+ match taken {
149+ Some ( mut stream) => stream. close ( ) . await ,
150+ None => return ,
151+ }
152+ }
130153 #[ cfg( test) ]
131154 ActiveStream :: Mock ( m) => {
132155 m. closed . store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
@@ -146,6 +169,38 @@ pub struct MockStream {
146169 next_error : std:: sync:: Mutex < Option < databricks_zerobus_ingest_sdk:: ZerobusError > > ,
147170 /// Shared flag set to `true` when `ActiveStream::close()` is called.
148171 closed : Arc < std:: sync:: atomic:: AtomicBool > ,
172+ /// Optional gate: when set, each ingest call signals `started` and then
173+ /// waits to acquire a `release` permit before returning. Lets tests
174+ /// deterministically force two ingests to overlap (each holding an `Arc`
175+ /// clone of the `ActiveStream`) before they fail.
176+ gate : Option < MockGate > ,
177+ }
178+
/// Pair of semaphores a gated `MockStream` uses to rendezvous with the test.
#[cfg(test)]
struct MockGate {
    /// Gains one permit each time an ingest enters the gated region.
    started: Arc<tokio::sync::Semaphore>,
    /// Ingests wait for a permit here before returning their result.
    release: Arc<tokio::sync::Semaphore>,
}
184+
/// Test-side handle onto the same two semaphores held by a `MockGate`.
#[cfg(test)]
#[derive(Clone)]
pub struct MockGateHandle {
    /// Shared with the gate; counts ingests that have reached the gate.
    started: Arc<tokio::sync::Semaphore>,
    /// Shared with the gate; permits added here let gated ingests continue.
    release: Arc<tokio::sync::Semaphore>,
}
191+
#[cfg(test)]
impl MockGateHandle {
    /// Blocks until `n` ingests have announced themselves at the gate.
    ///
    /// The `n` "started" permits are consumed for good (forgotten rather than
    /// returned), so a later call only counts ingests that arrive afterwards.
    pub async fn wait_for_started(&self, n: u32) {
        self.started.acquire_many(n).await.unwrap().forget();
    }

    /// Hands out `n` release permits, letting that many gated ingests proceed
    /// and return their result.
    pub fn release(&self, n: u32) {
        let permits = n as usize;
        self.release.add_permits(permits);
    }
}
150205
151206#[ cfg( test) ]
@@ -154,16 +209,30 @@ impl MockStream {
154209 Self {
155210 next_error : std:: sync:: Mutex :: new ( None ) ,
156211 closed : Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ,
212+ gate : None ,
157213 }
158214 }
159215
160216 pub fn failing ( error : databricks_zerobus_ingest_sdk:: ZerobusError ) -> Self {
161217 Self {
162218 next_error : std:: sync:: Mutex :: new ( Some ( error) ) ,
163219 closed : Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ,
220+ gate : None ,
164221 }
165222 }
166223
224+ /// Install a gate so ingests block until the test releases them.
225+ /// Returns a handle the test uses to coordinate.
226+ pub fn with_gate ( mut self ) -> ( Self , MockGateHandle ) {
227+ let started = Arc :: new ( tokio:: sync:: Semaphore :: new ( 0 ) ) ;
228+ let release = Arc :: new ( tokio:: sync:: Semaphore :: new ( 0 ) ) ;
229+ self . gate = Some ( MockGate {
230+ started : Arc :: clone ( & started) ,
231+ release : Arc :: clone ( & release) ,
232+ } ) ;
233+ ( self , MockGateHandle { started, release } )
234+ }
235+
167236 /// Returns a shared handle to the closed flag for test assertions.
168237 pub fn closed_flag ( & self ) -> Arc < std:: sync:: atomic:: AtomicBool > {
169238 Arc :: clone ( & self . closed )
@@ -174,7 +243,13 @@ impl MockStream {
174243 * self . next_error . lock ( ) . unwrap ( ) = Some ( error) ;
175244 }
176245
177- fn try_ingest ( & self ) -> Result < ( ) , databricks_zerobus_ingest_sdk:: ZerobusError > {
246+ async fn try_ingest ( & self ) -> Result < ( ) , databricks_zerobus_ingest_sdk:: ZerobusError > {
247+ if let Some ( gate) = & self . gate {
248+ gate. started . add_permits ( 1 ) ;
249+ // Acquire and immediately forget — we don't need to release the
250+ // permit on drop, the test's `release()` call hands them out.
251+ gate. release . acquire ( ) . await . unwrap ( ) . forget ( ) ;
252+ }
178253 match self . next_error . lock ( ) . unwrap ( ) . take ( ) {
179254 Some ( e) => Err ( e) ,
180255 None => Ok ( ( ) ) ,
@@ -269,7 +344,7 @@ impl ZerobusService {
269344 . build ( )
270345 . await
271346 . map_err ( |e| ZerobusSinkError :: StreamInitError { source : e } ) ?;
272- ActiveStream :: Proto ( Box :: new ( stream) )
347+ ActiveStream :: proto ( stream)
273348 }
274349 } ;
275350
@@ -281,20 +356,16 @@ impl ZerobusService {
281356
282357 /// Gracefully close and remove the active stream.
283358 ///
284- /// Should be called after all in-flight ingests have completed (e.g.,
285- /// after the driver returns) so that the slot holds the sole `Arc`
286- /// reference to the stream.
359+ /// `ActiveStream::close()` takes `&self`, so this works regardless of how
360+ /// many `Arc` clones are still in flight: the inner write lock waits for
361+ /// any concurrent ingests to release their read guards before the SDK
362+ /// flush + close runs. The slot lock is released before close starts so
363+ /// concurrent `get_or_create_stream` calls aren't blocked on the SDK
364+ /// shutdown path.
287365 pub async fn close_stream ( & self ) {
288- if let Some ( stream) = self . stream . lock ( ) . await . take ( ) {
289- match Arc :: try_unwrap ( stream) {
290- Ok ( mut stream) => stream. close ( ) . await ,
291- Err ( _) => {
292- warn ! (
293- message =
294- "Zerobus stream has outstanding references, skipping graceful close."
295- ) ;
296- }
297- }
366+ let stream = self . stream . lock ( ) . await . take ( ) ;
367+ if let Some ( stream) = stream {
368+ stream. close ( ) . await ;
298369 }
299370 }
300371
@@ -311,14 +382,19 @@ impl ZerobusService {
311382 payload : ZerobusPayload ,
312383 events_byte_size : GroupedCountByteSize ,
313384 ) -> Result < ZerobusResponse , ZerobusSinkError > {
314- let mut stream = self . get_or_create_stream ( ) . await ?;
385+ let stream = self . get_or_create_stream ( ) . await ?;
315386
316- // Lock is not held here — other tasks can ingest concurrently.
387+ // Slot lock is not held here — concurrent ingests acquire read guards
388+ // on the inner `RwLock` and run truly in parallel.
317389 let result = match ( payload, stream. as_ref ( ) ) {
318- ( ZerobusPayload :: Records ( records) , ActiveStream :: Proto ( stream) ) => {
319- match stream. ingest_records_offset ( records) . await {
390+ ( ZerobusPayload :: Records ( records) , ActiveStream :: Proto ( lock) ) => {
391+ let guard = lock. read ( ) . await ;
392+ let Some ( s) = guard. as_ref ( ) else {
393+ return Err ( ZerobusSinkError :: StreamClosed ) ;
394+ } ;
395+ match s. ingest_records_offset ( records) . await {
320396 Ok ( Some ( offset) ) if self . require_acknowledgements => {
321- stream . wait_for_offset ( offset) . await . map ( |_| ( ) )
397+ s . wait_for_offset ( offset) . await . map ( |_| ( ) )
322398 }
323399 Ok ( None ) if self . require_acknowledgements => {
324400 return Err ( ZerobusSinkError :: MissingAckOffset ) ;
@@ -328,25 +404,27 @@ impl ZerobusService {
328404 }
329405 }
330406 #[ cfg( test) ]
331- ( ZerobusPayload :: Records ( _) , ActiveStream :: Mock ( mock) ) => mock. try_ingest ( ) ,
407+ ( ZerobusPayload :: Records ( _) , ActiveStream :: Mock ( mock) ) => mock. try_ingest ( ) . await ,
332408 } ;
333409
334410 match result {
335411 Ok ( ( ) ) => Ok ( ZerobusResponse { events_byte_size } ) ,
336412 Err ( e) => {
337413 if e. is_retryable ( ) {
338- // Only clear the slot if it still points to the same stream that
339- // failed. Another concurrent task may have already replaced it
340- // with a fresh stream after recovering from its own error .
414+ // Clear the slot so the next attempt creates a fresh stream,
415+ // but only if it still points to the same stream that failed —
416+ // a concurrent task may have already replaced it .
341417 {
342418 let mut guard = self . stream . lock ( ) . await ;
343419 if guard. as_ref ( ) . is_some_and ( |s| Arc :: ptr_eq ( s, & stream) ) {
344420 guard. take ( ) ;
345421 }
346422 }
347- if let Some ( active) = Arc :: get_mut ( & mut stream) {
348- active. close ( ) . await ;
349- }
423+ // `close()` takes `&self`, so we can always run the graceful
424+ // path here regardless of how many other `Arc` clones are in
425+ // flight. The write lock will wait for any concurrent ingests
426+ // holding read guards to drain before flushing.
427+ stream. close ( ) . await ;
350428 }
351429 Err ( ZerobusSinkError :: IngestionError { source : e } )
352430 }
@@ -441,6 +519,7 @@ impl RetryLogic for ZerobusRetryLogic {
441519 ZerobusSinkError :: ZerobusError { source }
442520 | ZerobusSinkError :: StreamInitError { source }
443521 | ZerobusSinkError :: IngestionError { source } => source. is_retryable ( ) ,
522+ ZerobusSinkError :: StreamClosed => true ,
444523 ZerobusSinkError :: ConfigError { .. }
445524 | ZerobusSinkError :: EncodingError { .. }
446525 | ZerobusSinkError :: MissingAckOffset => false ,
@@ -599,4 +678,59 @@ mod tests {
599678 assert ! ( !service. has_active_stream( ) . await ) ;
600679 assert ! ( closed. load( std:: sync:: atomic:: Ordering :: Relaxed ) ) ;
601680 }
681+
682+ /// Regression test for the "silent abort-only Drop" issue: when two
683+ /// ingests are in flight (each holding an `Arc<ActiveStream>`) and one
684+ /// fails retryably, the failing task must still run the graceful close
685+ /// path. Under the previous design `Arc::get_mut` returned `None` here
686+ /// because the second task held a clone, so close was skipped and the
687+ /// stream fell to abort-only Drop.
688+ #[ tokio:: test]
689+ async fn retryable_failure_with_concurrent_ingest_still_closes ( ) {
690+ let ( mock, gate) = MockStream :: failing ( ZerobusError :: ChannelCreationError (
691+ "connection reset" . to_string ( ) ,
692+ ) )
693+ . with_gate ( ) ;
694+ let closed = mock. closed_flag ( ) ;
695+
696+ let service = ZerobusService :: new_with_mock ( test_config ( ) , mock, false )
697+ . await
698+ . unwrap ( ) ;
699+
700+ // Spawn two concurrent ingests. Each will obtain its own `Arc` clone
701+ // from `get_or_create_stream`, then block in the gate.
702+ let s1 = service. clone ( ) ;
703+ let t1 = tokio:: spawn ( async move {
704+ s1. ingest ( dummy_payload ( ) , GroupedCountByteSize :: new_untagged ( ) )
705+ . await
706+ } ) ;
707+ let s2 = service. clone ( ) ;
708+ let t2 = tokio:: spawn ( async move {
709+ s2. ingest ( dummy_payload ( ) , GroupedCountByteSize :: new_untagged ( ) )
710+ . await
711+ } ) ;
712+
713+ // Wait until both ingests are inside the gate (both `Arc`s alive).
714+ gate. wait_for_started ( 2 ) . await ;
715+
716+ // Release both. The failing one will go through the retry-cleanup
717+ // path while the other still holds an `Arc`. Under the old design
718+ // `Arc::get_mut` would return `None` and close would be skipped.
719+ gate. release ( 2 ) ;
720+
721+ let r1 = t1. await . unwrap ( ) ;
722+ let r2 = t2. await . unwrap ( ) ;
723+
724+ // At least one task observed the retryable error (the mock only
725+ // produces a single error, but ordering between tasks is undefined).
726+ assert ! ( r1. is_err( ) || r2. is_err( ) ) ;
727+
728+ // The graceful close path must have run despite concurrent `Arc`s.
729+ assert ! (
730+ closed. load( std:: sync:: atomic:: Ordering :: Relaxed ) ,
731+ "graceful close did not run; stream would have leaked under old design"
732+ ) ;
733+ // And the slot was cleared so the next ingest creates a fresh stream.
734+ assert ! ( !service. has_active_stream( ) . await ) ;
735+ }
602736}
0 commit comments