diff --git a/.gitignore b/.gitignore index 5f279b91..c68a1c75 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ styles/ .idea/ .cursor +metastore.yaml diff --git a/README.md b/README.md index 4dec22d4..2d771411 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,9 @@ volumes: aws-access-key-id: YOUR_ACCESS_KEY aws-secret-access-key: YOUR_SECRET_KEY arn: arn:aws:s3tables:us-east-2:123456789012:bucket/my-table-bucket - +databases: + - ident: my_db + volume: demo # S3 volume - connects to standard S3 bucket # - ident: volume # type: s3 diff --git a/crates/catalog/src/catalog_list.rs b/crates/catalog/src/catalog_list.rs index 82c559cb..4d3419eb 100644 --- a/crates/catalog/src/catalog_list.rs +++ b/crates/catalog/src/catalog_list.rs @@ -21,6 +21,7 @@ use datafusion::{ execution::object_store::ObjectStoreRegistry, }; use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog; +use futures::future::join_all; use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_s3tables_catalog::S3TablesCatalog; use object_store::ObjectStore; @@ -248,7 +249,7 @@ impl EmbucketCatalogList { .await .context(catalog_error::DataFusionSnafu)?; Ok(CachingCatalog::new(Arc::new(catalog), name.to_string()) - .with_refresh(false) + .with_refresh(true) .with_catalog_type(CatalogType::S3tables)) } @@ -387,22 +388,33 @@ impl EmbucketCatalogList { name: schema.clone(), }; let tables = schema.schema.table_names(); - for table in tables { - if let Some(table_provider) = schema - .schema - .table(&table) - .await - .context(catalog_error::DataFusionSnafu)? - { - schema.tables_cache.insert( - table.clone(), - Arc::new(CachingTable::new_with_schema( - table, - table_provider.schema(), - Arc::clone(&table_provider), - )), - ); - } + + let futs = tables + .iter() + .map(|table_name| async { + let tp = schema + .schema + .table(table_name) + .await + .context(catalog_error::DataFusionSnafu) + .ok()? + .map(Arc::new)?; + + Some((table_name.clone(), tp)) + }) + .collect::>(); + + let results = join_all(futs).await; + for res in results.into_iter().flatten() { + let (table_name, table_provider) = res; + schema.tables_cache.insert( + table_name.clone(), + Arc::new(CachingTable::new_with_schema( + table_name, + table_provider.schema(), + Arc::clone(&table_provider), + )), + ); } catalog .schemas_cache diff --git a/crates/catalog/src/schema.rs b/crates/catalog/src/schema.rs index aea398ef..020217c2 100644 --- a/crates/catalog/src/schema.rs +++ b/crates/catalog/src/schema.rs @@ -50,14 +50,11 @@ impl SchemaProvider for CachingSchema { // of SQL (e.g., via direct catalog API calls). In such cases, our cache could contain // stale metadata and ignore the latest snapshot updates. // - // However, views are registered and stored only in the local cache, so we must - // check the cache first and return the view if present. - + // However, since we assume that users will interact with the Iceberg catalog + // exclusively through Embucket, we can safely enable caching — in this case, + // the data will remain consistent across all queries. if let Some(table) = self.tables_cache.get(name) { - let table = table.value(); - if table.table_type() == TableType::View { - return Ok(Some(Arc::clone(table) as Arc)); - } + return Ok(Some(Arc::clone(table.value()) as Arc)); } if let Some(table) = self.schema.table(name).await? {