Skip to content

Commit e3957e2

Browse files
authored
Setter method for storage credentials loader + passing TableIdent (#46)
* builder method for storage credentials loader * pass tableident to loader, and catalog setter for credentials loader * cargo fmt
1 parent 00c1747 commit e3957e2

5 files changed

Lines changed: 105 additions & 13 deletions

File tree

crates/catalog/rest/src/catalog.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,15 @@ impl RestCatalog {
376376
}
377377
}
378378

379+
/// Set a custom storage credentials loader.
380+
///
381+
/// This is intended to be called after catalog construction, so the loader
382+
/// can hold a reference to the catalog (e.g., `Arc<RestCatalog>`) and call
383+
/// catalog-specific methods like `load_table_with_credentials`.
384+
pub fn set_storage_credentials_loader(&mut self, loader: Arc<dyn StorageCredentialsLoader>) {
385+
self.user_config.storage_credentials_loader = Some(loader);
386+
}
387+
379388
/// Add an extension to the file IO builder.
380389
pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
381390
self.file_io_extensions.add(ext);
@@ -430,6 +439,7 @@ impl RestCatalog {
430439
metadata_location: Option<&str>,
431440
extra_config: Option<HashMap<String, String>>,
432441
storage_credential: Option<StorageCredential>,
442+
table_ident: Option<&TableIdent>,
433443
) -> Result<FileIO> {
434444
let mut props = self.context().await?.config.props.clone();
435445
if let Some(config) = extra_config {
@@ -458,6 +468,9 @@ impl RestCatalog {
458468
file_io_builder =
459469
file_io_builder.with_extension(MetadataLocation(loc.to_string()));
460470
}
471+
if let Some(ident) = table_ident {
472+
file_io_builder = file_io_builder.with_extension(ident.clone());
473+
}
461474
file_io_builder = file_io_builder.with_extension(loader.clone());
462475
}
463476

@@ -570,7 +583,10 @@ impl RestCatalog {
570583
&self.user_config.storage_credentials_loader
571584
{
572585
let credential = storage_credentials_loader
573-
.load_credentials(response.metadata_location.as_deref().unwrap_or(""))
586+
.load_credentials(
587+
table_ident,
588+
response.metadata_location.as_deref().unwrap_or(""),
589+
)
574590
.await?;
575591
config.extend(credential.config.clone());
576592
Some(credential)
@@ -583,6 +599,7 @@ impl RestCatalog {
583599
response.metadata_location.as_deref(),
584600
Some(config),
585601
final_credential,
602+
Some(table_ident),
586603
)
587604
.await?;
588605

@@ -891,7 +908,7 @@ impl Catalog for RestCatalog {
891908

892909
// TODO: Support vended credentials here.
893910
let file_io = self
894-
.load_file_io(Some(metadata_location), Some(config), None)
911+
.load_file_io(Some(metadata_location), Some(config), None, None)
895912
.await?;
896913

897914
let table_builder = Table::builder()
@@ -1033,7 +1050,7 @@ impl Catalog for RestCatalog {
10331050

10341051
// TODO: Support vended credentials here.
10351052
let file_io = self
1036-
.load_file_io(Some(metadata_location), None, None)
1053+
.load_file_io(Some(metadata_location), None, None, None)
10371054
.await?;
10381055

10391056
Table::builder()
@@ -1100,7 +1117,7 @@ impl Catalog for RestCatalog {
11001117

11011118
// TODO: Support vended credentials here.
11021119
let file_io = self
1103-
.load_file_io(Some(&response.metadata_location), None, None)
1120+
.load_file_io(Some(&response.metadata_location), None, None, None)
11041121
.await?;
11051122

11061123
Table::builder()
@@ -3034,7 +3051,11 @@ mod tests {
30343051

30353052
#[async_trait::async_trait]
30363053
impl StorageCredentialsLoader for DummyCredentialLoader {
3037-
async fn load_credentials(&self, _location: &str) -> Result<StorageCredential> {
3054+
async fn load_credentials(
3055+
&self,
3056+
_table_ident: &TableIdent,
3057+
_location: &str,
3058+
) -> Result<StorageCredential> {
30383059
self.was_called.store(true, Ordering::SeqCst);
30393060
let mut config = HashMap::new();
30403061
config.insert("custom.key".to_string(), "custom.value".to_string());

crates/iceberg/src/io/opendal/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use super::{
4545
FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, MetadataLocation, OutputFile,
4646
Storage, StorageConfig, StorageCredential, StorageCredentialsLoader, StorageFactory,
4747
};
48+
use crate::catalog::TableIdent;
4849
use crate::{Error, ErrorKind, Result};
4950

5051
#[cfg(feature = "storage-azdls")]
@@ -233,12 +234,22 @@ impl OpenDalStorage {
233234
.get::<MetadataLocation>()
234235
.map(|l| l.0.clone())
235236
.unwrap_or_default();
237+
let table_ident = extensions
238+
.get::<TableIdent>()
239+
.map(|arc| (*arc).clone())
240+
.unwrap_or_else(|| {
241+
TableIdent::new(
242+
crate::NamespaceIdent::new("unknown".to_string()),
243+
"unknown".to_string(),
244+
)
245+
});
236246
let backend = RefreshableOpenDalStorageBuilder::new()
237247
.scheme(scheme_str)
238248
.base_props(props)
239249
.credentials_loader(Arc::clone(&loader))
240250
.initial_credentials(initial_creds)
241251
.location(location)
252+
.table_ident(table_ident)
242253
.extensions(extensions)
243254
.build()?;
244255
return Ok(Self::Refreshable {
@@ -534,7 +545,11 @@ mod tests {
534545

535546
#[async_trait::async_trait]
536547
impl StorageCredentialsLoader for TestCredentialLoader {
537-
async fn load_credentials(&self, _location: &str) -> crate::Result<StorageCredential> {
548+
async fn load_credentials(
549+
&self,
550+
_table_ident: &TableIdent,
551+
_location: &str,
552+
) -> crate::Result<StorageCredential> {
538553
Ok(StorageCredential {
539554
prefix: "s3://test/".to_string(),
540555
config: HashMap::new(),

crates/iceberg/src/io/refreshable_accessor.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ mod tests {
227227
use std::sync::atomic::{AtomicUsize, Ordering};
228228

229229
use super::*;
230+
use crate::NamespaceIdent;
231+
use crate::catalog::TableIdent;
230232
use crate::io::refreshable_storage::RefreshableOpenDalStorageBuilder;
231233
use crate::io::{StorageCredential, StorageCredentialsLoader};
232234

@@ -259,7 +261,11 @@ mod tests {
259261

260262
#[async_trait::async_trait]
261263
impl StorageCredentialsLoader for SequenceLoader {
262-
async fn load_credentials(&self, _location: &str) -> crate::Result<StorageCredential> {
264+
async fn load_credentials(
265+
&self,
266+
_table_ident: &TableIdent,
267+
_location: &str,
268+
) -> crate::Result<StorageCredential> {
263269
self.call_count.fetch_add(1, Ordering::SeqCst);
264270
let mut responses = self.responses.lock().unwrap();
265271
Ok(responses.pop_front().unwrap_or_else(dummy_credential))
@@ -382,6 +388,10 @@ mod tests {
382388
.scheme("memory".to_string())
383389
.base_props(HashMap::new())
384390
.credentials_loader(Arc::clone(&loader))
391+
.table_ident(TableIdent::new(
392+
NamespaceIdent::new("test_ns".to_string()),
393+
"test_table".to_string(),
394+
))
385395
.build()
386396
.expect("Failed to build storage");
387397

@@ -477,6 +487,10 @@ mod tests {
477487
.scheme("memory".to_string())
478488
.base_props(HashMap::new())
479489
.credentials_loader(Arc::clone(&loader) as _)
490+
.table_ident(TableIdent::new(
491+
NamespaceIdent::new("test_ns".to_string()),
492+
"test_table".to_string(),
493+
))
480494
.build()
481495
.expect("Failed to build storage");
482496

crates/iceberg/src/io/refreshable_storage.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use tokio::sync::Mutex as AsyncMutex;
2525

2626
use super::opendal::OpenDalStorage;
2727
use super::refreshable_accessor::RefreshableAccessor;
28+
use crate::catalog::TableIdent;
2829
use crate::io::file_io::Extensions;
2930
use crate::io::{StorageCredential, StorageCredentialsLoader};
3031
use crate::{Error, ErrorKind, Result};
@@ -52,6 +53,9 @@ pub struct RefreshableOpenDalStorage {
5253
/// Metadata location passed to `load_credentials`
5354
location: String,
5455

56+
/// Table identifier passed to `load_credentials`
57+
table_ident: TableIdent,
58+
5559
/// Cached AccessorInfo (created lazily from first operator)
5660
cached_info: Mutex<Option<Arc<AccessorInfo>>>,
5761

@@ -88,6 +92,7 @@ impl RefreshableOpenDalStorage {
8892
credentials_loader: Arc<dyn StorageCredentialsLoader>,
8993
initial_credentials: Option<StorageCredential>,
9094
location: String,
95+
table_ident: TableIdent,
9196
extensions: Extensions,
9297
) -> Result<Self> {
9398
// Build initial inner_storage from base_props + initial_credentials
@@ -104,6 +109,7 @@ impl RefreshableOpenDalStorage {
104109
credentials_loader,
105110
extensions,
106111
location,
112+
table_ident,
107113
cached_info: Mutex::new(None),
108114
credential_version: AtomicU64::new(0),
109115
refresh_lock: AsyncMutex::new(()),
@@ -197,7 +203,7 @@ impl RefreshableOpenDalStorage {
197203
// We are the one who should call the loader
198204
let new_creds = self
199205
.credentials_loader
200-
.load_credentials(&self.location)
206+
.load_credentials(&self.table_ident, &self.location)
201207
.await?;
202208
self.do_refresh(new_creds)?;
203209
Ok(self.credential_version.load(Ordering::Acquire))
@@ -212,6 +218,7 @@ pub struct RefreshableOpenDalStorageBuilder {
212218
credentials_loader: Option<Arc<dyn StorageCredentialsLoader>>,
213219
initial_credentials: Option<StorageCredential>,
214220
location: String,
221+
table_ident: Option<TableIdent>,
215222
extensions: Extensions,
216223
}
217224

@@ -251,6 +258,12 @@ impl RefreshableOpenDalStorageBuilder {
251258
self
252259
}
253260

261+
/// Set the table identifier passed to `load_credentials`
262+
pub fn table_ident(mut self, table_ident: TableIdent) -> Self {
263+
self.table_ident = Some(table_ident);
264+
self
265+
}
266+
254267
/// Set the extensions
255268
pub fn extensions(mut self, extensions: Extensions) -> Self {
256269
self.extensions = extensions;
@@ -268,6 +281,8 @@ impl RefreshableOpenDalStorageBuilder {
268281
})?,
269282
self.initial_credentials,
270283
self.location,
284+
self.table_ident
285+
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "table_ident is required"))?,
271286
self.extensions,
272287
)?))
273288
}
@@ -278,6 +293,7 @@ mod tests {
278293
use std::sync::atomic::{AtomicUsize, Ordering};
279294

280295
use super::*;
296+
use crate::NamespaceIdent;
281297
use crate::io::StorageCredential;
282298

283299
// --- Test helpers ---
@@ -288,7 +304,11 @@ mod tests {
288304

289305
#[async_trait::async_trait]
290306
impl StorageCredentialsLoader for SimpleLoader {
291-
async fn load_credentials(&self, _location: &str) -> Result<StorageCredential> {
307+
async fn load_credentials(
308+
&self,
309+
_table_ident: &TableIdent,
310+
_location: &str,
311+
) -> Result<StorageCredential> {
292312
Ok(StorageCredential {
293313
prefix: "memory:/refreshed/".to_string(),
294314
config: HashMap::from([("refreshed_key".to_string(), "refreshed_val".to_string())]),
@@ -322,7 +342,11 @@ mod tests {
322342

323343
#[async_trait::async_trait]
324344
impl StorageCredentialsLoader for TrackingRefreshLoader {
325-
async fn load_credentials(&self, _location: &str) -> Result<StorageCredential> {
345+
async fn load_credentials(
346+
&self,
347+
_table_ident: &TableIdent,
348+
_location: &str,
349+
) -> Result<StorageCredential> {
326350
let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
327351
Ok(StorageCredential {
328352
prefix: format!("memory:/refresh-{n}/"),
@@ -331,13 +355,21 @@ mod tests {
331355
}
332356
}
333357

358+
fn test_table_ident() -> TableIdent {
359+
TableIdent::new(
360+
NamespaceIdent::new("test_ns".to_string()),
361+
"test_table".to_string(),
362+
)
363+
}
364+
334365
fn build_memory_refreshable(
335366
loader: Arc<dyn StorageCredentialsLoader>,
336367
) -> Arc<RefreshableOpenDalStorage> {
337368
RefreshableOpenDalStorageBuilder::new()
338369
.scheme("memory".to_string())
339370
.base_props(HashMap::new())
340371
.credentials_loader(loader)
372+
.table_ident(test_table_ident())
341373
.build()
342374
.expect("Failed to build RefreshableOpenDalStorage for memory")
343375
}
@@ -346,7 +378,7 @@ mod tests {
346378
async fn refresh(storage: &RefreshableOpenDalStorage) -> Result<()> {
347379
let new_creds = storage
348380
.credentials_loader
349-
.load_credentials(&storage.location)
381+
.load_credentials(&storage.table_ident, &storage.location)
350382
.await?;
351383
storage.do_refresh(new_creds)
352384
}

crates/iceberg/src/io/storage_credential.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::collections::HashMap;
1919
use std::fmt::Debug;
2020

2121
use crate::Result;
22+
use crate::catalog::TableIdent;
2223

2324
/// Storage credentials for accessing cloud storage.
2425
///
@@ -54,7 +55,11 @@ pub struct MetadataLocation(pub String);
5455
///
5556
/// #[async_trait::async_trait]
5657
/// impl StorageCredentialsLoader for MyCredentialLoader {
57-
/// async fn load_credentials(&self, location: &str) -> iceberg::Result<StorageCredential> {
58+
/// async fn load_credentials(
59+
/// &self,
60+
/// _table_ident: &iceberg::TableIdent,
61+
/// location: &str,
62+
/// ) -> iceberg::Result<StorageCredential> {
5863
/// // Fetch fresh credentials from your credential service
5964
/// let mut config = HashMap::new();
6065
/// config.insert("access_key_id".to_string(), "fresh-key".to_string());
@@ -85,6 +90,11 @@ pub trait StorageCredentialsLoader: Send + Sync + Debug {
8590
/// Load storage credentials using custom user-defined logic.
8691
///
8792
/// # Arguments
93+
/// * `table_ident` - The table identifier for which credentials are being loaded
8894
/// * `location` - The full path being accessed (e.g., "s3://bucket/path/file.parquet")
89-
async fn load_credentials(&self, location: &str) -> Result<StorageCredential>;
95+
async fn load_credentials(
96+
&self,
97+
table_ident: &TableIdent,
98+
location: &str,
99+
) -> Result<StorageCredential>;
90100
}

0 commit comments

Comments
 (0)