Skip to content

Commit 4fa5c59

Browse files
authored
Fill catalog tables in parallel (#3)
* Get schema tables in parallel * Get schema tables in parallel * Speedup s3 tables
1 parent 7544e78 commit 4fa5c59

4 files changed

Lines changed: 37 additions & 25 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ styles/
1212
.idea/
1313
.cursor
1414

15+
metastore.yaml
1516

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ volumes:
5757
aws-access-key-id: YOUR_ACCESS_KEY
5858
aws-secret-access-key: YOUR_SECRET_KEY
5959
arn: arn:aws:s3tables:us-east-2:123456789012:bucket/my-table-bucket
60-
60+
databases:
61+
- ident: my_db
62+
volume: demo
6163
# S3 volume - connects to standard S3 bucket
6264
# - ident: volume
6365
# type: s3

crates/catalog/src/catalog_list.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use datafusion::{
2121
execution::object_store::ObjectStoreRegistry,
2222
};
2323
use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog;
24+
use futures::future::join_all;
2425
use iceberg_rust::object_store::ObjectStoreBuilder;
2526
use iceberg_s3tables_catalog::S3TablesCatalog;
2627
use object_store::ObjectStore;
@@ -248,7 +249,7 @@ impl EmbucketCatalogList {
248249
.await
249250
.context(catalog_error::DataFusionSnafu)?;
250251
Ok(CachingCatalog::new(Arc::new(catalog), name.to_string())
251-
.with_refresh(false)
252+
.with_refresh(true)
252253
.with_catalog_type(CatalogType::S3tables))
253254
}
254255

@@ -387,22 +388,33 @@ impl EmbucketCatalogList {
387388
name: schema.clone(),
388389
};
389390
let tables = schema.schema.table_names();
390-
for table in tables {
391-
if let Some(table_provider) = schema
392-
.schema
393-
.table(&table)
394-
.await
395-
.context(catalog_error::DataFusionSnafu)?
396-
{
397-
schema.tables_cache.insert(
398-
table.clone(),
399-
Arc::new(CachingTable::new_with_schema(
400-
table,
401-
table_provider.schema(),
402-
Arc::clone(&table_provider),
403-
)),
404-
);
405-
}
391+
392+
let futs = tables
393+
.iter()
394+
.map(|table_name| async {
395+
let tp = schema
396+
.schema
397+
.table(table_name)
398+
.await
399+
.context(catalog_error::DataFusionSnafu)
400+
.ok()?
401+
.map(Arc::new)?;
402+
403+
Some((table_name.clone(), tp))
404+
})
405+
.collect::<Vec<_>>();
406+
407+
let results = join_all(futs).await;
408+
for res in results.into_iter().flatten() {
409+
let (table_name, table_provider) = res;
410+
schema.tables_cache.insert(
411+
table_name.clone(),
412+
Arc::new(CachingTable::new_with_schema(
413+
table_name,
414+
table_provider.schema(),
415+
Arc::clone(&table_provider),
416+
)),
417+
);
406418
}
407419
catalog
408420
.schemas_cache

crates/catalog/src/schema.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,11 @@ impl SchemaProvider for CachingSchema {
5050
// of SQL (e.g., via direct catalog API calls). In such cases, our cache could contain
5151
// stale metadata and ignore the latest snapshot updates.
5252
//
53-
// However, views are registered and stored only in the local cache, so we must
54-
// check the cache first and return the view if present.
55-
53+
// However, since we assume that users will interact with the Iceberg catalog
54+
// exclusively through Embucket, we can safely enable caching — in this case,
55+
// the data will remain consistent across all queries.
5656
if let Some(table) = self.tables_cache.get(name) {
57-
let table = table.value();
58-
if table.table_type() == TableType::View {
59-
return Ok(Some(Arc::clone(table) as Arc<dyn TableProvider>));
60-
}
57+
return Ok(Some(Arc::clone(table.value()) as Arc<dyn TableProvider>));
6158
}
6259

6360
if let Some(table) = self.schema.table(name).await? {

0 commit comments

Comments
 (0)