Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 129 additions & 47 deletions crates/executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::datafusion::physical_plan::merge::{
use crate::datafusion::rewriters::session_context::SessionContextExprRewriter;
use crate::error::{OperationOn, OperationType};
use crate::models::{QueryContext, QueryResult};
use catalog::schema::CachingSchema;
use catalog::table::{CachingTable, IcebergTableBuilder};
use catalog_metastore::{
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
Expand Down Expand Up @@ -78,6 +79,7 @@ use functions::visitors::{
table_functions_cte_relation, timestamp, top_limit,
unimplemented::functions_checker::visit as unimplemented_functions_checker,
};
use iceberg_rust::catalog::Catalog;
use iceberg_rust::catalog::create::CreateTableBuilder;
use iceberg_rust::catalog::tabular::Tabular;
use iceberg_rust::error::Error as IcebergError;
Expand Down Expand Up @@ -707,7 +709,7 @@ impl UserQuery {
});
}

let table_location = create_table_statement
let _table_location = create_table_statement
.location
.clone()
.or_else(|| create_table_statement.base_location.clone());
Expand Down Expand Up @@ -759,17 +761,42 @@ impl UserQuery {
.build()
})?;

let table_provider: Option<Arc<dyn TableProvider>> = self.create_iceberg_table_provider(
table_ref,
schema_provider.clone(),
table_location,
create_table_statement,
plan.clone(),
)?;
if let Some(provider) = table_provider {
let iceberg_catalog = schema_provider
.as_any()
.downcast_ref::<CachingSchema>()
.and_then(|schema| schema.iceberg_catalog.clone());

if let Some(iceberg_catalog) = iceberg_catalog {
self.create_iceberg_table_with_catalog(
iceberg_catalog,
&table_ref,
&create_table_statement,
&plan,
)
.await?;
if let Some(caching_schema) = schema_provider.as_any().downcast_ref::<CachingSchema>() {
caching_schema.tables_cache.remove(&table_name);
}
schema_provider
.register_table(table_name, provider)
.context(ex_error::DataFusionSnafu)?;
.table(&table_name)
.await
.context(ex_error::DataFusionSnafu)?
.context(ex_error::TableProviderNotFoundSnafu {
table_name: table_name.clone(),
})?;
} else {
let table_provider: Option<Arc<dyn TableProvider>> = self
.create_iceberg_table_provider(
&table_ref,
schema_provider.clone(),
&create_table_statement,
&plan,
)?;
if let Some(provider) = table_provider {
schema_provider
.register_table(table_name.clone(), provider)
.context(ex_error::DataFusionSnafu)?;
}
}

// Insert data to new table
Expand Down Expand Up @@ -821,33 +848,10 @@ impl UserQuery {
self.created_entity_response()
}

#[allow(unused_variables, clippy::needless_pass_by_value)]
#[instrument(
name = "UserQuery::create_iceberg_table",
level = "trace",
skip(self),
err
)]
pub fn create_iceberg_table_provider(
&self,
table_ref: ResolvedTableReference,
schema_provider: Arc<dyn SchemaProvider>,
table_location: Option<String>,
statement: CreateTableStatement,
plan: LogicalPlan,
) -> Result<Option<Arc<dyn TableProvider>>> {
// Check if table already exists, if exists and CREATE OR REPLACE - drop it
if schema_provider.table_exist(&table_ref.table) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the `if_not_exists` and `or_replace` logic removed?

if statement.if_not_exists {
return Ok(None);
}
if statement.or_replace {
schema_provider
.deregister_table(&table_ref.table)
.context(ex_error::DataFusionSnafu)?;
}
}

fn build_iceberg_create_table_builder(
table_ref: &ResolvedTableReference,
plan: &LogicalPlan,
) -> Result<CreateTableBuilder> {
let fields_with_ids = StructType::try_from(&new_fields_with_ids(
&Fields::from(
plan.schema()
Expand All @@ -856,8 +860,7 @@ impl UserQuery {
.iter()
.map(|field| {
if field.data_type() == &DataType::Null {
let new_field = Field::new(field.name(), DataType::Utf8, true);
Arc::new(new_field)
Arc::new(Field::new(field.name(), DataType::Utf8, true))
} else {
field.clone()
}
Expand All @@ -869,17 +872,15 @@ impl UserQuery {
.map_err(|err| DataFusionError::External(Box::new(err)))
.context(ex_error::DataFusionSnafu)?;

// Create builder and configure it
let mut builder = Schema::builder();
builder.with_schema_id(0);
builder.with_identifier_field_ids(vec![]);
let mut schema_builder = Schema::builder();
schema_builder.with_schema_id(0);
schema_builder.with_identifier_field_ids(vec![]);

// Add each struct field individually
for field in fields_with_ids.iter() {
builder.with_struct_field(field.clone());
schema_builder.with_struct_field(field.clone());
}

let schema = builder
let schema = schema_builder
.build()
.map_err(|err| DataFusionError::External(Box::new(err)))
.context(ex_error::DataFusionSnafu)?;
Expand All @@ -888,6 +889,87 @@ impl UserQuery {
builder
.with_name(table_ref.table.to_string())
.with_schema(schema);
Ok(builder)
}

/// Creates the table named by `table_ref` directly through `iceberg_catalog`,
/// applying the `IF NOT EXISTS` / `OR REPLACE` flags carried by `statement`.
///
/// The table counts as already existing when `load_tabular` succeeds for its
/// identifier; any load error is treated as "not present".
/// NOTE(review): this also hides failures unrelated to a missing table —
/// confirm that is intended.
///
/// When the table already exists:
/// * `if_not_exists` set — returns `Ok(())` without touching the catalog;
/// * `or_replace` set — drops the existing table, then creates the new one;
/// * neither set — fails with an `ObjectAlreadyExists` error.
///
/// # Errors
///
/// Returns an error when the table exists and neither flag is set, when the
/// create-table builder cannot be derived from `plan`, or when dropping or
/// building the table through the catalog fails.
#[instrument(
    name = "UserQuery::create_iceberg_table_with_catalog",
    level = "trace",
    skip(self, iceberg_catalog, table_ref, statement, plan),
    err
)]
async fn create_iceberg_table_with_catalog(
    &self,
    iceberg_catalog: Arc<dyn Catalog>,
    table_ref: &ResolvedTableReference,
    statement: &CreateTableStatement,
    plan: &LogicalPlan,
) -> Result<()> {
    let metastore_ident = MetastoreTableIdent {
        database: table_ref.catalog.to_string(),
        schema: table_ref.schema.to_string(),
        table: table_ref.table.to_string(),
    };
    let catalog_ident = metastore_ident.to_iceberg_ident();

    // Existence probe: a successful load means the table is already there.
    let load_outcome = iceberg_catalog
        .clone()
        .load_tabular(&catalog_ident)
        .await;
    let already_present = load_outcome.is_ok();

    if already_present {
        // IF NOT EXISTS: leave the existing table untouched.
        if statement.if_not_exists {
            return Ok(());
        }
        // Plain CREATE TABLE on an existing table is an error.
        if !statement.or_replace {
            return ex_error::ObjectAlreadyExistsSnafu {
                r#type: ExistingObjectType::Table,
                name: metastore_ident.to_string(),
            }
            .fail();
        }
        // OR REPLACE: remove the old table before creating the new one.
        iceberg_catalog
            .drop_table(&catalog_ident)
            .await
            .context(ex_error::IcebergSnafu)?;
    }

    let mut create_builder = Self::build_iceberg_create_table_builder(table_ref, plan)?;
    let namespace = [table_ref.schema.to_string()];
    create_builder
        .build(&namespace, iceberg_catalog)
        .await
        .context(ex_error::IcebergSnafu)?;
    Ok(())
}

/// Builds an `IcebergTableBuilder`-backed table provider for the table named
/// by `table_ref`, using the schema of `plan`.
///
/// If `schema_provider` already reports the table as existing:
/// * with `if_not_exists` set, returns `Ok(None)` and creates nothing;
/// * with `or_replace` set, deregisters the existing table first.
///
/// When the table exists and neither flag is set, this function does not
/// fail; it still returns a new provider.
///
/// # Errors
///
/// Returns an error when deregistering the existing table fails or when the
/// create-table builder cannot be derived from `plan`.
#[instrument(
    name = "UserQuery::create_iceberg_table_provider",
    level = "trace",
    skip(self, schema_provider, statement, plan),
    err
)]
pub fn create_iceberg_table_provider(
    &self,
    table_ref: &ResolvedTableReference,
    schema_provider: Arc<dyn SchemaProvider>,
    statement: &CreateTableStatement,
    plan: &LogicalPlan,
) -> Result<Option<Arc<dyn TableProvider>>> {
    let table_name: &str = &table_ref.table;
    let already_registered = schema_provider.table_exist(table_name);

    // IF NOT EXISTS on an existing table: nothing to create.
    if already_registered && statement.if_not_exists {
        return Ok(None);
    }

    // CREATE OR REPLACE on an existing table: drop the old registration first.
    if already_registered && statement.or_replace {
        schema_provider
            .deregister_table(table_name)
            .context(ex_error::DataFusionSnafu)?;
    }

    let create_builder = Self::build_iceberg_create_table_builder(table_ref, plan)?;
    let provider: Arc<dyn TableProvider> = Arc::new(IcebergTableBuilder::new(create_builder));
    Ok(Some(provider))
}

Expand Down