Skip to content

Commit 90a2d05

Browse files
authored
Restore create schema (#442)
* Create schema * Create schema * Create schema * Rename test * Fix comments
1 parent b9e4914 commit 90a2d05

5 files changed

Lines changed: 104 additions & 67 deletions

File tree

crates/metastore/src/models/schema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ impl IceBucketSchemaIdent {
4040
}
4141
}
4242

43+
impl std::fmt::Display for IceBucketSchemaIdent {
44+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45+
write!(f, "{}.{}", self.database, self.schema)
46+
}
47+
}
48+
4349
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, utoipa::ToSchema)]
4450
pub struct IceBucketSchema {
4551
pub ident: IceBucketSchemaIdent,

crates/runtime/src/execution/query.rs

Lines changed: 63 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ use iceberg_rust::catalog::Catalog;
4242
use iceberg_rust::spec::arrow::schema::new_fields_with_ids;
4343
use iceberg_rust::spec::schema::Schema;
4444
use iceberg_rust::spec::types::StructType;
45+
use iceberg_rust_spec::namespace::Namespace;
4546
use icebucket_metastore::{
46-
IceBucketSchema, IceBucketSchemaIdent, IceBucketTableCreateRequest, IceBucketTableFormat,
47-
IceBucketTableIdent, Metastore,
47+
IceBucketSchemaIdent, IceBucketTableCreateRequest, IceBucketTableFormat, IceBucketTableIdent,
48+
Metastore,
4849
};
4950
use object_store::aws::AmazonS3Builder;
5051
use serde::{Deserialize, Serialize};
@@ -256,11 +257,8 @@ impl IceBucketQuery {
256257
.create_database(db_name, if_not_exists)
257258
.await;*/
258259
}
259-
Statement::CreateSchema {
260-
schema_name,
261-
if_not_exists,
262-
} => {
263-
let result = self.create_schema(schema_name, if_not_exists).await;
260+
Statement::CreateSchema { .. } => {
261+
let result = Box::pin(self.create_schema(*s)).await;
264262
self.refresh_catalog().await?;
265263
return result;
266264
}
@@ -586,7 +584,7 @@ impl IceBucketQuery {
586584
}
587585

588586
/// This is experimental CREATE STAGE support
589-
/// Current limitations
587+
/// Current limitations
590588
/// TODO
591589
/// - Prepare object storage depending on the URL. Currently we support only s3 public buckets /// with public access with default eu-central-1 region
592590
/// - Parse credentials from specified config
@@ -811,54 +809,55 @@ impl IceBucketQuery {
811809
}
812810

813811
#[tracing::instrument(level = "trace", skip(self), err, ret)]
814-
pub async fn create_schema(
815-
&self,
816-
name: SchemaName,
817-
if_not_exists: bool,
818-
) -> ExecutionResult<Vec<RecordBatch>> {
819-
match name {
820-
SchemaName::Simple(schema_name) => {
821-
let object_name = self.resolve_schema_ident(schema_name.0)?;
822-
823-
let database_name = object_name.0[0].clone().to_string();
824-
let schema_name = object_name.0[1].clone().to_string();
812+
pub async fn create_schema(&self, statement: Statement) -> ExecutionResult<Vec<RecordBatch>> {
813+
let Statement::CreateSchema {
814+
schema_name,
815+
if_not_exists,
816+
} = statement.clone()
817+
else {
818+
return Err(ExecutionError::DataFusion {
819+
source: DataFusionError::NotImplemented(
820+
"Only CREATE SCHEMA statements are supported".to_string(),
821+
),
822+
});
823+
};
825824

826-
let icebucket_schema_ident = IceBucketSchemaIdent {
827-
database: database_name.clone(),
828-
schema: schema_name.clone(),
829-
};
825+
let SchemaName::Simple(schema_name) = schema_name else {
826+
return Err(ExecutionError::DataFusion {
827+
source: DataFusionError::NotImplemented(
828+
"Only simple schema names are supported".to_string(),
829+
),
830+
});
831+
};
830832

831-
let exists = self
832-
.metastore
833-
.get_schema(&icebucket_schema_ident)
834-
.await
835-
.context(ex_error::MetastoreSnafu)?
836-
.is_some();
833+
let ident: IceBucketSchemaIdent = self.resolve_schema_ident(schema_name.0)?.into();
834+
let catalog = self.get_catalog(ident.database.as_str())?;
835+
if catalog.schema(ident.schema.as_str()).is_some() && if_not_exists {
836+
return Err(ExecutionError::ObjectAlreadyExists {
837+
type_name: "schema".to_string(),
838+
name: ident.schema,
839+
});
840+
}
837841

838-
if exists && if_not_exists {
839-
return Err(ExecutionError::ObjectAlreadyExists {
840-
type_name: "schema".to_string(),
841-
name: schema_name.to_string(),
842-
});
843-
} else if !exists {
844-
let icebucket_schema = IceBucketSchema {
845-
ident: icebucket_schema_ident.clone(),
846-
properties: None,
847-
};
848-
self.metastore
849-
.create_schema(&icebucket_schema_ident, icebucket_schema)
850-
.await
851-
.context(ex_error::MetastoreSnafu)?;
842+
let plan = self.sql_statement_to_plan(statement).await?;
843+
let downcast_result = self.resolve_iceberg_catalog_or_execute(catalog, plan).await;
844+
let iceberg_catalog = match downcast_result {
845+
IcebergCatalogResult::Catalog(catalog) => catalog,
846+
IcebergCatalogResult::Result(result) => {
847+
return match result {
848+
Ok(_) => Ok(vec![]),
849+
Err(err) => Err(err),
852850
}
853851
}
854-
_ => {
855-
return Err(ExecutionError::DataFusion {
856-
source: DataFusionError::NotImplemented(
857-
"Only simple schema names are supported".to_string(),
858-
),
859-
});
860-
}
861-
}
852+
};
853+
854+
let namespace = Namespace::try_new(&[ident.schema])
855+
.map_err(|err| DataFusionError::External(Box::new(err)))
856+
.context(ex_error::DataFusionSnafu)?;
857+
iceberg_catalog
858+
.create_namespace(&namespace, None)
859+
.await
860+
.context(ex_error::IcebergSnafu)?;
862861
created_entity_response()
863862
}
864863

@@ -1543,21 +1542,24 @@ impl IceBucketQuery {
15431542
&self,
15441543
mut schema_ident: Vec<Ident>,
15451544
) -> ExecutionResult<NormalizedIdent> {
1546-
let database = self.current_database();
1547-
if schema_ident.len() == 1 {
1548-
if let Some(database) = database {
1549-
schema_ident.insert(0, Ident::new(database));
1550-
} else {
1545+
match schema_ident.len() {
1546+
1 => match self.current_database() {
1547+
Some(database) => {
1548+
schema_ident.insert(0, Ident::new(database));
1549+
}
1550+
None => {
1551+
return Err(ExecutionError::InvalidSchemaIdentifier {
1552+
ident: NormalizedIdent(schema_ident).to_string(),
1553+
});
1554+
}
1555+
},
1556+
2 => {}
1557+
_ => {
15511558
return Err(ExecutionError::InvalidSchemaIdentifier {
15521559
ident: NormalizedIdent(schema_ident).to_string(),
15531560
});
15541561
}
1555-
} else {
1556-
return Err(ExecutionError::InvalidSchemaIdentifier {
1557-
ident: NormalizedIdent(schema_ident).to_string(),
1558-
});
15591562
}
1560-
15611563
Ok(NormalizedIdent(
15621564
schema_ident
15631565
.iter()

crates/runtime/src/execution/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl ExecutionService {
220220
};
221221

222222
let query = user_session.query(&query, IceBucketQueryContext::default());
223-
query.execute().await?;
223+
Box::pin(query.execute()).await?;
224224

225225
user_session
226226
.ctx

crates/runtime/src/execution/tests/query.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ async fn test_context_name_injection() {
302302

303303
#[tokio::test]
304304
async fn test_create_table_with_timestamp_nanosecond() {
305-
let (execution_svc, session_id) = prepare_env().await;
305+
let (execution_svc, _, session_id) = prepare_env().await;
306306
let table_ident = IceBucketTableIdent {
307307
database: "icebucket".to_string(),
308308
schema: "public".to_string(),
@@ -329,7 +329,7 @@ async fn test_create_table_with_timestamp_nanosecond() {
329329

330330
#[tokio::test]
331331
async fn test_drop_table() {
332-
let (execution_svc, session_id) = prepare_env().await;
332+
let (execution_svc, _, session_id) = prepare_env().await;
333333
let table_ident = IceBucketTableIdent {
334334
database: "icebucket".to_string(),
335335
schema: "public".to_string(),
@@ -374,7 +374,26 @@ async fn test_drop_table() {
374374
}
375375
}
376376

377-
async fn prepare_env() -> (ExecutionService, String) {
377+
#[tokio::test]
378+
async fn test_create_schema() {
379+
let (execution_svc, metastore, session_id) = prepare_env().await;
380+
let schema_ident = IceBucketSchemaIdent {
381+
database: "icebucket".to_string(),
382+
schema: "public_new".to_string(),
383+
};
384+
let query = format!("CREATE SCHEMA {schema_ident};");
385+
execution_svc
386+
.query(&session_id, &query, IceBucketQueryContext::default())
387+
.await
388+
.expect("Failed to execute query");
389+
// TODO use "SHOW SCHEMAS" sql
390+
metastore
391+
.get_schema(&schema_ident)
392+
.await
393+
.expect("Failed to get schema");
394+
}
395+
396+
async fn prepare_env() -> (ExecutionService, Arc<SlateDBMetastore>, String) {
378397
let metastore = SlateDBMetastore::new_in_memory().await;
379398
metastore
380399
.create_volume(
@@ -424,5 +443,5 @@ async fn prepare_env() -> (ExecutionService, String) {
424443
.create_session(session_id.to_string())
425444
.await
426445
.expect("Failed to create session");
427-
(execution_svc, session_id.to_string())
446+
(execution_svc, metastore, session_id.to_string())
428447
}

crates/runtime/src/execution/utils.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use chrono::DateTime;
2828
use datafusion::arrow::array::ArrayRef;
2929
use datafusion::arrow::datatypes::DataType;
3030
use datafusion::common::Result as DataFusionResult;
31-
use icebucket_metastore::IceBucketTableIdent;
31+
use icebucket_metastore::{IceBucketSchemaIdent, IceBucketTableIdent};
3232
use sqlparser::ast::{Ident, ObjectName};
3333
use std::collections::HashMap;
3434
use std::sync::Arc;
@@ -393,6 +393,16 @@ impl From<NormalizedIdent> for IceBucketTableIdent {
393393
}
394394
}
395395

396+
impl From<NormalizedIdent> for IceBucketSchemaIdent {
397+
fn from(ident: NormalizedIdent) -> Self {
398+
let ident = ident.0;
399+
Self {
400+
schema: ident[1].value.clone(),
401+
database: ident[0].value.clone(),
402+
}
403+
}
404+
}
405+
396406
impl From<NormalizedIdent> for ObjectName {
397407
fn from(ident: NormalizedIdent) -> Self {
398408
Self(ident.0)

0 commit comments

Comments
 (0)