Skip to content

Commit c7d0ac6

Browse files
authored
Fix minor race condition in refreshable storage + avoid double retry layer (#54)
* Retry once on all errors and preserve original errors
* Fix minor race condition in refreshable storage
* the other side of the coin
* no OpenDAL RetryLayer for refreshable storage, we have custom retry for credential refresh
1 parent 0e7e1b5 commit c7d0ac6

2 files changed

Lines changed: 18 additions & 4 deletions

File tree

crates/iceberg/src/io/refreshable_storage.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ impl RefreshableOpenDalStorage {
125125
let path_string = path.to_string();
126126
let (operator, relative_path) = storage_guard.create_operator(&path_string)?;
127127
let relative_path = relative_path.to_string();
128+
// Read version while still holding the lock so the accessor and version
129+
// are guaranteed to be consistent. Otherwise a concurrent do_refresh
130+
// could bump the version between dropping the lock and reading it,
131+
// causing the accessor to appear up-to-date when it's actually stale.
132+
let version = self.credential_version();
128133
drop(storage_guard);
129134

130135
let accessor = operator.into_inner();
@@ -136,8 +141,6 @@ impl RefreshableOpenDalStorage {
136141
*info_guard = Some(accessor.info());
137142
}
138143
}
139-
140-
let version = self.credential_version();
141144
let refreshable_accessor =
142145
RefreshableAccessor::new(accessor, version, path.to_string(), Arc::clone(self));
143146

@@ -153,8 +156,13 @@ impl RefreshableOpenDalStorage {
153156
let new_storage =
154157
OpenDalStorage::build_from_props(&self.scheme, full_props, &self.extensions)?;
155158

156-
*self.lock_inner_storage() = new_storage;
159+
// Update storage and bump version atomically (while holding the lock)
160+
// so that refreshable_create_operator always sees a consistent
161+
// (storage, version) pair.
162+
let mut guard = self.lock_inner_storage();
163+
*guard = new_storage;
157164
self.credential_version.fetch_add(1, Ordering::Release);
165+
drop(guard);
158166

159167
Ok(())
160168
}

crates/iceberg/src/io/storage/opendal/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,13 @@ impl OpenDalStorage {
426426

427427
// Transient errors are common for object stores; however there's no
428428
// harm in retrying temporary failures for other storage backends as well.
429-
let operator = operator.layer(RetryLayer::new());
429+
// The Refreshable variant already has a RetryLayer from the inner
430+
// create_operator call, so skip adding a second one to avoid
431+
// unnecessary credential refreshes on transient errors.
432+
let operator = match self {
433+
OpenDalStorage::Refreshable { .. } => operator,
434+
_ => operator.layer(RetryLayer::new()),
435+
};
430436
Ok((operator, relative_path))
431437
}
432438

0 commit comments

Comments
 (0)