@@ -385,8 +385,14 @@ impl<C: NumaflowTypeConfig> JetStreamReader<C> {
385385 . map_err ( |e| Error :: ISB ( format ! ( "Failed to acquire semaphore permit: {e}" ) ) ) ?;
386386
387387 // Apply rate limiting if configured.
388- let effective_batch_size = match & self . rate_limiter {
388+ let ( effective_batch_size, token_grabbed_epoch ) = match & self . rate_limiter {
389389 Some ( rate_limiter) => {
390+ // Note the epoch at which we got the estimated batch size from the rate limiter
391+ let cur_epoch = std:: time:: SystemTime :: now ( )
392+ . duration_since ( std:: time:: UNIX_EPOCH )
393+ . expect ( "Time went backwards beyond unix epoch" )
394+ . as_secs ( ) ;
395+
390396 let tokens_acquired = rate_limiter
391397 . acquire_n ( Some ( batch_size) , Some ( Duration :: from_secs ( 1 ) ) )
392398 . await ;
@@ -396,9 +402,9 @@ impl<C: NumaflowTypeConfig> JetStreamReader<C> {
396402 return Ok ( vec ! [ ] ) ;
397403 }
398404
399- effective_batch_size
405+ ( effective_batch_size, cur_epoch )
400406 }
401- None => batch_size,
407+ None => ( batch_size, 0 ) ,
402408 } ;
403409
404410 let start = Instant :: now ( ) ;
@@ -436,6 +442,23 @@ impl<C: NumaflowTypeConfig> JetStreamReader<C> {
436442 }
437443 } ;
438444
445+ // deposit the unused tokens back to the rate limiter
446+ // utilize the cur_epoch we calculated when we initially acquired the tokens
447+ // to ensure that the tokens are deposited for the correct epoch.
448+ match & self . rate_limiter {
449+ Some ( rate_limiter) => {
450+ rate_limiter
451+ . deposit_unused (
452+ effective_batch_size
453+ . checked_sub ( jetstream_messages. len ( ) )
454+ . unwrap_or ( 0 ) ,
455+ token_grabbed_epoch,
456+ )
457+ . await ;
458+ }
459+ None => { }
460+ }
461+
439462 pipeline_metrics ( )
440463 . jetstream_isb
441464 . read_time_total
0 commit comments