@@ -38,63 +38,97 @@ pub struct Client {
3838macro_rules! impl_inner_call {
3939 ( $self: expr, $name: ident $( , $args: expr) * ) => {
4040 {
41- let mut errors = vec![ ] ;
42- loop {
43- let read_client = $self. client_type. read( ) . unwrap( ) ;
44- let res = match & * read_client {
41+ impl_inner_call_impl( $self, || {
42+ match & * $self. client_type. read( ) . unwrap( ) {
4543 ClientType :: TCP ( inner) => inner. $name( $( $args, ) * ) ,
4644 ClientType :: SSL ( inner) => inner. $name( $( $args, ) * ) ,
4745 ClientType :: Socks5 ( inner) => inner. $name( $( $args, ) * ) ,
48- } ;
49- drop( read_client) ;
50- match res {
51- Ok ( val) => return Ok ( val) ,
52- Err ( Error :: Protocol ( _) | Error :: AlreadySubscribed ( _) ) => {
53- return res;
54- } ,
46+ }
47+ } )
48+ } }
49+ }
50+
51+ fn impl_inner_call_impl < T > (
52+ self_ : & Client ,
53+ mut f : impl FnMut ( ) -> Result < T , Error > ,
54+ ) -> Result < T , Error > {
55+ let mut errors = vec ! [ ] ;
56+ loop {
57+ match f ( ) {
58+ Ok ( val) => return Ok ( val) ,
59+ res @ Err ( Error :: Protocol ( _) | Error :: AlreadySubscribed ( _) ) => {
60+ return res;
61+ }
62+ Err ( e) => impl_inner_call_impl_err ( self_, & mut errors, e) ?,
63+ }
64+ }
65+ }
66+
67+ fn impl_inner_call_impl_err (
68+ self_ : & Client ,
69+ errors : & mut Vec < Error > ,
70+ e : Error ,
71+ ) -> Result < ( ) , Error > {
72+ let failed_attempts = errors. len ( ) + 1 ;
73+
74+ warn ! (
75+ "call '{}' failed with {}, retry: {}/{}" ,
76+ stringify!( $name) ,
77+ e,
78+ failed_attempts,
79+ self_. config. retry( )
80+ ) ;
81+
82+ errors. push ( e) ;
83+
84+ if retries_exhausted ( failed_attempts, self_. config . retry ( ) ) {
85+ warn ! (
86+ "call '{}' failed after {} attempts" ,
87+ stringify!( $name) ,
88+ failed_attempts
89+ ) ;
90+ return Err ( Error :: AllAttemptsErrored ( std:: mem:: take ( errors) ) ) ;
91+ }
92+
93+ // Only one thread will try to recreate the client getting the write lock,
94+ // other eventual threads will get Err and will block at the beginning of
95+ // previous loop when trying to read()
96+ if let Ok ( mut write_client) = self_. client_type . try_write ( ) {
97+ loop {
98+ std:: thread:: sleep ( std:: time:: Duration :: from_secs (
99+ ( 1 << errors. len ( ) ) . min ( 30 ) as u64
100+ ) ) ;
101+ match ClientType :: from_config ( & self_. url , & self_. config ) {
102+ Ok ( new_client) => {
103+ info ! ( "Succesfully created new client" ) ;
104+ * write_client = new_client;
105+ break ;
106+ }
55107 Err ( e) => {
56108 let failed_attempts = errors. len ( ) + 1 ;
57109
58- if retries_exhausted ( failed_attempts , $self . config . retry ( ) ) {
59- warn! ( "call '{}' failed after {} attempts" , stringify! ( $name ) , failed_attempts ) ;
60- return Err ( Error :: AllAttemptsErrored ( errors ) ) ;
61- }
62-
63- warn! ( "call '{}' failed with {}, retry: {}/{}" , stringify! ( $name ) , e , failed_attempts , $self . config . retry ( ) ) ;
110+ warn ! (
111+ "re-creating client failed with {}, retry: {}/{}" ,
112+ e ,
113+ failed_attempts ,
114+ self_ . config . retry ( )
115+ ) ;
64116
65117 errors. push ( e) ;
66118
67- // Only one thread will try to recreate the client getting the write lock,
68- // other eventual threads will get Err and will block at the beginning of
69- // previous loop when trying to read()
70- if let Ok ( mut write_client) = $self. client_type. try_write( ) {
71- loop {
72- std:: thread:: sleep( std:: time:: Duration :: from_secs( ( 1 << errors. len( ) ) . min( 30 ) as u64 ) ) ;
73- match ClientType :: from_config( & $self. url, & $self. config) {
74- Ok ( new_client) => {
75- info!( "Succesfully created new client" ) ;
76- * write_client = new_client;
77- break ;
78- } ,
79- Err ( e) => {
80- let failed_attempts = errors. len( ) + 1 ;
81-
82- if retries_exhausted( failed_attempts, $self. config. retry( ) ) {
83- warn!( "re-creating client failed after {} attempts" , failed_attempts) ;
84- return Err ( Error :: AllAttemptsErrored ( errors) ) ;
85- }
86-
87- warn!( "re-creating client failed with {}, retry: {}/{}" , e, failed_attempts, $self. config. retry( ) ) ;
88-
89- errors. push( e) ;
90- }
91- }
92- }
119+ if retries_exhausted ( failed_attempts, self_. config . retry ( ) ) {
120+ warn ! (
121+ "re-creating client failed after {} attempts" ,
122+ failed_attempts
123+ ) ;
124+ return Err ( Error :: AllAttemptsErrored ( std:: mem:: take ( errors) ) ) ;
93125 }
94- } ,
126+ }
95127 }
96- } }
128+ }
97129 }
130+
131+ Ok ( ( ) )
98132}
99133
100134fn retries_exhausted ( failed_attempts : usize , configured_retries : u8 ) -> bool {
@@ -178,7 +212,7 @@ impl ElectrumApi for Client {
178212 // `RawClient::internal_raw_call_with_vec` method.
179213
180214 let vec = params. into_iter ( ) . collect :: < Vec < Param > > ( ) ;
181- impl_inner_call ! ( self , internal_raw_call_with_vec, method_name, vec. clone( ) ) ;
215+ impl_inner_call ! ( self , internal_raw_call_with_vec, method_name, vec. clone( ) )
182216 }
183217
184218 #[ inline]
@@ -420,4 +454,49 @@ mod tests {
420454
421455 assert ! ( !exhausted)
422456 }
457+
458+ #[ test]
459+ fn impl_inner_call_all_attempts_has_all_errors ( ) {
460+ use std:: io:: { Read , Write } ;
461+ use std:: net:: TcpListener ;
462+
463+ let listener = TcpListener :: bind ( "127.0.0.1:0" ) . unwrap ( ) ;
464+ let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
465+ std:: thread:: spawn ( move || {
466+ for stream in listener. incoming ( ) {
467+ let mut stream = stream. unwrap ( ) ;
468+ let mut buf = [ 0u8 ; 512 ] ;
469+ let _ = stream. read ( & mut buf) . unwrap ( ) ;
470+ stream
471+ . write_all (
472+ br#"{"jsonrpc": "2.0", "result": ["ElectrumX 1.18.0", "1.6"], "id": 0}"# ,
473+ )
474+ . unwrap ( ) ;
475+ }
476+ } ) ;
477+
478+ let client = Client :: from_config (
479+ & format ! ( "127.0.0.1:{}" , port) ,
480+ crate :: config:: ConfigBuilder :: new ( ) . retry ( 3 ) . build ( ) ,
481+ )
482+ . unwrap ( ) ;
483+ let msg = |n| format ! ( "error #{}" , n) ;
484+
485+ let mut n = 0 ;
486+ let res: Result < ( ) , _ > = impl_inner_call_impl ( & client, || {
487+ n += 1 ;
488+ Err ( Error :: Message ( msg ( n) ) )
489+ } ) ;
490+ assert_eq ! ( n, 4 ) ;
491+
492+ let err = res. unwrap_err ( ) ;
493+ let Error :: AllAttemptsErrored ( vec) = err else {
494+ panic ! ( )
495+ } ;
496+ assert_eq ! ( vec. len( ) , n) ;
497+ for ( i, err) in vec. into_iter ( ) . enumerate ( ) {
498+ let Error :: Message ( m) = err else { panic ! ( ) } ;
499+ assert_eq ! ( m, msg( i + 1 ) ) ;
500+ }
501+ }
423502}
0 commit comments