Skip to content

Commit df83123

Browse files
committed
fix(upload): retry streamed uploads on transient failures
The on-disk/streamed upload path (WallTime/Memory) used STREAMING_CLIENT, which has no retry middleware, so a transient failure killed the upload after a single attempt. The retry middleware can't help here because it retries by cloning the request, and a streamed body isn't cloneable. Add send_streamed_with_retry, which rebuilds the stream from disk on each attempt while reusing reqwest_retry's ExponentialBackoff policy and DefaultRetryableStrategy, so the streamed path retries with the same backoff and transient classification as the middleware-backed path.
1 parent 4765d43 commit df83123

2 files changed

Lines changed: 73 additions & 20 deletions

File tree

src/request_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use reqwest::ClientBuilder;
44
use reqwest_middleware::{ClientBuilder as ClientWithMiddlewareBuilder, ClientWithMiddleware};
55
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
66

7-
const UPLOAD_RETRY_COUNT: u32 = 3;
7+
pub const UPLOAD_RETRY_COUNT: u32 = 3;
88
const OIDC_RETRY_COUNT: u32 = 10;
99
const USER_AGENT: &str = "codspeed-runner";
1010

src/upload/uploader.rs

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@ use crate::run_environment::RunEnvironment;
66
use crate::upload::{UploadError, profile_archive::ProfileArchiveContent};
77
use crate::{
88
prelude::*,
9-
request_client::{REQUEST_CLIENT, STREAMING_CLIENT},
9+
request_client::{REQUEST_CLIENT, STREAMING_CLIENT, UPLOAD_RETRY_COUNT},
1010
};
1111
use async_compression::tokio::write::GzipEncoder;
1212
use console::style;
1313
use reqwest::StatusCode;
14+
use reqwest_retry::{
15+
DefaultRetryableStrategy, RetryDecision, RetryPolicy, Retryable, RetryableStrategy,
16+
policies::ExponentialBackoff,
17+
};
1418
use serde_json::Value;
1519
use std::collections::BTreeMap;
20+
use std::time::SystemTime;
1621
use tokio::fs::File;
1722
use tokio::io::AsyncWriteExt;
1823
use tokio_tar::Builder;
@@ -182,6 +187,64 @@ async fn retrieve_upload_data(
182187
}
183188
}
184189

190+
/// Streams a file-backed body with retries. The retry middleware can't replay a
191+
/// stream body (a consumed stream isn't cloneable), so we rebuild it from disk on
192+
/// each attempt and reuse reqwest_retry's backoff policy + transient classification
193+
/// to match the behavior of the middleware-backed in-memory path.
194+
async fn send_streamed_with_retry(
195+
upload_data: &UploadData,
196+
path: &std::path::Path,
197+
archive_size: u64,
198+
archive_hash: &str,
199+
encoding: Option<String>,
200+
) -> Result<reqwest::Response> {
201+
let policy = ExponentialBackoff::builder().build_with_max_retries(UPLOAD_RETRY_COUNT);
202+
let start = SystemTime::now();
203+
let mut n_past_retries = 0;
204+
205+
loop {
206+
let file = File::open(path)
207+
.await
208+
.context(format!("Failed to open file at path: {}", path.display()))?;
209+
let stream = tokio_util::io::ReaderStream::new(file);
210+
let body = reqwest::Body::wrap_stream(stream);
211+
212+
let mut request = STREAMING_CLIENT
213+
.put(upload_data.upload_url.clone())
214+
.header("Content-Type", "application/x-tar")
215+
.header("Content-Length", archive_size)
216+
.header("Content-MD5", archive_hash);
217+
if let Some(encoding) = &encoding {
218+
request = request.header("Content-Encoding", encoding);
219+
}
220+
221+
// Wrap into reqwest_middleware::Error so the default classifier can be reused.
222+
let result = request
223+
.body(body)
224+
.send()
225+
.await
226+
.map_err(reqwest_middleware::Error::Reqwest);
227+
228+
let is_transient =
229+
matches!(DefaultRetryableStrategy.handle(&result), Some(Retryable::Transient));
230+
if is_transient {
231+
if let RetryDecision::Retry { execute_after } =
232+
policy.should_retry(start, n_past_retries)
233+
{
234+
let wait = execute_after
235+
.duration_since(SystemTime::now())
236+
.unwrap_or_default();
237+
debug!("Streamed upload attempt failed (transient), retrying in {wait:?}");
238+
tokio::time::sleep(wait).await;
239+
n_past_retries += 1;
240+
continue;
241+
}
242+
}
243+
244+
return Ok(result?);
245+
}
246+
}
247+
185248
async fn upload_profile_archive(
186249
upload_data: &UploadData,
187250
profile_archive: ProfileArchive,
@@ -206,24 +269,14 @@ async fn upload_profile_archive(
206269
}
207270
content @ ProfileArchiveContent::UncompressedOnDisk { path }
208271
| content @ ProfileArchiveContent::CompressedOnDisk { path } => {
209-
// Use streaming client without retry middleware for file streams
210-
let file = File::open(path)
211-
.await
212-
.context(format!("Failed to open file at path: {}", path.display()))?;
213-
let stream = tokio_util::io::ReaderStream::new(file);
214-
let body = reqwest::Body::wrap_stream(stream);
215-
216-
let mut request = STREAMING_CLIENT
217-
.put(upload_data.upload_url.clone())
218-
.header("Content-Type", "application/x-tar")
219-
.header("Content-Length", archive_size)
220-
.header("Content-MD5", archive_hash);
221-
222-
if let Some(encoding) = content.encoding() {
223-
request = request.header("Content-Encoding", encoding);
224-
}
225-
226-
request.body(body).send().await?
272+
send_streamed_with_retry(
273+
upload_data,
274+
path,
275+
archive_size,
276+
&archive_hash,
277+
content.encoding(),
278+
)
279+
.await?
227280
}
228281
};
229282

0 commit comments

Comments
 (0)