Skip to content

Commit 600c44d

Browse files
authored
Remove iceberg call from create table statement (#21)
* Remove iceberg call from create table * Remove invalidate_cache
1 parent 6f452dd commit 600c44d

6 files changed

Lines changed: 149 additions & 297 deletions

File tree

crates/catalog/src/catalog_list.rs

Lines changed: 0 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use aws_credential_types::provider::SharedCredentialsProvider;
1414
use catalog_metastore::{
1515
AwsCredentials, Database, Metastore, RwObject, S3TablesVolume, VolumeType,
1616
};
17-
use catalog_metastore::{SchemaIdent, TableIdent};
1817
use dashmap::DashMap;
1918
use datafusion::{
2019
catalog::{CatalogProvider, CatalogProviderList},
@@ -37,22 +36,6 @@ use url::Url;
3736

3837
pub const DEFAULT_CATALOG: &str = "embucket";
3938

40-
#[derive(Debug, Eq, PartialEq)]
41-
pub enum CachedEntity {
42-
Schema(SchemaIdent),
43-
Table(TableIdent),
44-
}
45-
46-
impl CachedEntity {
47-
#[must_use]
48-
pub fn normalized(&self) -> Self {
49-
match self {
50-
Self::Schema(ident) => Self::Schema(ident.normalized()),
51-
Self::Table(ident) => Self::Table(ident.normalized()),
52-
}
53-
}
54-
}
55-
5639
pub struct EmbucketCatalogList {
5740
pub metastore: Arc<dyn Metastore>,
5841
pub table_object_store: Arc<DashMap<String, Arc<dyn ObjectStore>>>,
@@ -262,109 +245,6 @@ impl EmbucketCatalogList {
262245
)
263246
}
264247

265-
/// Do not keep returned references to avoid deadlocks
266-
fn catalog_ref_by_name(
267-
&self,
268-
name: &str,
269-
) -> Result<dashmap::mapref::one::Ref<'_, String, Arc<CachingCatalog>>> {
270-
self.catalogs.get(name).ok_or_else(|| {
271-
InvalidCacheSnafu {
272-
entity: "catalog",
273-
name,
274-
}
275-
.build()
276-
})
277-
}
278-
279-
/// Invalidates the cache for a specific catalog entity (schema or table).
280-
///
281-
/// This method ensures that the cache for the specified entity is refreshed or cleared as appropriate.
282-
/// - For a schema: If the schema exists in the underlying catalog, it is (re-)cached; if it does not exist, the cache entry is removed.
283-
/// - For a table: The table cache is invalidated. If the table exists in the underlying schema, it is (re-)cached; otherwise, the cache entry is removed.
284-
///
285-
/// # Arguments
286-
/// * `entity` - The cached entity to invalidate, which can be either a schema or a table.
287-
///
288-
/// # Errors
289-
/// Returns an error if:
290-
/// - The specified catalog or schema does not exist in the cache.
291-
/// - There is a failure when accessing the underlying catalog or schema provider.
292-
#[allow(clippy::as_conversions, clippy::too_many_lines)]
293-
#[tracing::instrument(
294-
name = "EmbucketCatalogList::refresh_schema",
295-
level = "debug",
296-
skip(self),
297-
err
298-
)]
299-
pub async fn invalidate_cache(&self, entity: CachedEntity) -> Result<()> {
300-
match entity {
301-
CachedEntity::Schema(schema_ident) => {
302-
let SchemaIdent { schema, database } = schema_ident;
303-
let catalog_ref = self.catalog_ref_by_name(&database)?;
304-
if !catalog_ref.should_refresh {
305-
return Ok(());
306-
}
307-
if let Some(schema_provider) = catalog_ref.catalog.schema(&schema) {
308-
// schema exists -> ensure it's cached
309-
if catalog_ref.schemas_cache.get(&schema).is_none() {
310-
let schema = CachingSchema {
311-
schema: schema_provider,
312-
tables_cache: DashMap::default(),
313-
name: schema.clone(),
314-
iceberg_catalog: catalog_ref.iceberg_catalog.clone(),
315-
};
316-
catalog_ref
317-
.schemas_cache
318-
.insert(schema.name.clone(), Arc::new(schema));
319-
}
320-
} else {
321-
// no schema exists -> ensure cache is empty
322-
catalog_ref.schemas_cache.remove(&schema);
323-
}
324-
}
325-
CachedEntity::Table(table_ident) => {
326-
let TableIdent {
327-
database,
328-
schema,
329-
table,
330-
} = table_ident;
331-
let catalog_ref = self.catalog_ref_by_name(&database)?;
332-
if !catalog_ref.should_refresh {
333-
return Ok(());
334-
}
335-
let schema_ref = catalog_ref.schemas_cache.get(&schema).ok_or_else(|| {
336-
InvalidCacheSnafu {
337-
entity: "schema",
338-
name: format!("{database}.{schema}"),
339-
}
340-
.build()
341-
})?;
342-
343-
// invalidate table cache if table exists, noop if doesn't
344-
schema_ref.tables_cache.remove(&table);
345-
346-
if let Some(table_provider) = schema_ref
347-
.schema
348-
.table(&table)
349-
.await
350-
.context(catalog_error::DataFusionSnafu)?
351-
{
352-
// ensure table is cached
353-
schema_ref.tables_cache.insert(
354-
table.clone(),
355-
Arc::new(CachingTable::new_with_schema(
356-
table.clone(),
357-
table_provider.schema(),
358-
Arc::clone(&table_provider),
359-
)),
360-
);
361-
}
362-
}
363-
}
364-
365-
Ok(())
366-
}
367-
368248
#[allow(clippy::as_conversions, clippy::too_many_lines)]
369249
#[tracing::instrument(
370250
name = "EmbucketCatalogList::refresh",

crates/catalog/src/schema.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::df_error;
2-
use crate::table::CachingTable;
2+
use crate::table::{CachingTable, IcebergTableBuilder};
33
use async_trait::async_trait;
44
use dashmap::DashMap;
55
use datafusion::catalog::{SchemaProvider, TableProvider};
66
use datafusion_common::DataFusionError;
77
use datafusion_expr::TableType;
8+
use datafusion_iceberg::DataFusionTable;
89
use futures::executor::block_on;
910
use iceberg_rust::catalog::Catalog;
11+
use iceberg_rust::catalog::tabular::Tabular as IcebergTabular;
1012
use iceberg_rust_spec::identifier::Identifier;
13+
use snafu::ResultExt;
1114
use snafu::futures::TryFutureExt;
1215
use std::any::Any;
1316
use std::sync::Arc;
@@ -81,12 +84,29 @@ impl SchemaProvider for CachingSchema {
8184
name: String,
8285
table: Arc<dyn TableProvider>,
8386
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
84-
let caching_table = Arc::new(CachingTable::new(name.clone(), Arc::clone(&table)));
85-
self.tables_cache.insert(name.clone(), caching_table);
86-
if table.table_type() != TableType::View {
87-
return self.schema.register_table(name, table);
88-
}
89-
Ok(Some(table))
87+
let table_provider: Arc<dyn TableProvider> = if let Some(catalog) = &self.iceberg_catalog
88+
&& let Some(iceberg_builder) = table.as_any().downcast_ref::<IcebergTableBuilder>()
89+
&& table.table_type() != TableType::View
90+
{
91+
let ident = Identifier::new(std::slice::from_ref(&self.name), &name);
92+
block_on(async move {
93+
let mut builder = iceberg_builder.builder.clone();
94+
let iceberg_table = builder
95+
.build(ident.namespace(), catalog.clone())
96+
.await
97+
.context(df_error::IcebergSnafu)?;
98+
let tabular = IcebergTabular::Table(iceberg_table);
99+
let table_provider: Arc<dyn TableProvider> =
100+
Arc::new(DataFusionTable::new(tabular, None, None, None));
101+
Ok::<Arc<dyn TableProvider>, DataFusionError>(table_provider)
102+
})?
103+
} else {
104+
table
105+
};
106+
107+
let caching_table = Arc::new(CachingTable::new(name.clone(), Arc::clone(&table_provider)));
108+
self.tables_cache.insert(name, caching_table);
109+
Ok(Some(table_provider))
90110
}
91111

92112
#[allow(clippy::as_conversions)]

crates/catalog/src/table.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use crate::df_error;
22
use async_trait::async_trait;
3-
use datafusion::arrow::datatypes::SchemaRef;
3+
use datafusion::arrow::datatypes::{Schema, SchemaRef};
44
use datafusion::catalog::{Session, TableProvider};
55
use datafusion::datasource::{ViewTable, provider_as_source};
66
use datafusion::execution::SessionState;
7-
use datafusion_common::Statistics;
87
use datafusion_common::tree_node::{Transformed, TreeNode};
8+
use datafusion_common::{Statistics, plan_err};
99
use datafusion_expr::dml::InsertOp;
1010
use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, TableType};
1111
use datafusion_physical_plan::ExecutionPlan;
12+
use iceberg_rust::catalog::create::CreateTableBuilder;
1213
use once_cell::sync::OnceCell;
1314
use snafu::OptionExt;
1415
use std::any::Any;
1516
use std::collections::HashMap;
17+
use std::fmt::{Debug, Formatter};
1618
use std::sync::Arc;
1719

1820
pub struct CachingTable {
@@ -38,8 +40,8 @@ impl CachingTable {
3840
}
3941
}
4042

41-
impl std::fmt::Debug for CachingTable {
42-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43+
impl Debug for CachingTable {
44+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
4345
f.debug_struct("Table")
4446
.field("schema", &"")
4547
.field("name", &self.name)
@@ -176,3 +178,47 @@ async fn rewrite_view_source(
176178
.data;
177179
Ok(new_plan)
178180
}
181+
182+
pub struct IcebergTableBuilder {
183+
pub builder: CreateTableBuilder,
184+
}
185+
186+
impl IcebergTableBuilder {
187+
#[must_use]
188+
pub const fn new(builder: CreateTableBuilder) -> Self {
189+
Self { builder }
190+
}
191+
}
192+
193+
impl Debug for IcebergTableBuilder {
194+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
195+
f.debug_struct("IcebergTableBuilder")
196+
.field("builder", &"")
197+
.finish()
198+
}
199+
}
200+
201+
#[async_trait]
202+
impl TableProvider for IcebergTableBuilder {
203+
fn as_any(&self) -> &dyn Any {
204+
self
205+
}
206+
207+
fn schema(&self) -> SchemaRef {
208+
SchemaRef::from(Schema::empty())
209+
}
210+
211+
fn table_type(&self) -> TableType {
212+
TableType::Base
213+
}
214+
215+
async fn scan(
216+
&self,
217+
_state: &dyn Session,
218+
_projection: Option<&Vec<usize>>,
219+
_filters: &[Expr],
220+
_limit: Option<usize>,
221+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
222+
plan_err!("Iceberg table builder cannot be scanned")
223+
}
224+
}

0 commit comments

Comments
 (0)