Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ styles/
.idea/
.cursor

metastore.yaml

4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ volumes:
aws-access-key-id: YOUR_ACCESS_KEY
aws-secret-access-key: YOUR_SECRET_KEY
arn: arn:aws:s3tables:us-east-2:123456789012:bucket/my-table-bucket

databases:
- ident: my_db
volume: demo
# S3 volume - connects to standard S3 bucket
# - ident: volume
# type: s3
Expand Down
46 changes: 29 additions & 17 deletions crates/catalog/src/catalog_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::{
execution::object_store::ObjectStoreRegistry,
};
use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog;
use futures::future::join_all;
use iceberg_rust::object_store::ObjectStoreBuilder;
use iceberg_s3tables_catalog::S3TablesCatalog;
use object_store::ObjectStore;
Expand Down Expand Up @@ -248,7 +249,7 @@ impl EmbucketCatalogList {
.await
.context(catalog_error::DataFusionSnafu)?;
Ok(CachingCatalog::new(Arc::new(catalog), name.to_string())
.with_refresh(false)
.with_refresh(true)
.with_catalog_type(CatalogType::S3tables))
}

Expand Down Expand Up @@ -387,22 +388,33 @@ impl EmbucketCatalogList {
name: schema.clone(),
};
let tables = schema.schema.table_names();
for table in tables {
if let Some(table_provider) = schema
.schema
.table(&table)
.await
.context(catalog_error::DataFusionSnafu)?
{
schema.tables_cache.insert(
table.clone(),
Arc::new(CachingTable::new_with_schema(
table,
table_provider.schema(),
Arc::clone(&table_provider),
)),
);
}

let futs = tables
.iter()
.map(|table_name| async {
let tp = schema
.schema
.table(table_name)
.await
.context(catalog_error::DataFusionSnafu)
.ok()?
.map(Arc::new)?;

Some((table_name.clone(), tp))
})
.collect::<Vec<_>>();
Comment thread
osipovartem marked this conversation as resolved.

let results = join_all(futs).await;
for res in results.into_iter().flatten() {
let (table_name, table_provider) = res;
schema.tables_cache.insert(
table_name.clone(),
Arc::new(CachingTable::new_with_schema(
table_name,
table_provider.schema(),
Arc::clone(&table_provider),
)),
);
}
catalog
.schemas_cache
Expand Down
11 changes: 4 additions & 7 deletions crates/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ impl SchemaProvider for CachingSchema {
// of SQL (e.g., via direct catalog API calls). In such cases, our cache could contain
// stale metadata and ignore the latest snapshot updates.
//
// However, views are registered and stored only in the local cache, so we must
// check the cache first and return the view if present.

// However, since we assume that users will interact with the Iceberg catalog
// exclusively through Embucket, we can safely enable caching — in this case,
// the data will remain consistent across all queries.
if let Some(table) = self.tables_cache.get(name) {
let table = table.value();
if table.table_type() == TableType::View {
return Ok(Some(Arc::clone(table) as Arc<dyn TableProvider>));
}
return Ok(Some(Arc::clone(table.value()) as Arc<dyn TableProvider>));
}

if let Some(table) = self.schema.table(name).await? {
Expand Down