Skip to content

Commit 2f75159

Browse files
committed
Add download retry on rate-limit (HTTP 503) responses from the S3 gateway
1 parent aef6526 commit 2f75159

4 files changed

Lines changed: 322 additions & 13 deletions

File tree

crates/fula-client/src/encryption.rs

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,50 @@ fn flush_backoff_delay(attempt: usize) -> std::time::Duration {
8888
static FLUSH_BACKOFF_COUNT: std::sync::atomic::AtomicU64 =
8989
std::sync::atomic::AtomicU64::new(0);
9090

91+
/// Base delay (ms) between transient-error retries on HAMT blob-backend
92+
/// GET/PUT. Chosen so a fully drained nginx `limit_req` token bucket
93+
/// (sub-second refill at the gateway's configured rate) has time to refill
94+
/// before the next attempt. Not exponential: the rate-limit condition resets
95+
/// per second, so later attempts don't benefit from longer waits.
96+
#[cfg(not(target_arch = "wasm32"))]
97+
const BLOB_BACKEND_RETRY_BASE_MS: u64 = 300;
98+
/// Maximum added jitter for blob-backend retries. De-synchronises fan-out
99+
/// retries so many concurrent walkers don't all wake at the same instant.
100+
#[cfg(not(target_arch = "wasm32"))]
101+
const BLOB_BACKEND_RETRY_JITTER_MS: u64 = 100;
102+
/// Total attempts (including the first try) for blob-backend GET/PUT.
103+
#[cfg(not(target_arch = "wasm32"))]
104+
const BLOB_BACKEND_MAX_ATTEMPTS: u32 = 4;
105+
106+
/// Compute the fixed-plus-jitter sleep duration for a blob-backend retry.
107+
///
108+
/// Same jitter source (`SystemTime::subsec_nanos()`) as `flush_backoff_delay`;
109+
/// no `rand` dependency pulled in just for this.
110+
#[cfg(not(target_arch = "wasm32"))]
111+
fn blob_backend_retry_delay() -> std::time::Duration {
112+
use std::time::{SystemTime, UNIX_EPOCH};
113+
let jitter = SystemTime::now()
114+
.duration_since(UNIX_EPOCH)
115+
.map(|d| u64::from(d.subsec_nanos()))
116+
.unwrap_or(0)
117+
% (BLOB_BACKEND_RETRY_JITTER_MS + 1);
118+
std::time::Duration::from_millis(BLOB_BACKEND_RETRY_BASE_MS + jitter)
119+
}
120+
121+
/// Process-wide counter bumped each time `S3BlobBackend::{get,put}` sleeps on
122+
/// a transient-error (5xx or 429) retry. Observable via [`blob_backend_retry_count`] so the
123+
/// fault-injection integration test can assert the retry path actually ran.
124+
#[cfg(not(target_arch = "wasm32"))]
125+
static BLOB_BACKEND_RETRY_COUNT: std::sync::atomic::AtomicU64 =
126+
std::sync::atomic::AtomicU64::new(0);
127+
128+
/// Read the total number of blob-backend transient-5xx retries since process
129+
/// start. Native-only.
130+
#[cfg(not(target_arch = "wasm32"))]
131+
pub fn blob_backend_retry_count() -> u64 {
132+
BLOB_BACKEND_RETRY_COUNT.load(std::sync::atomic::Ordering::Relaxed)
133+
}
134+
91135
/// Read the total number of flush-forest backoff sleeps observed since
92136
/// process start. Native-only.
93137
#[cfg(not(target_arch = "wasm32"))]
@@ -294,21 +338,70 @@ fn client_err_to_crypto(err: ClientError) -> CryptoError {
294338
#[cfg(not(target_arch = "wasm32"))]
295339
#[async_trait::async_trait]
296340
impl BlobBackend for S3BlobBackend {
341+
/// Retries transient 5xx responses (nginx `limit_req` 503, upstream
342+
/// 429/500/502/503/504, S3 `SlowDown`/`InternalError`/`ServiceUnavailable`)
343+
/// with a fixed 300 ms + 0-100 ms jitter delay, up to 4 attempts total.
344+
/// Non-transient errors (auth failure, NotFound, etc.) short-circuit.
297345
async fn get(&self, path: &str) -> fula_crypto::Result<Vec<u8>> {
298-
let bytes = self
299-
.inner
300-
.get_object(&self.bucket, path)
301-
.await
302-
.map_err(client_err_to_crypto)?;
303-
Ok(bytes.to_vec())
346+
let mut attempt: u32 = 0;
347+
loop {
348+
attempt += 1;
349+
match self.inner.get_object(&self.bucket, path).await {
350+
Ok(bytes) => return Ok(bytes.to_vec()),
351+
Err(e)
352+
if attempt < BLOB_BACKEND_MAX_ATTEMPTS
353+
&& crate::multipart::is_transient(&e) =>
354+
{
355+
tracing::debug!(
356+
bucket = %self.bucket,
357+
path = %path,
358+
attempt,
359+
error = %e,
360+
"S3BlobBackend::get retrying transient 5xx"
361+
);
362+
BLOB_BACKEND_RETRY_COUNT
363+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
364+
tokio::time::sleep(blob_backend_retry_delay()).await;
365+
continue;
366+
}
367+
Err(e) => return Err(client_err_to_crypto(e)),
368+
}
369+
}
304370
}
305371

372+
/// Same retry policy as `get`. `put_object` is idempotent on v7 HAMT
373+
/// node keys — they are content-addressed (blake3 over the plaintext
374+
/// node), so re-uploading the same bytes at the same path is safe.
306375
async fn put(&self, path: &str, bytes: Vec<u8>) -> fula_crypto::Result<()> {
307-
self.inner
308-
.put_object(&self.bucket, path, bytes)
309-
.await
310-
.map(|_| ())
311-
.map_err(client_err_to_crypto)
376+
let mut attempt: u32 = 0;
377+
loop {
378+
attempt += 1;
379+
// Clone the body each attempt: reqwest consumes the body, and we
380+
// want to re-send the same bytes on retry. The retry path is cold
381+
// and HAMT node blobs are small (sub-KB typical), so this is
382+
// negligible on the happy path too.
383+
let body = bytes.clone();
384+
match self.inner.put_object(&self.bucket, path, body).await {
385+
Ok(_) => return Ok(()),
386+
Err(e)
387+
if attempt < BLOB_BACKEND_MAX_ATTEMPTS
388+
&& crate::multipart::is_transient(&e) =>
389+
{
390+
tracing::debug!(
391+
bucket = %self.bucket,
392+
path = %path,
393+
attempt,
394+
error = %e,
395+
"S3BlobBackend::put retrying transient 5xx"
396+
);
397+
BLOB_BACKEND_RETRY_COUNT
398+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
399+
tokio::time::sleep(blob_backend_retry_delay()).await;
400+
continue;
401+
}
402+
Err(e) => return Err(client_err_to_crypto(e)),
403+
}
404+
}
312405
}
313406
}
314407

crates/fula-client/src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub(crate) static TEST_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()
5757

5858
pub use client::FulaClient;
5959
pub use config::Config;
60-
pub use encryption::{EncryptedClient, EncryptionConfig, DecryptedObjectInfo, FileMetadata, DirectoryListing, PinningCredentials};
60+
pub use encryption::{EncryptedClient, EncryptionConfig, DecryptedObjectInfo, FileMetadata, DirectoryListing, PinningCredentials, S3BlobBackend};
6161

6262
/// Crash-injection atomics used by the workspace integration tests to
6363
/// simulate client-side aborts between phases of `migrate_v1_to_v7_internal`.
@@ -102,6 +102,16 @@ pub fn flush_backoff_count() -> u64 {
102102
encryption::flush_backoff_count()
103103
}
104104

105+
/// Total `S3BlobBackend::{get, put}` retries triggered by transient 5xx
106+
/// responses (typically nginx `limit_req` 503 on bursty HAMT walks) since
107+
/// process start. Monotonic process-wide counter.
108+
///
109+
/// Native-only; the wasm32 `BlobBackend` impl is single-attempt.
110+
#[cfg(not(target_arch = "wasm32"))]
111+
pub fn blob_backend_retry_count() -> u64 {
112+
encryption::blob_backend_retry_count()
113+
}
114+
105115
/// Count of WAL groups discarded on load due to partial-group truncation
106116
/// (M-4). A transactional multi-entry op written via the internal
107117
/// `append_group` path is applied all-or-none on replay: if any member is

crates/fula-client/src/multipart.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,10 @@ where
458458
///
459459
/// Native-only: `retry_idempotent` short-circuits to a single attempt on
460460
/// wasm32 (no sleep primitive), so the classifier has no caller there.
461+
/// Shared with `S3BlobBackend::{get, put}` in `encryption.rs` so the
462+
/// blob-backend retry loop matches the same transient set.
461463
#[cfg(not(target_arch = "wasm32"))]
462-
fn is_transient(err: &ClientError) -> bool {
464+
pub(crate) fn is_transient(err: &ClientError) -> bool {
463465
match err {
464466
ClientError::Http(e) => {
465467
// `is_connect` exists only on native reqwest; the wasm build
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
//! Regression test for the v7 LIST-failure fix.
2+
//!
3+
//! Background: after the v1→v7 migration of a large bucket, LIST walks the
4+
//! sharded HAMT via `list_all_files`, which fans out to many concurrent
5+
//! `S3BlobBackend::get` calls on `/bucket/__fula_forest_v7_nodes/<key>`. When
6+
//! the gateway is fronted by an nginx `limit_req` with a `burst` smaller than
7+
//! the in-flight count, some requests come back as HTTP 503 with an empty
8+
//! body — a single one of those aborted the whole LIST.
9+
//!
10+
//! The fix adds bounded fixed-delay retries (300 ms base + 0-100 ms jitter,
11+
//! up to 4 attempts) in `S3BlobBackend::{get, put}` for the transient class
12+
//! HTTP 429/500/502/503/504 / `SlowDown` / `InternalError` /
13+
//! `ServiceUnavailable`. This test exercises that retry path end-to-end
14+
//! against a `wiremock::MockServer` that emits 503 → 503 → 200 and asserts:
15+
//!
16+
//! 1. The outer call returns `Ok`.
17+
//! 2. `blob_backend_retry_count()` observes exactly the number of retry
18+
//! sleeps we expect.
19+
//! 3. Non-transient responses (e.g. 404) are NOT retried.
20+
21+
#![cfg(not(target_arch = "wasm32"))]
22+
23+
use fula_client::{Config, FulaClient, S3BlobBackend, blob_backend_retry_count};
24+
use fula_crypto::BlobBackend;
25+
use std::sync::Arc;
26+
use std::sync::atomic::{AtomicUsize, Ordering};
27+
use wiremock::matchers::{method, path};
28+
use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
29+
30+
/// The retry counter is process-wide; tests in this file all mutate it.
31+
/// Serialize counter-sensitive tests under a shared lock so the `before →
32+
/// after` delta measured by each test is actually its own increments.
33+
static COUNTER_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
34+
35+
/// Responder that emits the Nth response from a list; cycles if called more
36+
/// than `responses.len()` times. Lets us script "503, 503, 200" against a
37+
/// single path without wiremock's expectation counters fighting us.
38+
struct Scripted {
39+
calls: Arc<AtomicUsize>,
40+
responses: Vec<ResponseTemplate>,
41+
}
42+
43+
impl Respond for Scripted {
44+
fn respond(&self, _req: &Request) -> ResponseTemplate {
45+
let idx = self.calls.fetch_add(1, Ordering::SeqCst);
46+
let slot = idx.min(self.responses.len() - 1);
47+
self.responses[slot].clone()
48+
}
49+
}
50+
51+
fn mk_client(endpoint: &str) -> FulaClient {
52+
// 10 s per-request timeout is ample for mock responses; the test doesn't
53+
// need a real connect timeout.
54+
let cfg = Config::new(endpoint);
55+
FulaClient::new(cfg).expect("build FulaClient")
56+
}
57+
58+
#[tokio::test]
59+
async fn get_retries_through_two_503s_then_succeeds() {
60+
let _guard = COUNTER_LOCK.lock().unwrap_or_else(|p| p.into_inner());
61+
let server = MockServer::start().await;
62+
63+
let calls = Arc::new(AtomicUsize::new(0));
64+
let responder = Scripted {
65+
calls: calls.clone(),
66+
responses: vec![
67+
ResponseTemplate::new(503),
68+
ResponseTemplate::new(503),
69+
ResponseTemplate::new(200).set_body_bytes(b"hamt-node-bytes".to_vec()),
70+
],
71+
};
72+
Mock::given(method("GET"))
73+
.and(path("/images/__fula_forest_v7_nodes/deadbeef"))
74+
.respond_with(responder)
75+
.mount(&server)
76+
.await;
77+
78+
let before = blob_backend_retry_count();
79+
let client = mk_client(&server.uri());
80+
let backend = S3BlobBackend::new(client, "images".to_string());
81+
82+
let got = backend
83+
.get("__fula_forest_v7_nodes/deadbeef")
84+
.await
85+
.expect("retry should absorb two 503s");
86+
87+
assert_eq!(got, b"hamt-node-bytes");
88+
assert_eq!(
89+
calls.load(Ordering::SeqCst),
90+
3,
91+
"mock should have been hit once per attempt"
92+
);
93+
let retries = blob_backend_retry_count() - before;
94+
assert_eq!(retries, 2, "two 503s → two retry sleeps, then success");
95+
}
96+
97+
#[tokio::test]
98+
async fn put_retries_through_one_503_then_succeeds() {
99+
let _guard = COUNTER_LOCK.lock().unwrap_or_else(|p| p.into_inner());
100+
let server = MockServer::start().await;
101+
102+
let calls = Arc::new(AtomicUsize::new(0));
103+
let responder = Scripted {
104+
calls: calls.clone(),
105+
responses: vec![
106+
ResponseTemplate::new(503),
107+
ResponseTemplate::new(200).insert_header("ETag", "\"abc123\""),
108+
],
109+
};
110+
Mock::given(method("PUT"))
111+
.and(path("/images/__fula_forest_v7_nodes/cafebabe"))
112+
.respond_with(responder)
113+
.mount(&server)
114+
.await;
115+
116+
let before = blob_backend_retry_count();
117+
let client = mk_client(&server.uri());
118+
let backend = S3BlobBackend::new(client, "images".to_string());
119+
120+
backend
121+
.put("__fula_forest_v7_nodes/cafebabe", b"encrypted-node-blob".to_vec())
122+
.await
123+
.expect("retry should absorb the 503");
124+
125+
assert_eq!(calls.load(Ordering::SeqCst), 2);
126+
let retries = blob_backend_retry_count() - before;
127+
assert_eq!(retries, 1, "one 503 → one retry sleep");
128+
}
129+
130+
#[tokio::test]
131+
async fn get_gives_up_after_max_attempts_on_persistent_503() {
132+
let _guard = COUNTER_LOCK.lock().unwrap_or_else(|p| p.into_inner());
133+
let server = MockServer::start().await;
134+
135+
let calls = Arc::new(AtomicUsize::new(0));
136+
let responder = Scripted {
137+
calls: calls.clone(),
138+
// Always 503 — more entries than attempts so Scripted never wraps.
139+
responses: vec![
140+
ResponseTemplate::new(503),
141+
ResponseTemplate::new(503),
142+
ResponseTemplate::new(503),
143+
ResponseTemplate::new(503),
144+
ResponseTemplate::new(503),
145+
],
146+
};
147+
Mock::given(method("GET"))
148+
.and(path("/images/__fula_forest_v7_nodes/persistently-broken"))
149+
.respond_with(responder)
150+
.mount(&server)
151+
.await;
152+
153+
let client = mk_client(&server.uri());
154+
let backend = S3BlobBackend::new(client, "images".to_string());
155+
156+
let err = backend
157+
.get("__fula_forest_v7_nodes/persistently-broken")
158+
.await
159+
.expect_err("persistent 503 must eventually surface");
160+
161+
// Four total attempts = `BLOB_BACKEND_MAX_ATTEMPTS`.
162+
assert_eq!(calls.load(Ordering::SeqCst), 4, "attempts capped at 4");
163+
// Error message should mention the storage-backend failure; we don't
164+
// pin the exact string (ClientError::to_string is wrapped via
165+
// client_err_to_crypto → CryptoError::Storage).
166+
let msg = err.to_string();
167+
assert!(
168+
msg.to_lowercase().contains("storage") || msg.contains("503"),
169+
"unexpected error message: {msg}"
170+
);
171+
}
172+
173+
#[tokio::test]
174+
async fn get_does_not_retry_on_404() {
175+
let _guard = COUNTER_LOCK.lock().unwrap_or_else(|p| p.into_inner());
176+
let server = MockServer::start().await;
177+
178+
let calls = Arc::new(AtomicUsize::new(0));
179+
let responder = Scripted {
180+
calls: calls.clone(),
181+
responses: vec![ResponseTemplate::new(404)],
182+
};
183+
Mock::given(method("GET"))
184+
.and(path("/images/__fula_forest_v7_nodes/not-there"))
185+
.respond_with(responder)
186+
.mount(&server)
187+
.await;
188+
189+
let before = blob_backend_retry_count();
190+
let client = mk_client(&server.uri());
191+
let backend = S3BlobBackend::new(client, "images".to_string());
192+
193+
let _err = backend
194+
.get("__fula_forest_v7_nodes/not-there")
195+
.await
196+
.expect_err("404 must not be retried");
197+
198+
assert_eq!(calls.load(Ordering::SeqCst), 1, "404 is terminal");
199+
assert_eq!(
200+
blob_backend_retry_count(),
201+
before,
202+
"non-transient error must not bump retry counter"
203+
);
204+
}

0 commit comments

Comments
 (0)