@@ -6,13 +6,17 @@ use crate::run_environment::RunEnvironment;
66use crate :: upload:: { UploadError , profile_archive:: ProfileArchiveContent } ;
77use crate :: {
88 prelude:: * ,
9- request_client:: { REQUEST_CLIENT , STREAMING_CLIENT } ,
9+ request_client:: { REQUEST_CLIENT , STREAMING_CLIENT , upload_backoff } ,
1010} ;
1111use async_compression:: tokio:: write:: GzipEncoder ;
1212use console:: style;
1313use reqwest:: StatusCode ;
14+ use reqwest_retry:: {
15+ DefaultRetryableStrategy , RetryDecision , RetryPolicy , Retryable , RetryableStrategy ,
16+ } ;
1417use serde_json:: Value ;
1518use std:: collections:: BTreeMap ;
19+ use std:: time:: SystemTime ;
1620use tokio:: fs:: File ;
1721use tokio:: io:: AsyncWriteExt ;
1822use tokio_tar:: Builder ;
@@ -182,6 +186,63 @@ async fn retrieve_upload_data(
182186 }
183187}
184188
189+ /// The retry middleware can't replay a consumed stream, so we rebuild the body from
190+ /// disk on each attempt. Response-level errors (4xx/5xx) are left for the caller.
191+ async fn send_streamed_with_retry (
192+ upload_data : & UploadData ,
193+ path : & std:: path:: Path ,
194+ archive_size : u64 ,
195+ archive_hash : & str ,
196+ encoding : Option < String > ,
197+ ) -> Result < reqwest:: Response > {
198+ let policy = upload_backoff ( ) ;
199+ let start = SystemTime :: now ( ) ;
200+ let mut n_past_retries = 0 ;
201+
202+ loop {
203+ let file = File :: open ( path)
204+ . await
205+ . context ( format ! ( "Failed to open file at path: {}" , path. display( ) ) ) ?;
206+ let stream = tokio_util:: io:: ReaderStream :: new ( file) ;
207+ let body = reqwest:: Body :: wrap_stream ( stream) ;
208+
209+ let mut request = STREAMING_CLIENT
210+ . put ( upload_data. upload_url . clone ( ) )
211+ . header ( "Content-Type" , "application/x-tar" )
212+ . header ( "Content-Length" , archive_size)
213+ . header ( "Content-MD5" , archive_hash) ;
214+ if let Some ( encoding) = & encoding {
215+ request = request. header ( "Content-Encoding" , encoding) ;
216+ }
217+
218+ let result = request
219+ . body ( body)
220+ . send ( )
221+ . await
222+ . map_err ( reqwest_middleware:: Error :: Reqwest ) ;
223+
224+ let is_transient = matches ! (
225+ DefaultRetryableStrategy . handle( & result) ,
226+ Some ( Retryable :: Transient )
227+ ) ;
228+ if is_transient {
229+ if let RetryDecision :: Retry { execute_after } =
230+ policy. should_retry ( start, n_past_retries)
231+ {
232+ let wait = execute_after
233+ . duration_since ( SystemTime :: now ( ) )
234+ . unwrap_or_default ( ) ;
235+ debug ! ( "Streamed upload attempt failed (transient), retrying in {wait:?}" ) ;
236+ tokio:: time:: sleep ( wait) . await ;
237+ n_past_retries += 1 ;
238+ continue ;
239+ }
240+ }
241+
242+ return Ok ( result?) ;
243+ }
244+ }
245+
185246async fn upload_profile_archive (
186247 upload_data : & UploadData ,
187248 profile_archive : ProfileArchive ,
@@ -206,24 +267,14 @@ async fn upload_profile_archive(
206267 }
207268 content @ ProfileArchiveContent :: UncompressedOnDisk { path }
208269 | content @ ProfileArchiveContent :: CompressedOnDisk { path } => {
209- // Use streaming client without retry middleware for file streams
210- let file = File :: open ( path)
211- . await
212- . context ( format ! ( "Failed to open file at path: {}" , path. display( ) ) ) ?;
213- let stream = tokio_util:: io:: ReaderStream :: new ( file) ;
214- let body = reqwest:: Body :: wrap_stream ( stream) ;
215-
216- let mut request = STREAMING_CLIENT
217- . put ( upload_data. upload_url . clone ( ) )
218- . header ( "Content-Type" , "application/x-tar" )
219- . header ( "Content-Length" , archive_size)
220- . header ( "Content-MD5" , archive_hash) ;
221-
222- if let Some ( encoding) = content. encoding ( ) {
223- request = request. header ( "Content-Encoding" , encoding) ;
224- }
225-
226- request. body ( body) . send ( ) . await ?
270+ send_streamed_with_retry (
271+ upload_data,
272+ path,
273+ archive_size,
274+ & archive_hash,
275+ content. encoding ( ) ,
276+ )
277+ . await ?
227278 }
228279 } ;
229280
@@ -378,4 +429,103 @@ mod tests {
378429 )
379430 . await ;
380431 }
432+
433+ const EXPECTED_ATTEMPTS : usize = crate :: request_client:: UPLOAD_RETRY_COUNT as usize + 1 ;
434+
435+ /// Answers `503` to each of the next `max_conns` connections, then exits. Returns
436+ /// the URL, a counter of connections received, and the server's join handle.
437+ fn spawn_mock_returning_503 (
438+ max_conns : usize ,
439+ ) -> (
440+ String ,
441+ std:: sync:: Arc < std:: sync:: atomic:: AtomicUsize > ,
442+ std:: thread:: JoinHandle < ( ) > ,
443+ ) {
444+ use std:: io:: { Read , Write } ;
445+ use std:: net:: TcpListener ;
446+ use std:: sync:: Arc ;
447+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
448+
449+ let listener = TcpListener :: bind ( "127.0.0.1:0" ) . unwrap ( ) ;
450+ let url = format ! ( "http://{}/upload" , listener. local_addr( ) . unwrap( ) ) ;
451+ let hits = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
452+
453+ let hits_loop = hits. clone ( ) ;
454+ let handle = std:: thread:: spawn ( move || {
455+ for stream in listener. incoming ( ) . take ( max_conns) {
456+ let Ok ( mut stream) = stream else { continue } ;
457+ hits_loop. fetch_add ( 1 , Ordering :: SeqCst ) ;
458+ let mut buf = [ 0u8 ; 2048 ] ;
459+ let _ = stream. read ( & mut buf) ;
460+ let body = "transient" ;
461+ let resp = format ! (
462+ "HTTP/1.1 503 Service Unavailable\r \n Content-Length: {}\r \n Connection: close\r \n \r \n {}" ,
463+ body. len( ) ,
464+ body
465+ ) ;
466+ let _ = stream. write_all ( resp. as_bytes ( ) ) ;
467+ }
468+ } ) ;
469+
470+ ( url, hits, handle)
471+ }
472+
473+ fn upload_data_for ( url : String ) -> UploadData {
474+ UploadData {
475+ status : "success" . to_string ( ) ,
476+ upload_url : url,
477+ run_id : "test-run" . to_string ( ) ,
478+ }
479+ }
480+
481+ /// On-disk archives stream through `send_streamed_with_retry`, which retries
482+ /// transient failures itself since `STREAMING_CLIENT` has no retry middleware.
483+ #[ tokio:: test]
484+ async fn streamed_upload_is_retried ( ) {
485+ use std:: sync:: atomic:: Ordering ;
486+
487+ let ( url, hits, server) = spawn_mock_returning_503 ( EXPECTED_ATTEMPTS ) ;
488+
489+ let path = tempfile:: NamedTempFile :: new ( )
490+ . unwrap ( )
491+ . into_temp_path ( )
492+ . keep ( )
493+ . unwrap ( ) ;
494+ std:: fs:: write ( & path, b"profile-archive" ) . unwrap ( ) ;
495+ let archive = ProfileArchive :: new_uncompressed_on_disk ( path) . unwrap ( ) ;
496+
497+ let result = upload_profile_archive ( & upload_data_for ( url) , archive) . await ;
498+ server. join ( ) . unwrap ( ) ;
499+
500+ assert ! (
501+ result. is_err( ) ,
502+ "a 503 should surface as an error after retries"
503+ ) ;
504+ assert_eq ! (
505+ hits. load( Ordering :: SeqCst ) ,
506+ EXPECTED_ATTEMPTS ,
507+ "streamed upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
508+ ) ;
509+ }
510+
511+ /// In-memory archives go through `REQUEST_CLIENT`, whose retry middleware handles
512+ /// transient failures.
513+ #[ tokio:: test]
514+ async fn in_memory_upload_is_retried ( ) {
515+ use std:: sync:: atomic:: Ordering ;
516+
517+ let ( url, hits, server) = spawn_mock_returning_503 ( EXPECTED_ATTEMPTS ) ;
518+
519+ let archive = ProfileArchive :: new_compressed_in_memory ( b"profile-archive" . to_vec ( ) ) ;
520+
521+ let result = upload_profile_archive ( & upload_data_for ( url) , archive) . await ;
522+ server. join ( ) . unwrap ( ) ;
523+
524+ assert ! ( result. is_err( ) , "a 503 should surface as an error" ) ;
525+ assert_eq ! (
526+ hits. load( Ordering :: SeqCst ) ,
527+ EXPECTED_ATTEMPTS ,
528+ "in-memory upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
529+ ) ;
530+ }
381531}
0 commit comments