Skip to content

Commit c9141d9

Browse files
authored
Remove iceberg catalog from executor for drop statements (#17)
* Merge * Remove iceberg catalog from drop statement * Fix * Fix
1 parent 8331b64 commit c9141d9

9 files changed

Lines changed: 157 additions & 194 deletions

File tree

crates/catalog/src/catalog.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
1+
use crate::df_error;
12
use crate::schema::CachingSchema;
23
use chrono::NaiveDateTime;
34
use dashmap::DashMap;
45
use datafusion::catalog::{CatalogProvider, SchemaProvider};
6+
use datafusion_common::DataFusionError;
7+
use futures::executor::block_on;
8+
use iceberg_rust::catalog::Catalog;
9+
use iceberg_rust_spec::namespace::Namespace;
10+
use snafu::futures::TryFutureExt;
511
use std::fmt::{Display, Formatter};
612
use std::{any::Any, sync::Arc};
713

814
#[derive(Clone)]
915
pub struct CachingCatalog {
1016
pub catalog: Arc<dyn CatalogProvider>,
17+
pub iceberg_catalog: Option<Arc<dyn Catalog>>,
1118
pub catalog_type: CatalogType,
1219
pub schemas_cache: DashMap<String, Arc<CachingSchema>>,
1320
pub should_refresh: bool,
@@ -50,9 +57,14 @@ impl Display for CatalogType {
5057
}
5158

5259
impl CachingCatalog {
53-
pub fn new(catalog: Arc<dyn CatalogProvider>, name: String) -> Self {
60+
pub fn new(
61+
catalog_provider: Arc<dyn CatalogProvider>,
62+
name: String,
63+
iceberg_catalog: Option<Arc<dyn Catalog>>,
64+
) -> Self {
5465
Self {
55-
catalog,
66+
catalog: catalog_provider,
67+
iceberg_catalog,
5668
schemas_cache: DashMap::new(),
5769
should_refresh: false,
5870
enable_information_schema: true,
@@ -128,6 +140,7 @@ impl CatalogProvider for CachingCatalog {
128140
name: name.clone(),
129141
schema,
130142
tables_cache: DashMap::new(),
143+
iceberg_catalog: self.iceberg_catalog.clone(),
131144
}),
132145
);
133146
}
@@ -148,6 +161,7 @@ impl CatalogProvider for CachingCatalog {
148161
name: name.to_string(),
149162
schema: Arc::clone(&schema),
150163
tables_cache: DashMap::new(),
164+
iceberg_catalog: self.iceberg_catalog.clone(),
151165
});
152166

153167
self.schemas_cache
@@ -173,6 +187,7 @@ impl CatalogProvider for CachingCatalog {
173187
name: name.to_string(),
174188
schema: Arc::clone(&schema),
175189
tables_cache: DashMap::new(),
190+
iceberg_catalog: self.iceberg_catalog.clone(),
176191
});
177192
self.schemas_cache
178193
.insert(name.to_string(), Arc::clone(&caching_schema));
@@ -190,7 +205,22 @@ impl CatalogProvider for CachingCatalog {
190205
name: &str,
191206
cascade: bool,
192207
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
193-
self.schemas_cache.remove(name);
194-
self.catalog.deregister_schema(name, cascade)
208+
let schema = self.schemas_cache.remove(name);
209+
210+
if let Some(catalog) = &self.iceberg_catalog {
211+
let namespace = Namespace::try_new(std::slice::from_ref(&name.to_string()))
212+
.map_err(|err| DataFusionError::External(Box::new(err)))?;
213+
block_on(
214+
catalog
215+
.drop_namespace(&namespace)
216+
.context(df_error::IcebergSnafu),
217+
)?;
218+
} else {
219+
return self.catalog.deregister_schema(name, cascade);
220+
}
221+
if let Some((_, caching_schema)) = schema {
222+
return Ok(Some(caching_schema));
223+
}
224+
Ok(None)
195225
}
196226
}

crates/catalog/src/catalog_list.rs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use datafusion::{
2222
};
2323
use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog;
2424
use futures::future::join_all;
25+
use iceberg_rust::catalog::Catalog;
2526
use iceberg_rust::object_store::ObjectStoreBuilder;
2627
use iceberg_s3tables_catalog::S3TablesCatalog;
2728
use object_store::ObjectStore;
@@ -155,9 +156,8 @@ impl EmbucketCatalogList {
155156
err
156157
)]
157158
pub async fn register_catalogs(self: &Arc<Self>) -> Result<()> {
158-
let mut all_catalogs = Vec::new();
159159
// Add metastore databases as catalogs
160-
all_catalogs.extend(self.internal_catalogs().await?);
160+
let all_catalogs = self.metastore_catalogs().await?;
161161
for catalog in all_catalogs {
162162
self.catalogs
163163
.insert(catalog.name.clone(), Arc::new(catalog));
@@ -171,7 +171,7 @@ impl EmbucketCatalogList {
171171
skip(self),
172172
err
173173
)]
174-
pub async fn internal_catalogs(&self) -> Result<Vec<CachingCatalog>> {
174+
pub async fn metastore_catalogs(&self) -> Result<Vec<CachingCatalog>> {
175175
let mut catalogs = Vec::new();
176176
let databases = self
177177
.metastore
@@ -198,19 +198,23 @@ impl EmbucketCatalogList {
198198
}
199199

200200
fn get_embucket_catalog(&self, db: &RwObject<Database>) -> Result<CachingCatalog> {
201-
let iceberg_catalog = EmbucketIcebergCatalog::new(self.metastore.clone(), db.ident.clone())
202-
.context(MetastoreSnafu)?;
203-
let catalog: Arc<dyn CatalogProvider> = Arc::new(EmbucketCatalog::new(
201+
let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
202+
EmbucketIcebergCatalog::new(self.metastore.clone(), db.ident.clone())
203+
.context(MetastoreSnafu)?,
204+
);
205+
let catalog_provider: Arc<dyn CatalogProvider> = Arc::new(EmbucketCatalog::new(
204206
db.ident.clone(),
205207
self.metastore.clone(),
206-
Arc::new(iceberg_catalog),
208+
iceberg_catalog.clone(),
207209
));
208-
Ok(CachingCatalog::new(catalog, db.ident.clone())
209-
.with_refresh(true)
210-
.with_properties(Properties {
211-
created_at: db.created_at,
212-
updated_at: db.created_at,
213-
}))
210+
Ok(
211+
CachingCatalog::new(catalog_provider, db.ident.clone(), Some(iceberg_catalog))
212+
.with_refresh(true)
213+
.with_properties(Properties {
214+
created_at: db.created_at,
215+
updated_at: db.created_at,
216+
}),
217+
)
214218
}
215219

216220
#[tracing::instrument(
@@ -238,19 +242,23 @@ impl EmbucketCatalogList {
238242
.credentials_provider(SharedCredentialsProvider::new(creds))
239243
.region(Region::new(volume.region()))
240244
.build();
241-
let catalog = S3TablesCatalog::new(
242-
&config,
243-
volume.arn.as_str(),
244-
ObjectStoreBuilder::S3(Box::new(volume.s3_builder())),
245-
)
246-
.context(catalog_error::S3TablesSnafu)?;
245+
let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
246+
S3TablesCatalog::new(
247+
&config,
248+
volume.arn.as_str(),
249+
ObjectStoreBuilder::S3(Box::new(volume.s3_builder())),
250+
)
251+
.context(catalog_error::S3TablesSnafu)?,
252+
);
247253

248-
let catalog = DataFusionIcebergCatalog::new(Arc::new(catalog), None)
254+
let catalog = DataFusionIcebergCatalog::new(iceberg_catalog.clone(), None)
249255
.await
250256
.context(catalog_error::DataFusionSnafu)?;
251-
Ok(CachingCatalog::new(Arc::new(catalog), name.to_string())
252-
.with_refresh(true)
253-
.with_catalog_type(CatalogType::S3tables))
257+
Ok(
258+
CachingCatalog::new(Arc::new(catalog), name.to_string(), Some(iceberg_catalog))
259+
.with_refresh(false)
260+
.with_catalog_type(CatalogType::S3tables),
261+
)
254262
}
255263

256264
/// Do not keep returned references to avoid deadlocks
@@ -302,6 +310,7 @@ impl EmbucketCatalogList {
302310
schema: schema_provider,
303311
tables_cache: DashMap::default(),
304312
name: schema.clone(),
313+
iceberg_catalog: catalog_ref.iceberg_catalog.clone(),
305314
};
306315
catalog_ref
307316
.schemas_cache
@@ -386,6 +395,7 @@ impl EmbucketCatalogList {
386395
schema: schema_provider,
387396
tables_cache: DashMap::default(),
388397
name: schema.clone(),
398+
iceberg_catalog: catalog.iceberg_catalog.clone(),
389399
};
390400
let tables = schema.schema.table_names();
391401

@@ -457,7 +467,7 @@ impl EmbucketCatalogList {
457467
let mut interval = interval(Duration::from_secs(interval_secs));
458468
loop {
459469
interval.tick().await;
460-
match self.internal_catalogs().await {
470+
match self.metastore_catalogs().await {
461471
Ok(catalogs) => {
462472
for catalog in catalogs {
463473
if self.catalogs.contains_key(&catalog.name) {
@@ -539,7 +549,7 @@ impl CatalogProviderList for EmbucketCatalogList {
539549
name: String,
540550
catalog: Arc<dyn CatalogProvider>,
541551
) -> Option<Arc<dyn CatalogProvider>> {
542-
let catalog = CachingCatalog::new(catalog, name);
552+
let catalog = CachingCatalog::new(catalog, name, None);
543553
self.catalogs
544554
.insert(catalog.name.clone(), Arc::new(catalog))
545555
.map(|arc| {

crates/catalog/src/df_error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use error_stack_trace;
2+
use iceberg_rust::error::Error as IcebergError;
23
use snafu::Location;
34
use snafu::prelude::*;
45

@@ -45,6 +46,14 @@ pub enum DFExternalError {
4546
#[snafu(implicit)]
4647
location: Location,
4748
},
49+
50+
#[snafu(display("Iceberg error: {error}"))]
51+
Iceberg {
52+
#[snafu(source(from(IcebergError, Box::new)))]
53+
error: Box<IcebergError>,
54+
#[snafu(implicit)]
55+
location: Location,
56+
},
4857
}
4958

5059
impl From<DFExternalError> for datafusion_common::DataFusionError {

crates/catalog/src/schema.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
1+
use crate::df_error;
12
use crate::table::CachingTable;
23
use async_trait::async_trait;
34
use dashmap::DashMap;
45
use datafusion::catalog::{SchemaProvider, TableProvider};
56
use datafusion_common::DataFusionError;
67
use datafusion_expr::TableType;
8+
use futures::executor::block_on;
9+
use iceberg_rust::catalog::Catalog;
10+
use iceberg_rust_spec::identifier::Identifier;
11+
use snafu::futures::TryFutureExt;
712
use std::any::Any;
813
use std::sync::Arc;
914

1015
pub struct CachingSchema {
1116
pub schema: Arc<dyn SchemaProvider>,
17+
pub iceberg_catalog: Option<Arc<dyn Catalog>>,
1218
pub name: String,
1319
pub tables_cache: DashMap<String, Arc<CachingTable>>,
1420
}
@@ -89,9 +95,15 @@ impl SchemaProvider for CachingSchema {
8995
name: &str,
9096
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
9197
let table = self.tables_cache.remove(name);
98+
9299
if let Some((_, caching_table)) = table {
93100
if caching_table.table_type() != TableType::View {
94-
return self.schema.deregister_table(name);
101+
if let Some(catalog) = &self.iceberg_catalog {
102+
let ident = Identifier::new(std::slice::from_ref(&self.name), name);
103+
block_on(catalog.drop_table(&ident).context(df_error::IcebergSnafu))?;
104+
} else {
105+
return self.schema.deregister_table(name);
106+
}
95107
}
96108
return Ok(Some(caching_table as Arc<dyn TableProvider>));
97109
}

0 commit comments

Comments
 (0)