Skip to content

Commit 8c3add6

Browse files
authored
Retry once on all errors and preserve original errors (#51)
1 parent c4598e0 commit 8c3add6

2 files changed

Lines changed: 79 additions & 43 deletions

File tree

crates/iceberg/src/io/refreshable_accessor.rs

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ use opendal::raw::*;
2323
use super::refreshable_storage::RefreshableOpenDalStorage;
2424
use crate::Result;
2525

26-
/// An OpenDAL accessor that wraps another accessor and retries on PermissionDenied
27-
/// after refreshing credentials.
26+
/// An OpenDAL accessor that wraps another accessor and retries after refreshing
27+
/// credentials when any operation fails.
2828
///
2929
/// Each instance has its own inner accessor and shares credential state with
3030
/// other accessors via `Arc<RefreshableOpenDalStorage>`. Credentials are only
31-
/// refreshed when an operation fails with PermissionDenied, not proactively.
31+
/// refreshed when an operation fails, not proactively.
3232
///
33-
/// Concurrency: if multiple accessors hit PermissionDenied simultaneously, only
34-
/// one will call the external credential loader (via double-checked locking on
35-
/// `RefreshableOpenDalStorage::refresh_on_permission_denied`). The others will
36-
/// detect the version bump and simply rebuild their accessor from the already-refreshed
33+
/// Concurrency: if multiple accessors hit errors simultaneously, only one will
34+
/// call the external credential loader (via double-checked locking on
35+
/// `RefreshableOpenDalStorage::try_refresh_credentials`). The others will detect
36+
/// the version bump and simply rebuild their accessor from the already-refreshed
3737
/// credentials.
3838
pub(crate) struct RefreshableAccessor {
3939
/// The current backend's accessor paired with the credential version it was built from.
@@ -81,14 +81,15 @@ impl RefreshableAccessor {
8181
Ok(new_accessor)
8282
}
8383

84-
/// Run an operation with automatic retry on PermissionDenied after credential refresh.
84+
/// Run an operation with automatic retry after credential refresh on any error.
8585
///
86-
/// 1. Gets the current accessor (no refresh) and runs the operation.
87-
/// 2. If it fails with PermissionDenied, calls `refresh_on_permission_denied`
88-
/// with the accessor's credential version.
86+
/// 1. Gets the current accessor and runs the operation.
87+
/// 2. If it fails (any error), calls `try_refresh_credentials` with the
88+
/// accessor's credential version.
8989
/// 3. If credentials were refreshed (by us or another concurrent accessor),
9090
/// rebuilds our accessor and retries the operation once.
91-
/// 4. Otherwise, returns the original error.
91+
/// 4. If the retry also fails, returns an error that preserves both the
92+
/// original and retry error messages.
9293
async fn with_credential_retry<F, Fut, T>(&self, op: F) -> opendal::Result<T>
9394
where
9495
F: Fn(Accessor) -> Fut,
@@ -98,30 +99,44 @@ impl RefreshableAccessor {
9899
let result = op(accessor).await;
99100

100101
match result {
101-
Err(err) if err.kind() == opendal::ErrorKind::PermissionDenied => {
102+
Err(original_err) => {
103+
let original_display = original_err.to_string();
104+
let original_kind = original_err.kind();
105+
102106
let new_version = self
103107
.storage
104-
.refresh_on_permission_denied(version)
108+
.try_refresh_credentials(version)
105109
.await
106110
.map_err(|e| {
107111
opendal::Error::new(
108-
opendal::ErrorKind::PermissionDenied,
112+
original_kind,
109113
format!(
110-
"Operation failed with PermissionDenied and credential \
111-
refresh also failed: {e}"
114+
"Operation failed and credential refresh also failed: \
115+
{e}. Original error: {original_display}"
112116
),
113117
)
114-
.set_source(err)
115118
})?;
116119

117120
let new_accessor = self.rebuild_accessor(new_version).map_err(|e| {
118121
opendal::Error::new(
119122
opendal::ErrorKind::Unexpected,
120-
"Failed to rebuild accessor after credential refresh",
123+
format!(
124+
"Failed to rebuild accessor after credential refresh. \
125+
Original error: {original_display}"
126+
),
121127
)
122128
.set_source(e)
123129
})?;
124-
op(new_accessor).await
130+
131+
op(new_accessor).await.map_err(|retry_err| {
132+
opendal::Error::new(
133+
retry_err.kind(),
134+
format!(
135+
"Retry after credential refresh also failed: {retry_err}. \
136+
Original error: {original_display}"
137+
),
138+
)
139+
})
125140
}
126141
other => other,
127142
}
@@ -208,10 +223,11 @@ impl Access for RefreshableAccessor {
208223
///
209224
/// `with_credential_retry` works as follows:
210225
/// 1. Gets the current accessor (no refresh) and runs the operation.
211-
/// 2. On `PermissionDenied`, calls `refresh_on_permission_denied` with the
212-
/// accessor's credential version.
226+
/// 2. On any error, calls `try_refresh_credentials` with the accessor's
227+
/// credential version.
213228
/// 3. If credentials were refreshed, rebuilds the accessor and retries once.
214-
/// 4. Otherwise, returns the original error.
229+
/// 4. If the retry also fails, returns an error preserving both original and
230+
/// retry error messages.
215231
///
216232
/// To test this, we inject a `FailingAccessor` (returns a configurable error on `stat`)
217233
/// as the initial inner accessor, while the shared storage's `inner_storage` is a real
@@ -422,11 +438,11 @@ mod tests {
422438
/// Flow:
423439
/// 1. `get_accessor` → no refresh → FailingAccessor used
424440
/// 2. `stat` → PermissionDenied
425-
/// 3. `refresh_on_permission_denied` → loader call #1 → do_refresh
441+
/// 3. `try_refresh_credentials` → loader call #1 → do_refresh
426442
/// 4. `rebuild_accessor` → memory accessor used
427443
/// 5. Memory backend `stat("nonexistent")` → NotFound (not PermissionDenied)
428444
#[tokio::test]
429-
async fn test_retry_on_permission_denied_with_successful_refresh() {
445+
async fn test_retry_after_credential_refresh() {
430446
let loader = Arc::new(SequenceLoader::new(vec![dummy_credential()]));
431447

432448
let accessor = build_refreshable_storage_and_accessor(
@@ -451,36 +467,56 @@ mod tests {
451467
assert_eq!(loader.call_count(), 1);
452468
}
453469

454-
/// Only PermissionDenied should trigger credential retry. Other errors (network,
455-
/// not-found, etc.) should not — retrying with fresh credentials wouldn't help.
470+
/// Any error triggers credential retry. When both the original and retry
471+
/// operations fail, the error message should preserve both.
456472
///
457473
/// Flow:
458-
/// 1. `get_accessor` → no refresh → FailingAccessor → NotFound
459-
/// 2. `with_credential_retry` sees NotFound → no retry → returns error immediately
474+
/// 1. `get_accessor` → FailingAccessor → Unexpected error
475+
/// 2. `try_refresh_credentials` → loader call → do_refresh
476+
/// 3. `rebuild_accessor` → memory accessor
477+
/// 4. Memory backend `stat("nonexistent")` → NotFound
478+
/// 5. Final error includes both "Unexpected" original and "NotFound" retry info
460479
#[tokio::test]
461-
async fn test_non_permission_denied_error_is_not_retried() {
462-
let loader = Arc::new(SequenceLoader::new(vec![]));
480+
async fn test_any_error_triggers_retry_and_preserves_both_errors() {
481+
let loader = Arc::new(SequenceLoader::new(vec![dummy_credential()]));
463482

464483
let accessor = build_refreshable_storage_and_accessor(
465484
Arc::clone(&loader) as _,
466-
opendal::ErrorKind::NotFound,
485+
opendal::ErrorKind::Unexpected,
467486
);
468487

469488
let result = accessor.stat("nonexistent", OpStat::new()).await;
470489

471490
assert!(result.is_err());
472491
let err = result.unwrap_err();
473-
assert_eq!(err.kind(), opendal::ErrorKind::NotFound);
492+
// The retry error kind comes from the memory backend (NotFound)
493+
assert_eq!(
494+
err.kind(),
495+
opendal::ErrorKind::NotFound,
496+
"Expected NotFound from retry, got {:?}",
497+
err.kind()
498+
);
474499

475-
// No loader calls at all — only PermissionDenied triggers refresh
476-
assert_eq!(loader.call_count(), 0);
500+
// Error message should mention both the retry failure and original error
501+
let err_msg = err.to_string();
502+
assert!(
503+
err_msg.contains("Original error"),
504+
"Error should reference original error: {err_msg}"
505+
);
506+
assert!(
507+
err_msg.contains("Unexpected"),
508+
"Error should contain original Unexpected error kind: {err_msg}"
509+
);
510+
511+
// 1 loader call — retry happened
512+
assert_eq!(loader.call_count(), 1);
477513
}
478514

479-
/// When multiple concurrent callers hit PermissionDenied, only one should
480-
/// call the external credential loader. The others should detect the version
481-
/// bump and skip the loader call.
515+
/// When multiple concurrent callers trigger credential refresh, only one
516+
/// should call the external credential loader. The others should detect the
517+
/// version bump and skip the loader call.
482518
#[tokio::test]
483-
async fn test_concurrent_permission_denied_calls_loader_only_once() {
519+
async fn test_concurrent_refresh_calls_loader_only_once() {
484520
let loader = Arc::new(SequenceLoader::new(vec![dummy_credential()]));
485521

486522
let storage = RefreshableOpenDalStorageBuilder::new()
@@ -496,12 +532,12 @@ mod tests {
496532

497533
let version = storage.credential_version();
498534

499-
// Spawn 10 concurrent refresh_on_permission_denied calls with the same version
535+
// Spawn 10 concurrent try_refresh_credentials calls with the same version
500536
let mut handles = Vec::new();
501537
for _ in 0..10 {
502538
let storage = Arc::clone(&storage);
503539
handles.push(tokio::spawn(async move {
504-
storage.refresh_on_permission_denied(version).await
540+
storage.try_refresh_credentials(version).await
505541
}));
506542
}
507543

crates/iceberg/src/io/refreshable_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl RefreshableOpenDalStorage {
174174
self.cached_info.lock().unwrap()
175175
}
176176

177-
/// Refresh credentials in response to a PermissionDenied error.
177+
/// Try to refresh credentials after an operation failure.
178178
///
179179
/// Uses double-checked locking with a version number:
180180
/// 1. If `credential_version > accessor_version`, someone already refreshed —
@@ -184,7 +184,7 @@ impl RefreshableOpenDalStorage {
184184
/// already refreshed while we waited — return the current version.
185185
/// 4. Call the loader, then `do_refresh`.
186186
/// 5. Return the new version.
187-
pub(crate) async fn refresh_on_permission_denied(&self, accessor_version: u64) -> Result<u64> {
187+
pub(crate) async fn try_refresh_credentials(&self, accessor_version: u64) -> Result<u64> {
188188
// Fast path: someone already refreshed since this accessor was built
189189
let current = self.credential_version.load(Ordering::Acquire);
190190
if current > accessor_version {

0 commit comments

Comments
 (0)