Skip to content

Commit 05f96f5

Browse files
committed
Move code
1 parent 7bddfee commit 05f96f5

1 file changed

Lines changed: 75 additions & 75 deletions

File tree

  • crates/iceberg/src/io/storage/opendal

crates/iceberg/src/io/storage/opendal/mod.rs

Lines changed: 75 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,81 @@ impl FileWrite for opendal::Writer {
480480
}
481481
}
482482

483+
/// A [`StorageFactory`] that creates a [`OpenDalStorage::Refreshable`] backend with
484+
/// automatic credential rotation.
485+
///
486+
/// Inject it at catalog construction time via `with_storage_factory`. At table-load time
487+
/// the catalog populates [`StorageConfig`] with the table identity and metadata location;
488+
/// `build()` reads those to pass context to the credential loader on each refresh.
489+
///
490+
/// # Example
491+
///
492+
/// ```rust,no_run
493+
/// use std::sync::Arc;
494+
///
495+
/// use iceberg::io::{RefreshableStorageFactory, StorageCredentialsLoader};
496+
///
497+
/// // Implement your own loader:
498+
/// // let loader: Arc<dyn StorageCredentialsLoader> = ...;
499+
/// // let factory = Arc::new(RefreshableStorageFactory::new(loader));
500+
/// // catalog_config.with_storage_factory(factory);
501+
/// ```
502+
#[derive(Debug, Serialize, Deserialize)]
503+
pub struct RefreshableStorageFactory {
504+
/// The credentials loader. `None` only after serde deserialization (field is skipped).
505+
#[serde(skip)]
506+
credentials_loader: Option<Arc<dyn StorageCredentialsLoader>>,
507+
}
508+
509+
impl RefreshableStorageFactory {
510+
/// Creates a new factory.
511+
pub fn new(credentials_loader: Arc<dyn StorageCredentialsLoader>) -> Self {
512+
Self {
513+
credentials_loader: Some(credentials_loader),
514+
}
515+
}
516+
}
517+
518+
#[typetag::serde]
519+
impl StorageFactory for RefreshableStorageFactory {
520+
fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
521+
let loader = self.credentials_loader.as_ref().ok_or_else(|| {
522+
Error::new(
523+
ErrorKind::Unexpected,
524+
"RefreshableStorageFactory: credentials loader unavailable after deserialization",
525+
)
526+
})?;
527+
528+
// Extract runtime context from props, stripping the internal keys so they
529+
// don't leak into the underlying OpenDAL operator configuration.
530+
let mut props = config.props().clone();
531+
let location = props.remove(PROP_METADATA_LOCATION).unwrap_or_default();
532+
let scheme = Url::parse(&location)
533+
.map(|u| u.scheme().to_string())
534+
.unwrap_or_default();
535+
let table_ident = props
536+
.remove(PROP_TABLE_IDENT)
537+
.and_then(|s| serde_json::from_str::<TableIdent>(&s).ok())
538+
.unwrap_or_else(|| {
539+
TableIdent::new(
540+
NamespaceIdent::new("unknown".to_string()),
541+
"unknown".to_string(),
542+
)
543+
});
544+
545+
let backend = RefreshableOpenDalStorageBuilder::new()
546+
.scheme(scheme)
547+
.base_props(props)
548+
.credentials_loader(Arc::clone(loader))
549+
.location(location)
550+
.table_ident(table_ident)
551+
.build()?;
552+
Ok(Arc::new(OpenDalStorage::Refreshable {
553+
backend: Some(backend),
554+
}))
555+
}
556+
}
557+
483558
#[cfg(test)]
484559
mod tests {
485560
use super::*;
@@ -575,78 +650,3 @@ mod tests {
575650
);
576651
}
577652
}
578-
579-
/// A [`StorageFactory`] that creates a [`OpenDalStorage::Refreshable`] backend with
580-
/// automatic credential rotation.
581-
///
582-
/// Inject it at catalog construction time via `with_storage_factory`. At table-load time
583-
/// the catalog populates [`StorageConfig`] with the table identity and metadata location;
584-
/// `build()` reads those to pass context to the credential loader on each refresh.
585-
///
586-
/// # Example
587-
///
588-
/// ```rust,no_run
589-
/// use std::sync::Arc;
590-
///
591-
/// use iceberg::io::{RefreshableStorageFactory, StorageCredentialsLoader};
592-
///
593-
/// // Implement your own loader:
594-
/// // let loader: Arc<dyn StorageCredentialsLoader> = ...;
595-
/// // let factory = Arc::new(RefreshableStorageFactory::new(loader));
596-
/// // catalog_config.with_storage_factory(factory);
597-
/// ```
598-
#[derive(Debug, Serialize, Deserialize)]
599-
pub struct RefreshableStorageFactory {
600-
/// The credentials loader. `None` only after serde deserialization (field is skipped).
601-
#[serde(skip)]
602-
credentials_loader: Option<Arc<dyn StorageCredentialsLoader>>,
603-
}
604-
605-
impl RefreshableStorageFactory {
606-
/// Creates a new factory.
607-
pub fn new(credentials_loader: Arc<dyn StorageCredentialsLoader>) -> Self {
608-
Self {
609-
credentials_loader: Some(credentials_loader),
610-
}
611-
}
612-
}
613-
614-
#[typetag::serde]
615-
impl StorageFactory for RefreshableStorageFactory {
616-
fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
617-
let loader = self.credentials_loader.as_ref().ok_or_else(|| {
618-
Error::new(
619-
ErrorKind::Unexpected,
620-
"RefreshableStorageFactory: credentials loader unavailable after deserialization",
621-
)
622-
})?;
623-
624-
// Extract runtime context from props, stripping the internal keys so they
625-
// don't leak into the underlying OpenDAL operator configuration.
626-
let mut props = config.props().clone();
627-
let location = props.remove(PROP_METADATA_LOCATION).unwrap_or_default();
628-
let scheme = Url::parse(&location)
629-
.map(|u| u.scheme().to_string())
630-
.unwrap_or_default();
631-
let table_ident = props
632-
.remove(PROP_TABLE_IDENT)
633-
.and_then(|s| serde_json::from_str::<TableIdent>(&s).ok())
634-
.unwrap_or_else(|| {
635-
TableIdent::new(
636-
NamespaceIdent::new("unknown".to_string()),
637-
"unknown".to_string(),
638-
)
639-
});
640-
641-
let backend = RefreshableOpenDalStorageBuilder::new()
642-
.scheme(scheme)
643-
.base_props(props)
644-
.credentials_loader(Arc::clone(loader))
645-
.location(location)
646-
.table_ident(table_ident)
647-
.build()?;
648-
Ok(Arc::new(OpenDalStorage::Refreshable {
649-
backend: Some(backend),
650-
}))
651-
}
652-
}

0 commit comments

Comments
 (0)