Skip to content

Commit f4d4cc2

Browse files
authored
Merge branch 'JanKaul:main' into main
2 parents a40522f + 3a4e01d commit f4d4cc2

5 files changed

Lines changed: 165 additions & 55 deletions

File tree

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ impl FileCatalog {
556556
});
557557
files
558558
.into_iter()
559-
.last()
559+
.next_back()
560560
.ok_or(IcebergError::CatalogNotFound)
561561
}
562562

@@ -588,7 +588,7 @@ fn trim_start_path(path: &str) -> &str {
588588

589589
fn parse_version(path: &str) -> Result<u64, IcebergError> {
590590
path.split('/')
591-
.last()
591+
.next_back()
592592
.ok_or(IcebergError::InvalidFormat("Metadata location".to_owned()))?
593593
.trim_start_matches('v')
594594
.trim_end_matches(".metadata.json")

catalogs/iceberg-glue-catalog/src/lib.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use iceberg_rust::{
2020
identifier::Identifier,
2121
namespace::Namespace,
2222
tabular::Tabular,
23-
Catalog,
23+
Catalog, CatalogList,
2424
},
2525
error::Error as IcebergError,
2626
materialized_view::MaterializedView,
@@ -948,6 +948,40 @@ impl Catalog for GlueCatalog {
948948
}
949949
}
950950

951+
#[derive(Debug, Clone)]
952+
pub struct GlueCatalogList {
953+
name: String,
954+
config: SdkConfig,
955+
object_store_builder: ObjectStoreBuilder,
956+
}
957+
958+
impl GlueCatalogList {
959+
pub fn new(name: &str, config: &SdkConfig, object_store_builder: ObjectStoreBuilder) -> Self {
960+
Self {
961+
name: name.to_owned(),
962+
config: config.to_owned(),
963+
object_store_builder,
964+
}
965+
}
966+
}
967+
968+
#[async_trait]
969+
impl CatalogList for GlueCatalogList {
970+
fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
971+
if self.name == name {
972+
Some(Arc::new(
973+
GlueCatalog::new(&self.config, &self.name, self.object_store_builder.clone())
974+
.unwrap(),
975+
))
976+
} else {
977+
None
978+
}
979+
}
980+
async fn list_catalogs(&self) -> Vec<String> {
981+
vec![self.name.clone()]
982+
}
983+
}
984+
951985
#[cfg(test)]
952986
pub mod tests {
953987
use aws_config::{BehaviorVersion, Region};

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 126 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,41 @@ use iceberg_rust::{
2525
table::Table,
2626
view::View,
2727
};
28-
use object_store::ObjectStore;
29-
use std::{collections::HashMap, path::Path, sync::Arc};
28+
use object_store::{aws::AmazonS3Builder, ObjectStore};
29+
use std::{
30+
collections::HashMap,
31+
path::Path,
32+
sync::{Arc, RwLock},
33+
};
3034

3135
use crate::{
3236
apis::{
3337
self,
3438
catalog_api_api::{self, NamespaceExistsError},
3539
configuration::Configuration,
3640
},
37-
models,
41+
models::{self, StorageCredential},
3842
};
3943

4044
#[derive(Debug)]
4145
pub struct RestCatalog {
4246
name: Option<String>,
4347
configuration: Configuration,
44-
object_store_builder: ObjectStoreBuilder,
48+
default_object_store_builder: Option<ObjectStoreBuilder>,
49+
cache: Arc<RwLock<HashMap<Identifier, Arc<dyn ObjectStore>>>>,
4550
}
4651

4752
impl RestCatalog {
4853
pub fn new(
4954
name: Option<&str>,
5055
configuration: Configuration,
51-
object_store_builder: ObjectStoreBuilder,
56+
default_object_store_builder: Option<ObjectStoreBuilder>,
5257
) -> Self {
5358
RestCatalog {
5459
name: name.map(ToString::to_string),
5560
configuration,
56-
object_store_builder,
61+
default_object_store_builder,
62+
cache: Arc::new(RwLock::new(HashMap::new())),
5763
}
5864
}
5965
}
@@ -276,7 +282,7 @@ impl Catalog for RestCatalog {
276282
)),
277283
Err(apis::Error::ResponseError(content)) => {
278284
if content.status == 404 {
279-
let table_metadata = catalog_api_api::load_table(
285+
let response = catalog_api_api::load_table(
280286
&self.configuration,
281287
self.name.as_deref(),
282288
&identifier.namespace().to_string(),
@@ -285,12 +291,27 @@ impl Catalog for RestCatalog {
285291
None,
286292
)
287293
.await
288-
.map(|x| x.metadata)
289294
.map_err(|_| Error::CatalogNotFound)?;
290295

291-
let object_store = self
292-
.object_store_builder
293-
.build(Bucket::from_path(&table_metadata.location)?)?;
296+
let object_store = object_store_from_response(&response)?
297+
.ok_or(Error::NotFound("Object store credentials".to_string()))
298+
.or_else(|_| {
299+
self.default_object_store_builder
300+
.as_ref()
301+
.ok_or(Error::NotFound("Default object store".to_string()))
302+
.and_then(|x| {
303+
let bucket = Bucket::from_path(&response.metadata.location)?;
304+
x.build(bucket)
305+
})
306+
})?;
307+
308+
self.cache
309+
.write()
310+
.unwrap()
311+
.insert(identifier.clone(), object_store.clone());
312+
313+
let table_metadata = response.metadata;
314+
294315
Ok(Tabular::Table(
295316
Table::new(
296317
identifier.clone(),
@@ -315,50 +336,54 @@ impl Catalog for RestCatalog {
315336
identifier: Identifier,
316337
create_table: CreateTable,
317338
) -> Result<Table, Error> {
318-
catalog_api_api::create_table(
339+
let response = catalog_api_api::create_table(
319340
&self.configuration,
320341
self.name.as_deref(),
321342
&identifier.namespace().to_string(),
322343
create_table,
323344
None,
324345
)
325346
.map_err(Into::<Error>::into)
326-
.and_then(|response| {
327-
let clone = self.clone();
328-
async move {
329-
let object_store = clone
330-
.object_store_builder
331-
.build(Bucket::from_path(&response.metadata.location)?)?;
332-
Table::new(identifier.clone(), clone, object_store, response.metadata).await
333-
}
334-
})
335-
.await
347+
.await?;
348+
349+
let object_store = object_store_from_response(&response)?
350+
.ok_or(Error::NotFound("Object store credentials".to_string()))
351+
.or_else(|_| {
352+
self.default_object_store_builder
353+
.as_ref()
354+
.ok_or(Error::NotFound("Default object store".to_string()))
355+
.and_then(|x| {
356+
let bucket = Bucket::from_path(&response.metadata.location)?;
357+
x.build(bucket)
358+
})
359+
})?;
360+
361+
Table::new(identifier.clone(), self, object_store, response.metadata).await
336362
}
337363
/// Update a table by atomically changing the pointer to the metadata file
338364
async fn update_table(
339365
self: Arc<Self>,
340366
commit: iceberg_rust::catalog::commit::CommitTable,
341367
) -> Result<Table, Error> {
342368
let identifier = commit.identifier.clone();
343-
catalog_api_api::update_table(
369+
let response = catalog_api_api::update_table(
344370
&self.configuration,
345371
self.name.as_deref(),
346372
&identifier.namespace().to_string(),
347373
identifier.name(),
348374
commit,
349375
)
350-
.map_err(Into::<Error>::into)
351-
.and_then(|response| {
352-
let clone = self.clone();
353-
let identifier = identifier.clone();
354-
async move {
355-
let object_store = clone
356-
.object_store_builder
357-
.build(Bucket::from_path(&response.metadata.location)?)?;
358-
Table::new(identifier, clone, object_store, response.metadata).await
359-
}
360-
})
361376
.await
377+
.map_err(Into::<Error>::into)?;
378+
379+
let Some(object_store) = self.cache.read().unwrap().get(&identifier).cloned() else {
380+
return Err(Error::NotFound(format!(
381+
"Object store for table {}",
382+
&identifier
383+
)));
384+
};
385+
386+
Table::new(identifier, self, object_store, response.metadata).await
362387
}
363388
async fn create_view(
364389
self: Arc<Self>,
@@ -489,34 +514,41 @@ impl Catalog for RestCatalog {
489514
metadata_location.to_owned(),
490515
);
491516

492-
catalog_api_api::register_table(
517+
let response = catalog_api_api::register_table(
493518
&self.configuration,
494519
self.name.as_deref(),
495520
&identifier.namespace().to_string(),
496521
request,
497522
)
498523
.map_err(Into::<Error>::into)
499-
.and_then(|response| {
500-
let clone = self.clone();
501-
async move {
502-
let object_store = clone
503-
.object_store_builder
504-
.build(Bucket::from_path(&response.metadata.location)?)?;
505-
Table::new(identifier.clone(), clone, object_store, response.metadata).await
506-
}
507-
})
508-
.await
524+
.await?;
525+
let object_store = object_store_from_response(&response)?
526+
.ok_or(Error::NotFound("Object store credentials".to_string()))
527+
.or_else(|_| {
528+
self.default_object_store_builder
529+
.as_ref()
530+
.ok_or(Error::NotFound("Default object store".to_string()))
531+
.and_then(|x| {
532+
let bucket = Bucket::from_path(&response.metadata.location)?;
533+
x.build(bucket)
534+
})
535+
})?;
536+
537+
Table::new(identifier.clone(), self, object_store, response.metadata).await
509538
}
510539
}
511540

512541
#[derive(Debug, Clone)]
513542
pub struct RestCatalogList {
514543
configuration: Configuration,
515-
object_store_builder: ObjectStoreBuilder,
544+
object_store_builder: Option<ObjectStoreBuilder>,
516545
}
517546

518547
impl RestCatalogList {
519-
pub fn new(configuration: Configuration, object_store_builder: ObjectStoreBuilder) -> Self {
548+
pub fn new(
549+
configuration: Configuration,
550+
object_store_builder: Option<ObjectStoreBuilder>,
551+
) -> Self {
520552
Self {
521553
configuration,
522554
object_store_builder,
@@ -542,14 +574,14 @@ impl CatalogList for RestCatalogList {
542574
pub struct RestNoPrefixCatalogList {
543575
name: String,
544576
configuration: Configuration,
545-
object_store_builder: ObjectStoreBuilder,
577+
object_store_builder: Option<ObjectStoreBuilder>,
546578
}
547579

548580
impl RestNoPrefixCatalogList {
549581
pub fn new(
550582
name: &str,
551583
configuration: Configuration,
552-
object_store_builder: ObjectStoreBuilder,
584+
object_store_builder: Option<ObjectStoreBuilder>,
553585
) -> Self {
554586
Self {
555587
name: name.to_owned(),
@@ -577,6 +609,50 @@ impl CatalogList for RestNoPrefixCatalogList {
577609
}
578610
}
579611

612+
const CLIENT_REGION: &str = "client.region";
613+
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
614+
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
615+
const AWS_SESSION_TOKEN: &str = "s3.session-token";
616+
617+
fn object_store_from_response(
618+
response: &models::LoadTableResult,
619+
) -> Result<Option<Arc<dyn ObjectStore>>, Error> {
620+
let config = match (&response.storage_credentials, &response.config) {
621+
(Some(credentials), _) => Some(&credentials[0].config),
622+
(None, Some(config)) => Some(config),
623+
(None, None) => None,
624+
};
625+
626+
let Some(config) = config else {
627+
return Ok(None);
628+
};
629+
630+
let region = config.get(CLIENT_REGION);
631+
if config.contains_key(AWS_ACCESS_KEY_ID) {
632+
let access_key_id = config.get(AWS_ACCESS_KEY_ID);
633+
let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY);
634+
let session_token = config.get(AWS_SESSION_TOKEN);
635+
let mut builder = AmazonS3Builder::new();
636+
637+
if let Some(region) = region {
638+
builder = builder.with_region(region)
639+
}
640+
if let Some(access_key_id) = access_key_id {
641+
builder = builder.with_access_key_id(access_key_id)
642+
}
643+
if let Some(secret_access_key) = secret_access_key {
644+
builder = builder.with_secret_access_key(secret_access_key)
645+
}
646+
if let Some(session_token) = session_token {
647+
builder = builder.with_token(session_token)
648+
}
649+
650+
Ok(Some(Arc::new(builder.build()?)))
651+
} else {
652+
Ok(None)
653+
}
654+
}
655+
580656
#[cfg(test)]
581657
pub mod tests {
582658
use datafusion::{
@@ -685,7 +761,7 @@ pub mod tests {
685761
let iceberg_catalog = Arc::new(RestCatalog::new(
686762
None,
687763
configuration(&format!("http://{rest_host}:{rest_port}")),
688-
object_store,
764+
Some(object_store),
689765
));
690766

691767
iceberg_catalog

datafusion_iceberg/tests/integration_trino.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async fn integration_trino_rest() {
199199
let catalog = Arc::new(RestCatalog::new(
200200
None,
201201
configuration(&rest_host.to_string(), rest_port),
202-
object_store,
202+
Some(object_store),
203203
));
204204

205205
let tables = catalog

iceberg-rust/src/table/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl Table {
192192
///
193193
/// # Returns
194194
/// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
195-
/// or an empty vector if no current snapshot exists
195+
/// or an empty vector if no current snapshot exists
196196
///
197197
/// # Errors
198198
/// Returns an error if:

0 commit comments

Comments
 (0)