Skip to content

Commit d0300c1

Browse files
authored
Register CSV from CreateStage within current context (#212)
* # This is a combination of 5 commits. # This is the 1st commit message: Make common sql executor Make common sql executor Alter session params to current context Make common sql executor (#209) * Make common sql executor * Make common sql executor # This is the commit message #2: Fix deployment (#211) * Fix deployment * Fix binary name # This is the commit message #3: Fix tmp EOF until fix released # This is the commit message #4: Register CSV within current context # This is the commit message #5: Register CSV within current context * Register CSV within current context
1 parent 3140339 commit d0300c1

1 file changed

Lines changed: 8 additions & 14 deletions

File tree

crates/runtime/src/datafusion/execution.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl SqlExecutor {
114114
}
115115
Statement::CreateStage { .. } => {
116116
// We support only CSV uploads for now
117-
return Box::pin(self.create_stage_query(*s, warehouse_name)).await;
117+
return Box::pin(self.create_stage_query(*s)).await;
118118
}
119119
Statement::CopyIntoSnowflake { .. } => {
120120
return Box::pin(self.copy_into_snowflake_query(*s, warehouse_name)).await;
@@ -374,7 +374,6 @@ impl SqlExecutor {
374374
pub async fn create_stage_query(
375375
&self,
376376
statement: Statement,
377-
warehouse_name: &str,
378377
) -> IcehutSQLResult<Vec<RecordBatch>> {
379378
if let Statement::CreateStage {
380379
name,
@@ -412,7 +411,6 @@ impl SqlExecutor {
412411
.unwrap_or(b'"');
413412

414413
let file_path = stage_params.url.unwrap_or_default();
415-
let stage_table_name = format!("stage_{table_name}");
416414
let url =
417415
Url::parse(file_path.as_str()).map_err(|_| IcehutSQLError::InvalidIdentifier {
418416
ident: file_path.clone(),
@@ -458,7 +456,7 @@ impl SqlExecutor {
458456
// Register CSV file with filled missing datatype with default Utf8
459457
self.ctx
460458
.register_csv(
461-
stage_table_name.clone(),
459+
table_name.value.clone(),
462460
file_path,
463461
CsvReadOptions::new()
464462
.has_header(skip_header)
@@ -467,13 +465,7 @@ impl SqlExecutor {
467465
)
468466
.await
469467
.context(ih_error::DataFusionSnafu)?;
470-
471-
// Create stages database and create table with prepared schema
472-
// TODO Don't create table in case we have common ctx
473-
self.create_database(warehouse_name, ObjectName(vec![Ident::new("stages")]), true)
474-
.await?;
475-
let create_query = format!("CREATE TABLE {warehouse_name}.stages.{table_name} AS (SELECT * FROM {stage_table_name})");
476-
self.query(&create_query, warehouse_name, "").await
468+
Ok(vec![])
477469
} else {
478470
Err(IcehutSQLError::DataFusion {
479471
source: DataFusionError::NotImplemented(
@@ -493,9 +485,8 @@ impl SqlExecutor {
493485
} = statement
494486
{
495487
// Insert data to table
496-
let from_query = from_stage.to_string().replace('@', "");
497-
let insert_query =
498-
format!("INSERT INTO {into} SELECT * FROM {warehouse_name}.stages.{from_query}");
488+
let stage_name = from_stage.to_string().replace('@', "");
489+
let insert_query = format!("INSERT INTO {into} SELECT * FROM {stage_name}");
499490
self.execute_with_custom_plan(&insert_query, warehouse_name)
500491
.await
501492
} else {
@@ -961,6 +952,9 @@ impl SqlExecutor {
961952

962953
match statement.clone() {
963954
DFStatement::Statement(s) => match *s {
955+
Statement::CopyIntoSnowflake { into, .. } => {
956+
Some(TableReference::parse_str(&into.to_string()))
957+
}
964958
Statement::Drop { names, .. } => {
965959
Some(TableReference::parse_str(&names[0].to_string()))
966960
}

0 commit comments

Comments
 (0)