11use crate :: config:: http:: REQWEST_DEFAULT_USER_AGENT ;
2- use futures_util:: future;
2+ use log:: { error, warn} ;
3+ use rand:: RngExt ;
34use reqwest:: { IntoUrl , Request , RequestBuilder , Response } ;
5+ use std:: time:: Duration ;
46use tower:: retry:: Policy ;
57
8+ pub const MAX_RETRIES : usize = 3 ;
9+ const BACKOFF_MS : & [ u64 ] = & [ 250 , 500 , 1000 ] ;
10+ const JITTER_MAX_MS : u64 = 100 ;
11+
612#[ derive( Debug , Clone ) ]
7- pub struct RetryPolicy ( pub usize ) ;
13+ pub struct RetryPolicy {
14+ provider : & ' static str ,
15+ remaining : usize ,
16+ max : usize ,
17+ }
818
9- impl < E > Policy < Request , Response , E > for RetryPolicy {
10- type Future = future:: Ready < ( ) > ;
19+ impl RetryPolicy {
20+ pub fn new ( provider : & ' static str ) -> Self {
21+ Self :: with_max ( provider, MAX_RETRIES )
22+ }
1123
12- fn retry ( & mut self , _: & mut Request , result : & mut Result < Response , E > ) -> Option < Self :: Future > {
13- if self . 0 == 0 {
14- return None ;
24+ pub fn with_max ( provider : & ' static str , max : usize ) -> Self {
25+ Self {
26+ provider,
27+ remaining : max,
28+ max,
1529 }
30+ }
31+ }
32+
33+ #[ derive( Debug , PartialEq , Eq ) ]
34+ enum RetryDecision {
35+ Skip ,
36+ Retry { delay_ms : u64 } ,
37+ Exhausted ,
38+ }
1639
17- if result. is_err ( ) {
18- self . 0 -= 1 ;
19- Some ( future:: ready ( ( ) ) )
20- } else if let Ok ( res) = result {
21- if res. status ( ) . is_server_error ( ) {
22- self . 0 -= 1 ;
23- Some ( future:: ready ( ( ) ) )
24- } else {
40+ fn decide_retry ( remaining : usize , max : usize , is_retryable : bool ) -> RetryDecision {
41+ if !is_retryable {
42+ return RetryDecision :: Skip ;
43+ }
44+ if remaining == 0 {
45+ return RetryDecision :: Exhausted ;
46+ }
47+ let attempt_done = max - remaining + 1 ;
48+ let base = BACKOFF_MS [ ( attempt_done - 1 ) . min ( BACKOFF_MS . len ( ) - 1 ) ] ;
49+ let jitter: u64 = rand:: rng ( ) . random_range ( 0 ..=JITTER_MAX_MS ) ;
50+ RetryDecision :: Retry {
51+ delay_ms : base + jitter,
52+ }
53+ }
54+
55+ impl < E : std:: fmt:: Display > Policy < Request , Response , E > for RetryPolicy {
56+ type Future = tokio:: time:: Sleep ;
57+
58+ fn retry (
59+ & mut self ,
60+ req : & mut Request ,
61+ result : & mut Result < Response , E > ,
62+ ) -> Option < Self :: Future > {
63+ let is_retryable = match result {
64+ Err ( _) => true ,
65+ Ok ( res) => res. status ( ) . is_server_error ( ) ,
66+ } ;
67+ let attempt_done = self . max - self . remaining + 1 ;
68+ match decide_retry ( self . remaining , self . max , is_retryable) {
69+ RetryDecision :: Skip => None ,
70+ RetryDecision :: Exhausted => {
71+ let cause = match result {
72+ Err ( e) => format ! ( "network error: {e}" ) ,
73+ Ok ( res) => format ! ( "HTTP {}" , res. status( ) . as_u16( ) ) ,
74+ } ;
75+ error ! (
76+ "{} request to {} failed after {} retries ({cause})" ,
77+ self . provider,
78+ req. url( ) ,
79+ self . max
80+ ) ;
2581 None
2682 }
27- } else {
28- None
83+ RetryDecision :: Retry { delay_ms } => {
84+ let cause = match result {
85+ Err ( e) => format ! ( "network error: {e}" ) ,
86+ Ok ( res) => format ! ( "HTTP {}" , res. status( ) . as_u16( ) ) ,
87+ } ;
88+ warn ! (
89+ "{} request to {} returned {cause}; retrying ({attempt_done}/{}) after {delay_ms}ms" ,
90+ self . provider,
91+ req. url( ) ,
92+ self . max
93+ ) ;
94+ self . remaining -= 1 ;
95+ Some ( tokio:: time:: sleep ( Duration :: from_millis ( delay_ms) ) )
96+ }
2997 }
3098 }
3199
@@ -34,6 +102,54 @@ impl<E> Policy<Request, Response, E> for RetryPolicy {
34102 }
35103}
36104
105+ #[ cfg( test) ]
106+ mod tests {
107+ use super :: * ;
108+
109+ #[ test]
110+ fn skips_when_not_retryable ( ) {
111+ assert_eq ! ( decide_retry( 3 , 3 , false ) , RetryDecision :: Skip ) ;
112+ }
113+
114+ #[ test]
115+ fn exhausted_when_remaining_is_zero ( ) {
116+ assert_eq ! ( decide_retry( 0 , 3 , true ) , RetryDecision :: Exhausted ) ;
117+ }
118+
119+ #[ test]
120+ fn retries_when_remaining_and_retryable ( ) {
121+ match decide_retry ( 3 , 3 , true ) {
122+ RetryDecision :: Retry { delay_ms } => {
123+ assert ! ( ( BACKOFF_MS [ 0 ] ..=BACKOFF_MS [ 0 ] + JITTER_MAX_MS ) . contains( & delay_ms) ) ;
124+ }
125+ other => panic ! ( "expected Retry, got {other:?}" ) ,
126+ }
127+ }
128+
129+ #[ test]
130+ fn backoff_grows_with_attempts ( ) {
131+ fn base_for ( remaining : usize , max : usize ) -> u64 {
132+ let attempt_done = max - remaining + 1 ;
133+ BACKOFF_MS [ ( attempt_done - 1 ) . min ( BACKOFF_MS . len ( ) - 1 ) ]
134+ }
135+ assert_eq ! ( base_for( 3 , 3 ) , 250 ) ;
136+ assert_eq ! ( base_for( 2 , 3 ) , 500 ) ;
137+ assert_eq ! ( base_for( 1 , 3 ) , 1000 ) ;
138+ }
139+
140+ #[ test]
141+ fn backoff_clamps_to_last_step_after_schedule_ends ( ) {
142+ fn base_for ( remaining : usize , max : usize ) -> u64 {
143+ let attempt_done = max - remaining + 1 ;
144+ BACKOFF_MS [ ( attempt_done - 1 ) . min ( BACKOFF_MS . len ( ) - 1 ) ]
145+ }
146+ assert_eq ! ( base_for( 4 , 5 ) , 500 ) ;
147+ assert_eq ! ( base_for( 3 , 5 ) , 1000 ) ;
148+ assert_eq ! ( base_for( 2 , 5 ) , 1000 ) ;
149+ assert_eq ! ( base_for( 1 , 5 ) , 1000 ) ;
150+ }
151+ }
152+
37153pub trait RequestClientExt {
38154 fn get_default_user_agent < U : IntoUrl > ( & self , url : U ) -> RequestBuilder ;
39155}
0 commit comments