Skip to content

Commit ba5d1e0

Browse files
authored
Remove iceberg catalog calls from create schema (#20)
* Merge * Remove iceberg catalog from drop statement * Fix * Init * Remove iceberg related logic from create schema * Remove iceberg related logic from create schema * Remove iceberg related logic from create schema
1 parent c9141d9 commit ba5d1e0

5 files changed

Lines changed: 101 additions & 106 deletions

File tree

crates/catalog/src/catalog.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
1+
use crate::catalogs::embucket::schema::EmbucketSchema;
12
use crate::df_error;
23
use crate::schema::CachingSchema;
4+
use catalog_metastore::Metastore;
35
use chrono::NaiveDateTime;
46
use dashmap::DashMap;
57
use datafusion::catalog::{CatalogProvider, SchemaProvider};
68
use datafusion_common::DataFusionError;
9+
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
10+
use datafusion_iceberg::catalog::schema::IcebergSchema;
711
use futures::executor::block_on;
812
use iceberg_rust::catalog::Catalog;
913
use iceberg_rust_spec::namespace::Namespace;
14+
use snafu::OptionExt;
1015
use snafu::futures::TryFutureExt;
1116
use std::fmt::{Display, Formatter};
1217
use std::{any::Any, sync::Arc};
1318

1419
#[derive(Clone)]
1520
pub struct CachingCatalog {
1621
pub catalog: Arc<dyn CatalogProvider>,
22+
pub metastore: Option<Arc<dyn Metastore>>,
1723
pub iceberg_catalog: Option<Arc<dyn Catalog>>,
1824
pub catalog_type: CatalogType,
1925
pub schemas_cache: DashMap<String, Arc<CachingSchema>>,
@@ -65,6 +71,7 @@ impl CachingCatalog {
6571
Self {
6672
catalog: catalog_provider,
6773
iceberg_catalog,
74+
metastore: None,
6875
schemas_cache: DashMap::new(),
6976
should_refresh: false,
7077
enable_information_schema: true,
@@ -95,6 +102,12 @@ impl CachingCatalog {
95102
self.properties = Some(properties);
96103
self
97104
}
105+
106+
/// Builder-style setter for the optional metastore handle.
///
/// Takes the catalog by value, stores `metastore` in `self.metastore` wrapped
/// in `Some`, and returns the updated catalog so the call can be chained with
/// the other `with_*` builders. The constructor in this file initialises the
/// field to `None`, so a catalog has no metastore until this is called.
///
/// The schema-registration code further down in this file reads the field for
/// the `Embucket` and `Memory` catalog types and fails with
/// `MetastoreIsMissing` when it is still `None`.
#[must_use]
107+
pub fn with_metastore(mut self, metastore: Arc<dyn Metastore>) -> Self {
108+
self.metastore = Some(metastore);
109+
self
110+
}
98111
}
99112

100113
#[allow(clippy::missing_fields_in_debug)]
@@ -183,15 +196,57 @@ impl CatalogProvider for CachingCatalog {
183196
name: &str,
184197
schema: Arc<dyn SchemaProvider>,
185198
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
199+
let schema_provider = if let Some(catalog) = &self.iceberg_catalog {
200+
let namespace = Namespace::try_new(std::slice::from_ref(&name.to_string()))
201+
.map_err(|err| DataFusionError::External(Box::new(err)))?;
202+
203+
let schema_provider: Arc<dyn SchemaProvider> = match self.catalog_type {
204+
CatalogType::Embucket | CatalogType::Memory => {
205+
let metastore = self
206+
.metastore
207+
.clone()
208+
.context(df_error::MetastoreIsMissingSnafu)?;
209+
Arc::new(EmbucketSchema {
210+
database: self.name.clone(),
211+
schema: name.to_string(),
212+
metastore: Arc::clone(&metastore),
213+
iceberg_catalog: catalog.clone(),
214+
})
215+
}
216+
CatalogType::S3tables => {
217+
let Some(iceberg_catalog) =
218+
self.catalog.as_any().downcast_ref::<IcebergCatalog>()
219+
else {
220+
return Err(DataFusionError::Plan(format!(
221+
"Catalog {} is not an Iceberg catalog.",
222+
self.name
223+
)));
224+
};
225+
Arc::new(IcebergSchema::new(
226+
namespace.clone(),
227+
iceberg_catalog.mirror(),
228+
))
229+
}
230+
};
231+
block_on(
232+
catalog
233+
.create_namespace(&namespace, None)
234+
.context(df_error::IcebergSnafu),
235+
)?;
236+
schema_provider
237+
} else {
238+
return self.catalog.register_schema(name, schema);
239+
};
240+
186241
let caching_schema = Arc::new(CachingSchema {
187242
name: name.to_string(),
188-
schema: Arc::clone(&schema),
243+
schema: schema_provider,
189244
tables_cache: DashMap::new(),
190245
iceberg_catalog: self.iceberg_catalog.clone(),
191246
});
192247
self.schemas_cache
193248
.insert(name.to_string(), Arc::clone(&caching_schema));
194-
self.catalog.register_schema(name, schema)
249+
Ok(Some(caching_schema))
195250
}
196251

197252
#[tracing::instrument(

crates/catalog/src/catalog_list.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ impl EmbucketCatalogList {
213213
.with_properties(Properties {
214214
created_at: db.created_at,
215215
updated_at: db.created_at,
216-
}),
216+
})
217+
.with_metastore(self.metastore.clone()),
217218
)
218219
}
219220

crates/catalog/src/df_error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ pub enum DFExternalError {
1515
#[snafu(implicit)]
1616
location: Location,
1717
},
18+
#[snafu(display("Metastore is missing for embucket catalog"))]
19+
MetastoreIsMissing {
20+
#[snafu(implicit)]
21+
location: Location,
22+
},
1823
#[snafu(display("Ordinal position param overflow: {error}"))]
1924
OrdinalPositionParamOverflow {
2025
#[snafu(source)]

crates/executor/src/error.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,6 @@ pub enum Error {
415415
#[snafu(implicit)]
416416
location: Location,
417417
},
418-
#[snafu(display("Only simple schema names are supported"))]
419-
OnlySimpleSchemaNames {
420-
#[snafu(implicit)]
421-
location: Location,
422-
},
423418
#[snafu(display("unsupported SHOW statement: {statement}"))]
424419
UnsupportedShowStatement {
425420
statement: String,

crates/executor/src/query.rs

Lines changed: 37 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use catalog::catalog_list::CachedEntity;
2525
use catalog::table::CachingTable;
2626
use catalog_metastore::{
2727
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
28-
SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
29-
TableFormat as MetastoreTableFormat, TableIdent as MetastoreTableIdent, Volume, VolumeType,
28+
TableCreateRequest as MetastoreTableCreateRequest, TableFormat as MetastoreTableFormat,
29+
TableIdent as MetastoreTableIdent, Volume, VolumeType,
3030
models::volumes::create_object_store_from_url,
3131
};
3232
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
@@ -73,8 +73,6 @@ use datafusion_expr::{
7373
};
7474
use datafusion_iceberg::DataFusionTable;
7575
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
76-
use datafusion_iceberg::catalog::mirror::Mirror;
77-
use datafusion_iceberg::catalog::schema::IcebergSchema;
7876
use datafusion_iceberg::table::DataFusionTableConfigBuilder;
7977
use datafusion_physical_plan::collect;
8078
use functions::semi_structured::variant::visitors::visit_all;
@@ -90,7 +88,6 @@ use iceberg_rust::catalog::create::CreateTableBuilder;
9088
use iceberg_rust::catalog::tabular::Tabular;
9189
use iceberg_rust::error::Error as IcebergError;
9290
use iceberg_rust::spec::arrow::schema::new_fields_with_ids;
93-
use iceberg_rust::spec::namespace::Namespace;
9491
use iceberg_rust::spec::schema::Schema;
9592
use iceberg_rust::spec::snapshot::Snapshot;
9693
use iceberg_rust::spec::table_metadata::TableMetadata;
@@ -471,16 +468,6 @@ impl UserQuery {
471468
})
472469
}
473470

474-
/// Extracts the Iceberg `Mirror` from a catalog provider, if there is one.
///
/// Returns `None` when `catalog` is not a `CachingCatalog`, or when the
/// `catalog` field of that `CachingCatalog` is not an `IcebergCatalog`;
/// otherwise returns `Some` of the value produced by `IcebergCatalog::mirror()`.
///
/// The `-` gutter markers place every line of this helper on the removed side
/// of the diff; its only visible caller is the removed part of `create_schema`.
#[instrument(name = "UserQuery::get_iceberg_mirror", level = "trace")]
475-
fn get_iceberg_mirror(catalog: &Arc<dyn CatalogProvider>) -> Option<Arc<Mirror>> {
476-
// First downcast: the outer provider must be the caching wrapper.
let caching_catalog = catalog.as_any().downcast_ref::<CachingCatalog>()?;
477-
// Second downcast: the wrapped provider must be the Iceberg-backed catalog.
let iceberg_catalog = caching_catalog
478-
.catalog
479-
.as_any()
480-
.downcast_ref::<IcebergCatalog>()?;
481-
Some(iceberg_catalog.mirror())
482-
}
483-
484471
/// The code below relies on [`Catalog`] trait for different iceberg catalog
485472
/// implementations (REST, S3 table buckets, or anything else).
486473
/// In case this is built-in datafusion's [`MemoryCatalogProvider`] we shortcut and rely on its implementation
@@ -1660,69 +1647,28 @@ impl UserQuery {
16601647

16611648
/// Executes a `CREATE SCHEMA` statement and returns the "created entity" response.
///
/// This span is a rendered diff: lines whose gutter ends in `-` are the old
/// implementation, lines whose gutter ends in `+` are the new one, and lines
/// with two line numbers are shared by both.
///
/// New implementation (`+` and shared lines):
/// 1. Plans the statement with `sql_statement_to_plan`.
/// 2. Requires the plan to be `LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema)`;
///    any other plan fails with `OnlyCreateSchemaStatements`.
/// 3. Resolves the plan's `schema_name` through `resolve_schema_name` and writes
///    the resolved name back into the plan.
/// 4. Looks up the catalog; if it already exposes the schema, returns the
///    created-entity response when `IF NOT EXISTS` was given, otherwise fails
///    with `ObjectAlreadyExists` (type `Schema`).
/// 5. Otherwise runs the plan with `execute_logical_plan`.
///
/// Old implementation (`-` lines): destructured the SQL statement directly,
/// accepted only `SchemaName::Simple`, obtained an Iceberg catalog via
/// `resolve_iceberg_catalog_or_execute`, checked `list_namespaces`, called
/// `create_namespace`, registered an `IcebergSchema` when a mirror was
/// available, and refreshed the cached schema entry.
///
/// # Errors
/// Propagates errors from planning, name resolution, catalog lookup and plan
/// execution, plus the two explicit failures described above.
#[instrument(name = "UserQuery::create_schema", level = "trace", skip(self), err)]
16621649
pub async fn create_schema(&self, statement: Statement) -> Result<QueryResult> {
// --- removed: statement destructuring and simple-name check ---
1663-
let Statement::CreateSchema {
1664-
schema_name,
1665-
if_not_exists,
1666-
..
1667-
} = statement.clone()
1668-
else {
1669-
return ex_error::OnlyCreateSchemaStatementsSnafu.fail();
1670-
};
1671-
1672-
let SchemaName::Simple(schema_name) = schema_name else {
1673-
return ex_error::OnlySimpleSchemaNamesSnafu.fail();
1674-
};
1675-
1676-
let ident: MetastoreSchemaIdent = self.resolve_schema_object_name(schema_name.0)?.into();
1677-
let plan = self.sql_statement_to_plan(statement).await?;
1678-
let catalog = self.get_catalog(&ident.database)?;
1679-
1680-
let downcast_result = self
1681-
.resolve_iceberg_catalog_or_execute(catalog.clone(), ident.database.clone(), plan)
1682-
.await;
1683-
let iceberg_catalog = match downcast_result {
1684-
IcebergCatalogResult::Catalog(catalog) => catalog,
1685-
IcebergCatalogResult::Result(result) => {
1686-
return result.map(|_| self.created_entity_response())?;
// --- added: plan first, then take the schema name and IF NOT EXISTS flag from
// the plan node; `plan` is mutable because the resolved name is written back ---
1650+
let mut plan = self.sql_statement_to_plan(statement).await?;
1651+
let (schema_ref, if_not_exists) = match &mut plan {
1652+
LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(t)) => {
1653+
let resolved = self.resolve_schema_name(&t.schema_name)?;
1654+
t.schema_name = resolved.to_string();
1655+
// `schema_ref_to_resolved` yields the value whose `catalog` and `schema`
// fields are read below.
(self.schema_ref_to_resolved(resolved), t.if_not_exists)
16871656
}
1657+
_ => return ex_error::OnlyCreateSchemaStatementsSnafu.fail(),
16881658
};
16891659

// --- removed: existence check through the Iceberg catalog's namespace list ---
1690-
let schema_exists = iceberg_catalog
1691-
.list_namespaces(None)
1692-
.await
1693-
.context(ex_error::IcebergSnafu)?
1694-
.iter()
1695-
.any(|namespace| namespace.join(".") == ident.schema);
1696-
1697-
if schema_exists {
// --- added: existence check through the catalog provider itself ---
// NOTE(review): the check and the later plan execution are separate steps;
// confirm whether concurrent CREATE SCHEMA calls need extra handling.
1660+
let catalog = self.get_catalog(&schema_ref.catalog)?;
1661+
if catalog.schema(&schema_ref.schema).is_some() {
16981662
if if_not_exists {
16991663
return self.created_entity_response();
17001664
}
17011665
return ex_error::ObjectAlreadyExistsSnafu {
17021666
r#type: ExistingObjectType::Schema,
1703-
name: ident.schema,
1667+
name: schema_ref.schema.to_string(),
17041668
}
17051669
.fail();
17061670
}
// --- removed: direct namespace creation, schema registration and cache refresh ---
1707-
let namespace = Namespace::try_new(std::slice::from_ref(&ident.schema))
1708-
.map_err(|err| DataFusionError::External(Box::new(err)))
1709-
.context(ex_error::DataFusionSnafu)?;
1710-
iceberg_catalog
1711-
.create_namespace(&namespace, None)
1712-
.await
1713-
.context(ex_error::IcebergSnafu)?;
1714-
if let Some(mirror) = Self::get_iceberg_mirror(&catalog) {
1715-
catalog
1716-
.register_schema(
1717-
&namespace.to_string(),
1718-
Arc::new(IcebergSchema::new(namespace.clone(), mirror)),
1719-
)
1720-
.context(ex_error::DataFusionSnafu)?;
1721-
}
1722-
1723-
self.refresh_catalog_partially(CachedEntity::Schema(ident))
1724-
.await?;
1725-
// --- added: creation is delegated to executing the (rewritten) logical plan ---
1671+
self.execute_logical_plan(plan).await?;
17261672
self.created_entity_response()
17271673
}
17281674

@@ -2667,38 +2613,31 @@ impl UserQuery {
26672613
}
26682614

26692615
// Fill in the database if missing and normalize the identifiers for ObjectNamePart
//
// This span is a rendered diff. The `-` lines are the old
// `resolve_schema_object_name`, which took parsed `ObjectNamePart`s, prepended
// the current database when only one part was given, rejected anything other
// than one or two parts with `InvalidSchemaIdentifier`, and ran every
// identifier part (including the prepended database) through `normalize_ident`,
// dropping `Function` parts. The `+` lines are its replacement,
// `resolve_schema_name`, documented where it starts below.
2670-
pub fn resolve_schema_object_name(
2671-
&self,
2672-
mut schema_ident: Vec<ObjectNamePart>,
2673-
) -> Result<NormalizedIdent> {
2674-
match schema_ident.len() {
2675-
1 => {
2676-
schema_ident.insert(
2677-
0,
2678-
ObjectNamePart::Identifier(Ident::new(self.current_database())),
2679-
);
2680-
}
2681-
2 => {}
2682-
_ => {
2683-
return ex_error::InvalidSchemaIdentifierSnafu {
2684-
ident: schema_ident
2685-
.iter()
2686-
.map(ToString::to_string)
2687-
.collect::<Vec<_>>()
2688-
.join("."),
2689-
}
2690-
.fail();
2691-
}
2692-
}
2693-
let normalized_idents = schema_ident
2694-
.into_iter()
2695-
.map(|part| match part {
2696-
ObjectNamePart::Identifier(ident) => self.normalize_ident(ident),
2697-
ObjectNamePart::Function(_) => Ident::new(String::new()),
2698-
})
2699-
.filter(|ident| !ident.value.is_empty())
/// Resolves a textual schema name into a fully qualified `SchemaReference::Full`.
///
/// `ident` is split on `.`:
/// - one part: the catalog is `self.current_database()` and the schema is the
///   part passed through `normalize_ident`;
/// - two parts: catalog and schema are the first and second part, each passed
///   through `normalize_ident`;
/// - any other number of parts: fails with `InvalidSchemaIdentifier` carrying
///   the original `ident`.
///
/// NOTE(review): a plain split on `.` also splits a quoted identifier that
/// contains a dot, and each part is re-wrapped with `Ident::new` before
/// normalization. Confirm how quoted names in the input should be treated.
2616+
pub fn resolve_schema_name(&self, ident: &str) -> Result<SchemaReference> {
2617+
let parts: Vec<String> = ident
2618+
.split('.')
2619+
.map(std::string::ToString::to_string)
27002620
.collect();
2701-
Ok(NormalizedIdent(normalized_idents))
2621+
match parts.len() {
2622+
1 => Ok(SchemaReference::Full {
// NOTE(review): unlike the removed implementation, the current database is
// used as returned by `current_database()` and is not passed through
// `normalize_ident`. Confirm that it is already in normalized form.
2623+
catalog: Arc::from(self.current_database()),
2624+
schema: Arc::from(
2625+
self.normalize_ident(Ident::new(parts[0].clone()))
2626+
.to_string(),
2627+
),
2628+
}),
2629+
2 => Ok(SchemaReference::Full {
2630+
catalog: Arc::from(
2631+
self.normalize_ident(Ident::new(parts[0].clone()))
2632+
.to_string(),
2633+
),
2634+
schema: Arc::from(
2635+
self.normalize_ident(Ident::new(parts[1].clone()))
2636+
.to_string(),
2637+
),
2638+
}),
2639+
_ => ex_error::InvalidSchemaIdentifierSnafu { ident }.fail(),
2640+
}
27022641
}
27032642

27042643
fn normalize_ident(&self, ident: Ident) -> Ident {

0 commit comments

Comments
 (0)