@@ -18,7 +18,11 @@ use tokio_stream::{
1818} ;
1919use tonic:: transport:: Channel ;
2020
21- use crate :: { config:: GrpcConfig , event:: Event , metrics:: EventCounter } ;
21+ use crate :: {
22+ config:: { BackoffConfig , GrpcConfig } ,
23+ event:: Event ,
24+ metrics:: EventCounter ,
25+ } ;
2226
2327struct Backoff {
2428 initial : Duration ,
@@ -46,6 +50,12 @@ impl Backoff {
4650 }
4751}
4852
53+ impl From < & BackoffConfig > for Backoff {
54+ fn from ( value : & BackoffConfig ) -> Self {
55+ Backoff :: new ( value. initial ( ) , value. max ( ) )
56+ }
57+ }
58+
4959pub struct Client {
5060 rx : broadcast:: Receiver < Arc < Event > > ,
5161 running : watch:: Receiver < bool > ,
@@ -148,8 +158,7 @@ impl Client {
148158
149159 async fn run ( & mut self ) -> anyhow:: Result < bool > {
150160 let connector = self . get_connector ( ) . await ?;
151- let backoff_config = self . config . borrow ( ) . backoff . clone ( ) ;
152- let mut backoff = Backoff :: new ( backoff_config. initial ( ) , backoff_config. max ( ) ) ;
161+ let mut backoff = Backoff :: from ( & self . config . borrow ( ) . backoff ) ;
153162 loop {
154163 info ! ( "Attempting to connect to gRPC server..." ) ;
155164 let channel = match self . create_channel ( connector. clone ( ) ) . await {
@@ -187,9 +196,6 @@ impl Client {
187196 Ok ( _) => info!( "gRPC stream ended" ) ,
188197 Err ( e) => warn!( "gRPC stream error: {e:?}" ) ,
189198 }
190- let delay = backoff. next( ) ;
191- warn!( "Reconnecting in {delay:?}..." ) ;
192- sleep( delay) . await ;
193199 }
194200 _ = self . config. changed( ) => return Ok ( true ) ,
195201 _ = self . running. changed( ) => return Ok ( * self . running. borrow( ) ) ,
@@ -235,9 +241,9 @@ mod tests {
235241 #[ test]
236242 fn backoff_reset ( ) {
237243 let mut b = Backoff :: new ( Duration :: from_secs ( 1 ) , Duration :: from_secs ( 60 ) ) ;
238- b. next ( ) ;
239- b. next ( ) ;
240- b. next ( ) ;
244+ assert_eq ! ( b. next( ) , Duration :: from_secs ( 1 ) ) ;
245+ assert_eq ! ( b. next( ) , Duration :: from_secs ( 2 ) ) ;
246+ assert_eq ! ( b. next( ) , Duration :: from_secs ( 4 ) ) ;
241247 b. reset ( ) ;
242248 assert_eq ! ( b. next( ) , Duration :: from_secs( 1 ) ) ;
243249 assert_eq ! ( b. next( ) , Duration :: from_secs( 2 ) ) ;
0 commit comments