@@ -6,13 +6,17 @@ use crate::run_environment::RunEnvironment;
66use crate :: upload:: { UploadError , profile_archive:: ProfileArchiveContent } ;
77use crate :: {
88 prelude:: * ,
9- request_client:: { REQUEST_CLIENT , STREAMING_CLIENT } ,
9+ request_client:: { REQUEST_CLIENT , STREAMING_CLIENT , upload_backoff } ,
1010} ;
1111use async_compression:: tokio:: write:: GzipEncoder ;
1212use console:: style;
1313use reqwest:: StatusCode ;
14+ use reqwest_retry:: {
15+ DefaultRetryableStrategy , RetryDecision , RetryPolicy , Retryable , RetryableStrategy ,
16+ } ;
1417use serde_json:: Value ;
1518use std:: collections:: BTreeMap ;
19+ use std:: time:: SystemTime ;
1620use tokio:: fs:: File ;
1721use tokio:: io:: AsyncWriteExt ;
1822use tokio_tar:: Builder ;
@@ -182,6 +186,66 @@ async fn retrieve_upload_data(
182186 }
183187}
184188
189+ /// Streams a file-backed body with retries. The retry middleware can't replay a
190+ /// stream body (a consumed stream isn't cloneable), so we rebuild it from disk on
191+ /// each attempt and reuse reqwest_retry's backoff policy + transient classification
192+ /// to match the behavior of the middleware-backed in-memory path.
193+ async fn send_streamed_with_retry (
194+ upload_data : & UploadData ,
195+ path : & std:: path:: Path ,
196+ archive_size : u64 ,
197+ archive_hash : & str ,
198+ encoding : Option < String > ,
199+ ) -> Result < reqwest:: Response > {
200+ let policy = upload_backoff ( ) ;
201+ let start = SystemTime :: now ( ) ;
202+ let mut n_past_retries = 0 ;
203+
204+ loop {
205+ let file = File :: open ( path)
206+ . await
207+ . context ( format ! ( "Failed to open file at path: {}" , path. display( ) ) ) ?;
208+ let stream = tokio_util:: io:: ReaderStream :: new ( file) ;
209+ let body = reqwest:: Body :: wrap_stream ( stream) ;
210+
211+ let mut request = STREAMING_CLIENT
212+ . put ( upload_data. upload_url . clone ( ) )
213+ . header ( "Content-Type" , "application/x-tar" )
214+ . header ( "Content-Length" , archive_size)
215+ . header ( "Content-MD5" , archive_hash) ;
216+ if let Some ( encoding) = & encoding {
217+ request = request. header ( "Content-Encoding" , encoding) ;
218+ }
219+
220+ // Wrap into reqwest_middleware::Error so the default classifier can be reused.
221+ let result = request
222+ . body ( body)
223+ . send ( )
224+ . await
225+ . map_err ( reqwest_middleware:: Error :: Reqwest ) ;
226+
227+ let is_transient = matches ! (
228+ DefaultRetryableStrategy . handle( & result) ,
229+ Some ( Retryable :: Transient )
230+ ) ;
231+ if is_transient {
232+ if let RetryDecision :: Retry { execute_after } =
233+ policy. should_retry ( start, n_past_retries)
234+ {
235+ let wait = execute_after
236+ . duration_since ( SystemTime :: now ( ) )
237+ . unwrap_or_default ( ) ;
238+ debug ! ( "Streamed upload attempt failed (transient), retrying in {wait:?}" ) ;
239+ tokio:: time:: sleep ( wait) . await ;
240+ n_past_retries += 1 ;
241+ continue ;
242+ }
243+ }
244+
245+ return Ok ( result?) ;
246+ }
247+ }
248+
185249async fn upload_profile_archive (
186250 upload_data : & UploadData ,
187251 profile_archive : ProfileArchive ,
@@ -206,24 +270,14 @@ async fn upload_profile_archive(
206270 }
207271 content @ ProfileArchiveContent :: UncompressedOnDisk { path }
208272 | content @ ProfileArchiveContent :: CompressedOnDisk { path } => {
209- // Use streaming client without retry middleware for file streams
210- let file = File :: open ( path)
211- . await
212- . context ( format ! ( "Failed to open file at path: {}" , path. display( ) ) ) ?;
213- let stream = tokio_util:: io:: ReaderStream :: new ( file) ;
214- let body = reqwest:: Body :: wrap_stream ( stream) ;
215-
216- let mut request = STREAMING_CLIENT
217- . put ( upload_data. upload_url . clone ( ) )
218- . header ( "Content-Type" , "application/x-tar" )
219- . header ( "Content-Length" , archive_size)
220- . header ( "Content-MD5" , archive_hash) ;
221-
222- if let Some ( encoding) = content. encoding ( ) {
223- request = request. header ( "Content-Encoding" , encoding) ;
224- }
225-
226- request. body ( body) . send ( ) . await ?
273+ send_streamed_with_retry (
274+ upload_data,
275+ path,
276+ archive_size,
277+ & archive_hash,
278+ content. encoding ( ) ,
279+ )
280+ . await ?
227281 }
228282 } ;
229283
@@ -378,4 +432,111 @@ mod tests {
378432 )
379433 . await ;
380434 }
435+
436+ /// Number of upload attempts a transient failure should produce: the initial
437+ /// attempt plus [`UPLOAD_RETRY_COUNT`] retries.
438+ const EXPECTED_ATTEMPTS : usize = crate :: request_client:: UPLOAD_RETRY_COUNT as usize + 1 ;
439+
440+ /// Spawns a local TCP server that answers `503` (a retryable status) to each of
441+ /// the next `max_conns` connections, then exits. Returns the URL, a counter of
442+ /// connections received (= upload attempts), and the server's join handle so the
443+ /// caller can wait for clean shutdown instead of leaking the thread.
444+ fn spawn_mock_returning_503 (
445+ max_conns : usize ,
446+ ) -> (
447+ String ,
448+ std:: sync:: Arc < std:: sync:: atomic:: AtomicUsize > ,
449+ std:: thread:: JoinHandle < ( ) > ,
450+ ) {
451+ use std:: io:: { Read , Write } ;
452+ use std:: net:: TcpListener ;
453+ use std:: sync:: Arc ;
454+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
455+
456+ let listener = TcpListener :: bind ( "127.0.0.1:0" ) . unwrap ( ) ;
457+ let url = format ! ( "http://{}/upload" , listener. local_addr( ) . unwrap( ) ) ;
458+ let hits = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
459+
460+ let hits_loop = hits. clone ( ) ;
461+ let handle = std:: thread:: spawn ( move || {
462+ for stream in listener. incoming ( ) . take ( max_conns) {
463+ let Ok ( mut stream) = stream else { continue } ;
464+ hits_loop. fetch_add ( 1 , Ordering :: SeqCst ) ;
465+ let mut buf = [ 0u8 ; 2048 ] ;
466+ let _ = stream. read ( & mut buf) ;
467+ let body = "transient" ;
468+ let resp = format ! (
469+ "HTTP/1.1 503 Service Unavailable\r \n Content-Length: {}\r \n Connection: close\r \n \r \n {}" ,
470+ body. len( ) ,
471+ body
472+ ) ;
473+ let _ = stream. write_all ( resp. as_bytes ( ) ) ;
474+ }
475+ } ) ;
476+
477+ ( url, hits, handle)
478+ }
479+
480+ fn upload_data_for ( url : String ) -> UploadData {
481+ UploadData {
482+ status : "success" . to_string ( ) ,
483+ upload_url : url,
484+ run_id : "test-run" . to_string ( ) ,
485+ }
486+ }
487+
488+ /// WallTime/Memory path: an on-disk archive is streamed via `STREAMING_CLIENT`,
489+ /// which has no retry middleware, so `send_streamed_with_retry` rebuilds the
490+ /// stream on each attempt. A transient `503` must be retried `UPLOAD_RETRY_COUNT`
491+ /// times.
492+ #[ tokio:: test]
493+ async fn streamed_upload_is_retried ( ) {
494+ use std:: sync:: atomic:: Ordering ;
495+
496+ let ( url, hits, server) = spawn_mock_returning_503 ( EXPECTED_ATTEMPTS ) ;
497+
498+ let path = tempfile:: NamedTempFile :: new ( )
499+ . unwrap ( )
500+ . into_temp_path ( )
501+ . keep ( )
502+ . unwrap ( ) ;
503+ std:: fs:: write ( & path, b"profile-archive" ) . unwrap ( ) ;
504+ let archive = ProfileArchive :: new_uncompressed_on_disk ( path) . unwrap ( ) ;
505+
506+ let result = upload_profile_archive ( & upload_data_for ( url) , archive) . await ;
507+ server. join ( ) . unwrap ( ) ;
508+
509+ assert ! (
510+ result. is_err( ) ,
511+ "a 503 should surface as an error after retries"
512+ ) ;
513+ assert_eq ! (
514+ hits. load( Ordering :: SeqCst ) ,
515+ EXPECTED_ATTEMPTS ,
516+ "streamed upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
517+ ) ;
518+ }
519+
520+ /// Valgrind path: an in-memory archive goes through `REQUEST_CLIENT`, which retries
521+ /// transient failures `UPLOAD_RETRY_COUNT` times. Confirms the same `503` mock is
522+ /// genuinely retryable, so the single attempt above is due to the client, not the
523+ /// status code.
524+ #[ tokio:: test]
525+ async fn in_memory_upload_is_retried ( ) {
526+ use std:: sync:: atomic:: Ordering ;
527+
528+ let ( url, hits, server) = spawn_mock_returning_503 ( EXPECTED_ATTEMPTS ) ;
529+
530+ let archive = ProfileArchive :: new_compressed_in_memory ( b"profile-archive" . to_vec ( ) ) ;
531+
532+ let result = upload_profile_archive ( & upload_data_for ( url) , archive) . await ;
533+ server. join ( ) . unwrap ( ) ;
534+
535+ assert ! ( result. is_err( ) , "a 503 should surface as an error" ) ;
536+ assert_eq ! (
537+ hits. load( Ordering :: SeqCst ) ,
538+ EXPECTED_ATTEMPTS ,
539+ "in-memory upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
540+ ) ;
541+ }
381542}
0 commit comments