Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::vec;
Expand All @@ -30,7 +30,7 @@ use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
use datafusion_common::{
DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
plan_datafusion_err,
plan_datafusion_err, schema_err,
};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
pub use datafusion_expr::planner::ContextProvider;
Expand All @@ -40,6 +40,24 @@ use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};

/// Reject a schema in which two fields share the same unqualified name.
///
/// `CREATE TABLE` / `CREATE VIEW` / `SELECT INTO` persist only unqualified
/// column names, so an output schema containing duplicates would produce an
/// object that cannot be selected from (see
/// <https://github.com/apache/datafusion/issues/22167>,
/// <https://github.com/apache/datafusion/issues/13487>).
///
/// # Errors
///
/// Returns [`SchemaError::DuplicateUnqualifiedField`] carrying the first
/// field name (in schema order) that has already been seen.
pub(crate) fn ensure_unique_column_names(schema: &DFSchema) -> Result<()> {
    let fields = schema.fields();
    // Borrowed names only: nothing is allocated unless a duplicate is reported.
    let mut names: HashSet<&str> = HashSet::with_capacity(fields.len());

    // `insert` returns `false` when the name is already present, so `find`
    // stops at the first repeated name.
    let first_duplicate = fields
        .iter()
        .map(|field| field.name().as_str())
        .find(|&name| !names.insert(name));

    match first_duplicate {
        Some(name) => schema_err!(SchemaError::DuplicateUnqualifiedField {
            name: name.to_string(),
        }),
        None => Ok(()),
    }
}

/// SQL parser options
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
Expand Down
30 changes: 18 additions & 12 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::planner::{
ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names,
};

use crate::stack::StackGuard;
use datafusion_common::{Constraints, DFSchema, Result, not_impl_err};
Expand Down Expand Up @@ -364,17 +366,21 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
match select_into {
Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::default(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
temporary: false,
column_defaults: vec![],
},
))),
Some(into) => {
ensure_unique_column_names(plan.schema())?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::default(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
temporary: false,
column_defaults: vec![],
},
)))
}
_ => Ok(plan),
}
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::parser::{
LexOrdering, ResetStatement, Statement as DFStatement,
};
use crate::planner::{
ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names,
object_name_to_qualifier,
};
use crate::utils::normalize_ident;

Expand Down Expand Up @@ -545,6 +546,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan
};

ensure_unique_column_names(plan.schema())?;

let constraints = self.new_constraint_from_table_constraints(
&all_constraints,
plan.schema(),
Expand Down Expand Up @@ -673,6 +676,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
plan = self.apply_expr_alias(plan, columns)?;

ensure_unique_column_names(plan.schema())?;

Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
name: self.object_name_to_table_reference(name)?,
input: Arc::new(plan),
Expand Down
20 changes: 20 additions & 0 deletions datafusion/sqllogictest/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,23 @@ drop view v;

statement count 0
drop table t;

# https://github.com/apache/datafusion/issues/22167
# https://github.com/apache/datafusion/issues/13487
# CREATE TABLE AS / CREATE VIEW must reject queries whose result schema
# has duplicate (unqualified) column names, since the resulting object
# stores only unqualified names and would otherwise be unselectable.
statement ok
CREATE TABLE dup_src AS VALUES(1, 2);

statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1
CREATE TABLE dup_ctas AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2;

statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1
CREATE VIEW dup_view AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2;

statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1
SELECT * INTO dup_into FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2;

statement ok
DROP TABLE dup_src;
Loading