Skip to content

Commit dee0f09

Browse files
committed
Remove iceberg catalog from drop statement
1 parent 88644ae commit dee0f09

5 files changed

Lines changed: 100 additions & 183 deletions

File tree

crates/catalog/src/catalog.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use datafusion::catalog::{CatalogProvider, SchemaProvider};
66
use datafusion_common::DataFusionError;
77
use futures::executor::block_on;
88
use iceberg_rust::catalog::Catalog;
9-
use iceberg_rust_spec::identifier::Identifier;
109
use iceberg_rust_spec::namespace::Namespace;
1110
use snafu::futures::TryFutureExt;
1211
use std::fmt::{Display, Formatter};
@@ -206,7 +205,7 @@ impl CatalogProvider for CachingCatalog {
206205
name: &str,
207206
cascade: bool,
208207
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
209-
self.schemas_cache.remove(name);
208+
let schema = self.schemas_cache.remove(name);
210209

211210
if let Some(catalog) = &self.iceberg_catalog {
212211
let namespace = Namespace::try_new(std::slice::from_ref(&name.to_string()))
@@ -216,7 +215,12 @@ impl CatalogProvider for CachingCatalog {
216215
.drop_namespace(&namespace)
217216
.context(df_error::IcebergSnafu),
218217
)?;
218+
} else {
219+
return self.catalog.deregister_schema(name, cascade);
220+
}
221+
if let Some((_, caching_schema)) = schema {
222+
return Ok(Some(caching_schema));
219223
}
220-
self.catalog.deregister_schema(name, cascade)
224+
Ok(None)
221225
}
222226
}

crates/catalog/src/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl SchemaProvider for CachingSchema {
9999
if let Some((_, caching_table)) = table {
100100
if caching_table.table_type() != TableType::View {
101101
if let Some(catalog) = &self.iceberg_catalog {
102-
let ident = Identifier::new(std::slice::from_ref(&self.name), &name);
102+
let ident = Identifier::new(std::slice::from_ref(&self.name), name);
103103
block_on(catalog.drop_table(&ident).context(df_error::IcebergSnafu))?;
104104
} else {
105105
return self.schema.deregister_table(name);

crates/executor/src/query.rs

Lines changed: 86 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ use functions::visitors::{
8787
};
8888
use iceberg_rust::catalog::Catalog;
8989
use iceberg_rust::catalog::create::CreateTableBuilder;
90-
use iceberg_rust::catalog::identifier::Identifier;
9190
use iceberg_rust::catalog::tabular::Tabular;
9291
use iceberg_rust::error::Error as IcebergError;
9392
use iceberg_rust::spec::arrow::schema::new_fields_with_ids;
@@ -703,124 +702,87 @@ impl UserQuery {
703702
}
704703

705704
let mut plan = self.sql_statement_to_plan(statement).await?;
706-
match &mut plan {
707-
LogicalPlan::Ddl(ref ddl) => match ddl {
705+
let table_ref = match &mut plan {
706+
LogicalPlan::Ddl(ddl) => match ddl {
708707
DdlStatement::DropTable(t) => {
709-
*t.name = self.resolve_table_ref(t.name.clone()).into();
708+
let resolved = self.resolve_table_ref(t.name.clone());
709+
t.name = resolved.clone().into();
710+
resolved
710711
}
711712
DdlStatement::DropView(v) => {
712-
*v.name = self.resolve_table_ref(v.name.clone()).into();
713+
let resolved = self.resolve_table_ref(v.name.clone());
714+
v.name = resolved.clone().into();
715+
resolved
713716
}
714717
DdlStatement::DropCatalogSchema(s) => {
715-
*s.name = self.resolve_schema_ref(s.name.clone())
718+
let resolved = self.resolve_schema_ref(s.name.clone());
719+
s.name = resolved;
720+
self.schema_ref_to_resolved(s.name.clone())
716721
}
717722
_ => return ex_error::OnlyDropStatementsSnafu.fail(),
718723
},
719724
_ => return ex_error::OnlyDropStatementsSnafu.fail(),
720725
};
721-
self.execute_logical_plan(plan).await
722726

723-
// let catalog_name = table_ref.catalog.as_ref();
724-
// let schema_name = table_ref.schema.to_string();
725-
// let ident = Identifier::new(std::slice::from_ref(&schema_name), table_ref.table.as_ref());
726-
//
727-
// // Inject more information to the error
728-
// let catalog = self.get_catalog(catalog_name).map_err(|_| {
729-
// ex_error::CatalogNotFoundSnafu {
730-
// operation_on: match object_type {
731-
// ObjectType::Table | ObjectType::View => OperationOn::Table(OperationType::Drop),
732-
// ObjectType::Schema => OperationOn::Schema(OperationType::Drop),
733-
// ObjectType::Database => OperationOn::Database(OperationType::Drop),
734-
// _ => OperationOn::Unknown,
735-
// },
736-
// catalog: catalog_name,
737-
// }
738-
// .build()
739-
// })?;
740-
//
741-
// let iceberg_catalog = match self
742-
// .resolve_iceberg_catalog_or_execute(catalog.clone(), catalog_name.to_string(), plan)
743-
// .await
744-
// {
745-
// IcebergCatalogResult::Catalog(catalog) => catalog,
746-
// IcebergCatalogResult::Result(result) => {
747-
// return result.map(|_| self.status_response())?;
748-
// }
749-
// };
750-
//
751-
// match object_type {
752-
// ObjectType::Table | ObjectType::View => {
753-
// let table_resp = iceberg_catalog.clone().load_tabular(&ident).await;
754-
// let namespace_exists = match iceberg_catalog
755-
// .clone()
756-
// .namespace_exists(ident.namespace())
757-
// .await
758-
// {
759-
// Ok(exists) => exists,
760-
// Err(err) => {
761-
// if is_missing_catalog_entity(&err) {
762-
// false
763-
// } else {
764-
// return Err(err).context(ex_error::IcebergSnafu);
765-
// }
766-
// }
767-
// };
768-
// match table_resp {
769-
// Ok(_) => {
770-
// iceberg_catalog
771-
// .drop_table(&ident)
772-
// .await
773-
// .context(ex_error::IcebergSnafu)?;
774-
// self.refresh_catalog_partially(CachedEntity::Table(MetastoreTableIdent {
775-
// database: catalog_name.to_string(),
776-
// schema: schema_name,
777-
// table: ident.name().to_string(),
778-
// }))
779-
// .await?;
780-
// }
781-
// Err(ref err) if is_missing_catalog_entity(err) => {
782-
// if namespace_exists {
783-
// if !if_exists {
784-
// ex_error::TableNotFoundInSchemaInDatabaseSnafu {
785-
// operation_on: OperationOn::Table(OperationType::Drop),
786-
// table: ident.name().to_string(),
787-
// schema: schema_name,
788-
// db: catalog_name.to_string(),
789-
// }
790-
// .fail()?;
791-
// }
792-
// } else if !if_exists {
793-
// ex_error::SchemaNotFoundInDatabaseSnafu {
794-
// operation_on: OperationOn::Table(OperationType::Drop),
795-
// schema: schema_name,
796-
// db: catalog_name.to_string(),
797-
// }
798-
// .fail()?;
799-
// }
800-
// }
801-
// Err(err) => {
802-
// // return original error, since schema exists or another iceberg failure
803-
// return Err(err).context(ex_error::IcebergSnafu);
804-
// }
805-
// }
806-
// self.status_response()
807-
// }
808-
// ObjectType::Schema => {
809-
// if catalog.schema(&schema_name).is_some()
810-
// {
811-
// catalog
812-
// .deregister_schema(&schema_name.clone(), cascade)
813-
// .context(ex_error::DataFusionSnafu)?;
814-
// self.refresh_catalog_partially(CachedEntity::Schema(MetastoreSchemaIdent {
815-
// database: catalog_name.to_string(),
816-
// schema: schema_name,
817-
// }))
818-
// .await?;
819-
// }
820-
// self.status_response()
821-
// }
822-
// _ => ex_error::OnlyDropStatementsSnafu.fail(),
823-
// }
727+
let catalog_name = table_ref.catalog.as_ref();
728+
let schema_name = table_ref.schema.to_string();
729+
730+
// Inject more information to the error
731+
let catalog = self.get_catalog(catalog_name).map_err(|_| {
732+
ex_error::CatalogNotFoundSnafu {
733+
operation_on: match object_type {
734+
ObjectType::Table | ObjectType::View => OperationOn::Table(OperationType::Drop),
735+
ObjectType::Schema => OperationOn::Schema(OperationType::Drop),
736+
ObjectType::Database => OperationOn::Database(OperationType::Drop),
737+
_ => OperationOn::Unknown,
738+
},
739+
catalog: catalog_name,
740+
}
741+
.build()
742+
})?;
743+
744+
// Check schema/table existence
745+
match object_type {
746+
ObjectType::Table | ObjectType::View => {
747+
if let Some(schema) = catalog.schema(&schema_name) {
748+
if !if_exists
749+
&& schema
750+
.table(&table_ref.table)
751+
.await
752+
.context(ex_error::DataFusionSnafu)?
753+
.is_none()
754+
{
755+
ex_error::TableNotFoundInSchemaInDatabaseSnafu {
756+
operation_on: OperationOn::Table(OperationType::Drop),
757+
table: table_ref.table.to_string(),
758+
schema: schema_name,
759+
db: catalog_name.to_string(),
760+
}
761+
.fail()?;
762+
}
763+
} else {
764+
return ex_error::SchemaNotFoundInDatabaseSnafu {
765+
operation_on: OperationOn::Table(OperationType::Drop),
766+
schema: schema_name,
767+
db: catalog_name.to_string(),
768+
}
769+
.fail();
770+
}
771+
}
772+
ObjectType::Schema => {
773+
if !if_exists && catalog.schema(&schema_name).is_none() {
774+
return ex_error::SchemaNotFoundInDatabaseSnafu {
775+
operation_on: OperationOn::Table(OperationType::Drop),
776+
schema: schema_name,
777+
db: catalog_name.to_string(),
778+
}
779+
.fail();
780+
}
781+
}
782+
_ => {}
783+
}
784+
self.execute_logical_plan(plan).await?;
785+
self.status_response()
824786
}
825787

826788
#[allow(clippy::redundant_else, clippy::too_many_lines)]
@@ -2409,7 +2371,22 @@ impl UserQuery {
24092371
catalog: Arc::from(self.current_database()),
24102372
schema,
24112373
},
2412-
_ => schema,
2374+
SchemaReference::Full { .. } => schema,
2375+
}
2376+
}
2377+
2378+
fn schema_ref_to_resolved(&self, schema: SchemaReference) -> ResolvedTableReference {
2379+
match schema {
2380+
SchemaReference::Bare { schema } => ResolvedTableReference {
2381+
catalog: Arc::from(self.current_database()),
2382+
schema,
2383+
table: Arc::from(""),
2384+
},
2385+
SchemaReference::Full { catalog, schema } => ResolvedTableReference {
2386+
catalog,
2387+
schema,
2388+
table: Arc::from(""),
2389+
},
24132390
}
24142391
}
24152392

@@ -3536,34 +3513,3 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
35363513
table: Arc::from(table_ref.table.to_ascii_lowercase()),
35373514
}
35383515
}
3539-
3540-
const fn is_missing_catalog_entity(err: &IcebergError) -> bool {
3541-
matches!(
3542-
err,
3543-
IcebergError::NotFound(_) | IcebergError::CatalogNotFound
3544-
)
3545-
}
3546-
3547-
#[cfg(test)]
3548-
mod tests {
3549-
use super::{IcebergError, is_missing_catalog_entity};
3550-
3551-
#[test]
3552-
fn detects_catalog_not_found_as_missing_tabular() {
3553-
assert!(is_missing_catalog_entity(&IcebergError::CatalogNotFound));
3554-
}
3555-
3556-
#[test]
3557-
fn detects_not_found_as_missing_tabular() {
3558-
assert!(is_missing_catalog_entity(&IcebergError::NotFound(
3559-
"test".to_string()
3560-
)));
3561-
}
3562-
3563-
#[test]
3564-
fn ignores_other_errors_for_missing_tabular_check() {
3565-
assert!(!is_missing_catalog_entity(&IcebergError::NotSupported(
3566-
"feature".to_string()
3567-
)));
3568-
}
3569-
}

crates/executor/src/snowflake_error.rs

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -520,38 +520,11 @@ fn datafusion_error(df_error: &DataFusionError, subtext: &[&str]) -> SnowflakeEr
520520
} else if let Some(e) = err.downcast_ref::<DFCatalogExternalDFError>() {
521521
let message = e.to_string();
522522
let error_code = ErrorCode::Catalog;
523-
match e {
524-
DFCatalogExternalDFError::OrdinalPositionParamOverflow { .. } => CustomSnafu {
525-
message,
526-
error_code,
527-
}
528-
.build(),
529-
DFCatalogExternalDFError::RidParamDoesntFitInU8 { .. } => CustomSnafu {
530-
message,
531-
error_code,
532-
}
533-
.build(),
534-
DFCatalogExternalDFError::CatalogNotFound { .. } => CustomSnafu {
535-
message,
536-
error_code,
537-
}
538-
.build(),
539-
DFCatalogExternalDFError::CannotResolveViewReference { .. } => CustomSnafu {
540-
message,
541-
error_code,
542-
}
543-
.build(),
544-
DFCatalogExternalDFError::SessionDowncast { .. } => CustomSnafu {
545-
message,
546-
error_code,
547-
}
548-
.build(),
549-
DFCatalogExternalDFError::ObjectStoreNotFound { .. } => CustomSnafu {
550-
message,
551-
error_code,
552-
}
553-
.build(),
523+
CustomSnafu {
524+
message,
525+
error_code,
554526
}
527+
.build()
555528
} else if let Some(e) = err.downcast_ref::<ArrowError>() {
556529
CustomSnafu {
557530
message: e.to_string(),

crates/executor/src/tests/sql/ddl/snapshots/snowflake_error/query_drop_table_if_exists_missing_schema_snowflake_error.snap

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,6 @@ source: crates/executor/src/tests/sql/ddl/table.rs
33
description: "\"DROP TABLE IF EXISTS embucket.missing_schema.some_table\""
44
info: Tests Snowflake Error
55
---
6-
Ok(
7-
[
8-
"+----------------------------------+",
9-
"| status |",
10-
"+----------------------------------+",
11-
"| Statement executed successfully. |",
12-
"+----------------------------------+",
13-
],
6+
Err(
7+
"Snowflake Error: SQL compilation error: Schema 'embucket.missing_schema' does not exist or not authorized",
148
)

0 commit comments

Comments
 (0)