@@ -26,6 +26,7 @@ use tokio::sync::oneshot::Sender;
2626// Use a `tokio::time::Instant` to facilitate time-based unit testing.
2727use tokio:: task:: JoinSet ;
2828use tokio:: time:: { Duration , Instant , Interval , interval_at} ;
29+ use tokio_util:: task:: TaskTracker ;
2930
3031// An ack ID is less than 200 bytes. The limit for a request is 512kB. It should
3132// be safe to fit 2500 Ack IDs in a single RPC.
@@ -117,6 +118,9 @@ where
117118 // How long messages can be kept under lease
118119 max_lease : Duration ,
119120
121+ // In flight acks and nacks.
122+ pending_acks_nacks : TaskTracker ,
123+
120124 // In flight lease extension operations.
121125 //
122126 // These are held separate from pending acks/nacks because we do not need to
@@ -149,6 +153,7 @@ where
149153 flush_interval,
150154 extend_interval,
151155 max_lease : options. max_lease ,
156+ pending_acks_nacks : TaskTracker :: new ( ) ,
152157 pending_extends : JoinSet :: new ( ) ,
153158 }
154159 }
@@ -201,22 +206,29 @@ where
201206 }
202207
203208 /// Flush pending acks/nacks
204- pub ( super ) async fn flush ( & mut self ) {
205- // TODO(#3975) - await these concurrently.
209+ pub ( super ) fn flush ( & mut self ) {
206210 let ( to_ack, to_nack) = self . leases . drain ( ) ;
207211 if !to_ack. is_empty ( ) {
208- self . leaser . ack ( to_ack) . await ;
212+ let leaser = self . leaser . clone ( ) ;
213+ self . pending_acks_nacks
214+ . spawn ( async move { leaser. ack ( to_ack) . await } ) ;
209215 }
210216 if !to_nack. is_empty ( ) {
211- self . leaser . nack ( to_nack) . await ;
217+ let leaser = self . leaser . clone ( ) ;
218+ self . pending_acks_nacks
219+ . spawn ( async move { leaser. nack ( to_nack) . await } ) ;
212220 }
213221
214222 let ( to_ack, to_nack) = self . eo_leases . drain ( ) ;
215223 if !to_ack. is_empty ( ) {
216- self . leaser . confirmed_ack ( to_ack) . await ;
224+ let leaser = self . leaser . clone ( ) ;
225+ self . pending_acks_nacks
226+ . spawn ( async move { leaser. confirmed_ack ( to_ack) . await } ) ;
217227 }
218228 if !to_nack. is_empty ( ) {
219- self . leaser . nack ( to_nack) . await ;
229+ let leaser = self . leaser . clone ( ) ;
230+ self . pending_acks_nacks
231+ . spawn ( async move { leaser. nack ( to_nack) . await } ) ;
220232 }
221233 }
222234
@@ -246,18 +258,20 @@ where
246258 ///
247259 /// This flushes all pending acks and nacks all other messages.
248260 pub ( super ) async fn shutdown ( mut self ) {
249- // TODO(#3975) - await these concurrently.
250-
251261 // Note that if `WaitForProcessing` was selected by the application,
252262 // there are no messages under lease. They have all been processed.
253263 self . leases . evict ( ) ;
254264 let ( to_ack, to_nack) = self . leases . drain ( ) ;
255265 if !to_ack. is_empty ( ) {
256- self . leaser . ack ( to_ack) . await ;
266+ let leaser = self . leaser . clone ( ) ;
267+ self . pending_acks_nacks
268+ . spawn ( async move { leaser. ack ( to_ack) . await } ) ;
257269 }
258270 if !to_nack. is_empty ( ) {
259271 // TODO(#4847) - this nack needs to be broken into batches.
260- self . leaser . nack ( to_nack) . await ;
272+ let leaser = self . leaser . clone ( ) ;
273+ self . pending_acks_nacks
274+ . spawn ( async move { leaser. nack ( to_nack) . await } ) ;
261275 }
262276
263277 // TODO(#5109) - evicting exactly-once leases is ok, but not ideal.
@@ -269,9 +283,15 @@ where
269283 let ( _, to_nack) = self . eo_leases . drain ( ) ;
270284 if !to_nack. is_empty ( ) {
271285 // TODO(#4847) - this nack needs to be broken into batches.
272- self . leaser . nack ( to_nack) . await ;
286+ let leaser = self . leaser . clone ( ) ;
287+ self . pending_acks_nacks
288+ . spawn ( async move { leaser. nack ( to_nack) . await } ) ;
273289 }
274290
291+ // Wait for pending acks/nacks to complete.
292+ self . pending_acks_nacks . close ( ) ;
293+ self . pending_acks_nacks . wait ( ) . await ;
294+
275295 // Wait for pending lease extensions to complete. This is not useful in
276296 // practice, because we are nacking all the messages, but it simplifies
277297 // our tests.
@@ -334,6 +354,16 @@ pub(super) mod tests {
334354 let _ = pending_extends. join_all ( ) . await ;
335355 }
336356
357+ async fn flush_and_await < L > ( state : & mut LeaseState < L > )
358+ where
359+ L : Leaser + Clone + Send + ' static ,
360+ {
361+ state. flush ( ) ;
362+ let pending_acks_nacks = std:: mem:: take ( & mut state. pending_acks_nacks ) ;
363+ pending_acks_nacks. close ( ) ;
364+ pending_acks_nacks. wait ( ) . await ;
365+ }
366+
337367 #[ tokio:: test( start_paused = true ) ]
338368 async fn basic_add_ack_nack ( ) {
339369 let mock = MockLeaser :: new ( ) ;
@@ -529,7 +559,7 @@ pub(super) mod tests {
529559 // no messages under lease management.
530560 let mut state = LeaseState :: new ( Arc :: new ( mock) , LeaseOptions :: default ( ) ) ;
531561 extend_and_await ( & mut state) . await ;
532- state. flush ( ) . await ;
562+ state. flush ( ) ;
533563 state. shutdown ( ) . await ;
534564 }
535565
@@ -588,7 +618,7 @@ pub(super) mod tests {
588618 state. eo_leases
589619 ) ;
590620
591- state . flush ( ) . await ;
621+ flush_and_await ( & mut state ) . await ;
592622 assert_eq ! (
593623 TestLeases {
594624 under_lease: test_ids( 20 ..100 ) ,
@@ -621,6 +651,43 @@ pub(super) mod tests {
621651 ) ;
622652 }
623653
654+ #[ tokio:: test( start_paused = true ) ]
655+ async fn pending_acks_nacks_size_management ( ) {
656+ let mut mock = MockLeaser :: new ( ) ;
657+ mock. expect_ack ( )
658+ . times ( 1 )
659+ . withf ( |v| * v == vec ! [ test_id( 1 ) ] )
660+ . returning ( |_| ( ) ) ;
661+ mock. expect_nack ( )
662+ . times ( 1 )
663+ . withf ( |v| * v == vec ! [ test_id( 2 ) ] )
664+ . returning ( |_| ( ) ) ;
665+
666+ let mut state = LeaseState :: new ( Arc :: new ( mock) , LeaseOptions :: default ( ) ) ;
667+
668+ state. add ( test_id ( 1 ) , at_least_once_info ( ) ) ;
669+ state. process ( Ack ( test_id ( 1 ) ) ) ;
670+
671+ state. flush ( ) ;
672+ // Yield execution so the ack attempt can execute.
673+ tokio:: task:: yield_now ( ) . await ;
674+ assert ! (
675+ state. pending_acks_nacks. is_empty( ) ,
676+ "The ack task should have completed. We should not hold onto it."
677+ ) ;
678+
679+ state. add ( test_id ( 2 ) , at_least_once_info ( ) ) ;
680+ state. process ( Nack ( test_id ( 2 ) ) ) ;
681+
682+ state. flush ( ) ;
683+ // Yield execution so the nack attempt can execute.
684+ tokio:: task:: yield_now ( ) . await ;
685+ assert ! (
686+ state. pending_acks_nacks. is_empty( ) ,
687+ "The nack task should have completed. We should not hold onto it."
688+ ) ;
689+ }
690+
624691 #[ tokio:: test( start_paused = true ) ]
625692 async fn extend_at_least_once ( ) {
626693 let mut seq = mockall:: Sequence :: new ( ) ;
@@ -727,7 +794,7 @@ pub(super) mod tests {
727794 extend_and_await ( & mut state) . await ;
728795
729796 // Flush the acks and confirm.
730- state . flush ( ) . await ;
797+ flush_and_await ( & mut state ) . await ;
731798 state. confirm ( ack_results) ;
732799
733800 // We should not extend the confirmed acks.
@@ -872,7 +939,7 @@ pub(super) mod tests {
872939 // With MAX_IDS_PER_RPC pending acks, the batch is full. We should flush it now.
873940 assert_eq ! ( state. next_event( ) . await , LeaseEvent :: Flush ) ;
874941 assert_eq ! ( start. elapsed( ) , Duration :: ZERO ) ;
875- state . flush ( ) . await ;
942+ flush_and_await ( & mut state ) . await ;
876943
877944 // With the batch is not full. The next event should occur on the interval timer.
878945 for i in MAX_IDS_PER_RPC ..( 2 * MAX_IDS_PER_RPC - 1 ) {
@@ -910,7 +977,7 @@ pub(super) mod tests {
910977 }
911978 assert_eq ! ( state. next_event( ) . await , LeaseEvent :: Flush ) ;
912979 assert_eq ! ( start. elapsed( ) , Duration :: ZERO ) ;
913- state . flush ( ) . await ;
980+ flush_and_await ( & mut state ) . await ;
914981
915982 // With the batch is not full. The next event should occur on the interval timer.
916983 for i in MAX_IDS_PER_RPC ..( 2 * MAX_IDS_PER_RPC - 1 ) {
@@ -948,7 +1015,7 @@ pub(super) mod tests {
9481015 // With MAX_IDS_PER_RPC pending nacks, the batch is full. We should flush it now.
9491016 assert_eq ! ( state. next_event( ) . await , LeaseEvent :: Flush ) ;
9501017 assert_eq ! ( start. elapsed( ) , Duration :: ZERO ) ;
951- state . flush ( ) . await ;
1018+ flush_and_await ( & mut state ) . await ;
9521019
9531020 // With the batch is not full. The next event should occur on the interval timer.
9541021 for i in MAX_IDS_PER_RPC ..( 2 * MAX_IDS_PER_RPC - 1 ) {
@@ -987,7 +1054,7 @@ pub(super) mod tests {
9871054 // With MAX_IDS_PER_RPC pending nacks for exactly once leases, the batch is full. We should flush it now.
9881055 assert_eq ! ( state. next_event( ) . await , LeaseEvent :: Flush ) ;
9891056 assert_eq ! ( start. elapsed( ) , Duration :: ZERO ) ;
990- state . flush ( ) . await ;
1057+ flush_and_await ( & mut state ) . await ;
9911058
9921059 // With the batch is not full. The next event should occur on the interval timer.
9931060 for i in MAX_IDS_PER_RPC ..( 2 * MAX_IDS_PER_RPC - 1 ) {
0 commit comments