From 814ce9513a7dfd8582a0070226d6424fa63b9cf6 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Sun, 17 May 2026 10:12:11 +0530 Subject: [PATCH 1/2] fix(sql): reject duplicate unqualified names in CTAS, CREATE VIEW, and SELECT INTO --- datafusion/sql/src/planner.rs | 22 ++++++++++++++-- datafusion/sql/src/query.rs | 30 +++++++++++++--------- datafusion/sql/src/statement.rs | 7 ++++- datafusion/sqllogictest/test_files/ddl.slt | 20 +++++++++++++++ 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index df0e68eab67e4..ac7f866bf9e23 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -16,7 +16,7 @@ // under the License. //! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use std::vec; @@ -30,7 +30,7 @@ use datafusion_common::error::add_possible_columns_to_diag; use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err}; use datafusion_common::{ DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err, - plan_datafusion_err, + plan_datafusion_err, schema_err, }; use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder}; pub use datafusion_expr::planner::ContextProvider; @@ -40,6 +40,24 @@ use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo}; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias}; +/// Ensure a logical plan's output schema has no duplicate (unqualified) column +/// names. `CREATE TABLE` / `CREATE VIEW` / `SELECT INTO` persist only +/// unqualified names, so a schema with duplicates produces an object that +/// cannot be selected from (see +/// , +/// ). +pub(crate) fn ensure_unique_column_names(schema: &DFSchema) -> Result<()> { + let mut seen = HashSet::with_capacity(schema.fields().len()); + for field in schema.fields() { + if !seen.insert(field.name().as_str()) { + return schema_err!(SchemaError::DuplicateUnqualifiedField { + name: field.name().to_string(), + }); + } + } + Ok(()) +} + /// SQL parser options #[derive(Debug, Clone, Copy)] pub struct ParserOptions { diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index e320d2ee6e9c1..037405b57efff 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -17,7 +17,9 @@ use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ + ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names, +}; use crate::stack::StackGuard; use datafusion_common::{Constraints, DFSchema, Result, not_impl_err}; @@ -364,17 +366,21 @@ impl SqlToRel<'_, S> { select_into: Option, ) -> Result { match select_into { - Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(into.name)?, - constraints: Constraints::default(), - input: Arc::new(plan), - if_not_exists: false, - or_replace: false, - temporary: false, - column_defaults: vec![], - }, - ))), + Some(into) => { + ensure_unique_column_names(plan.schema())?; + + Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( + CreateMemoryTable { + name: self.object_name_to_table_reference(into.name)?, + constraints: Constraints::default(), + input: Arc::new(plan), + if_not_exists: false, + or_replace: false, + temporary: false, + column_defaults: vec![], + }, + ))) + } _ => Ok(plan), } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 587ed02d13188..b7efbc3cbec44 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -25,7 +25,8 @@ use crate::parser::{ LexOrdering, ResetStatement, Statement as DFStatement, }; use crate::planner::{ - ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier, + ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names, + object_name_to_qualifier, }; use crate::utils::normalize_ident; @@ -545,6 +546,8 @@ impl SqlToRel<'_, S> { plan }; + ensure_unique_column_names(plan.schema())?; + let constraints = self.new_constraint_from_table_constraints( &all_constraints, plan.schema(), @@ -673,6 +676,8 @@ impl SqlToRel<'_, S> { let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?; plan = self.apply_expr_alias(plan, columns)?; + ensure_unique_column_names(plan.schema())?; + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { name: self.object_name_to_table_reference(name)?, input: Arc::new(plan), diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 977d2d03a1d07..82c30e9aba386 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -971,3 +971,23 @@ drop view v; statement count 0 drop table t; + +# https://github.com/apache/datafusion/issues/22167 +# https://github.com/apache/datafusion/issues/13487 +# CREATE TABLE AS / CREATE VIEW must reject queries whose result schema +# has duplicate (unqualified) column names, since the resulting object +# stores only unqualified names and would otherwise be unselectable. +statement ok +CREATE TABLE dup_src AS VALUES(1, 2); + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +CREATE TABLE dup_ctas AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +CREATE VIEW dup_view AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +SELECT * INTO dup_into FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement ok +DROP TABLE dup_src; From a40612dd2a8e5830b7f03e685c1ba3e8c187eb5b Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 19 May 2026 10:18:01 +0530 Subject: [PATCH 2/2] move from planner time check to execution time --- datafusion/core/src/execution/context/mod.rs | 25 ++- datafusion/core/tests/dataframe/mod.rs | 172 ++++++++++++++++++- datafusion/sql/src/planner.rs | 22 +-- datafusion/sql/src/query.rs | 30 ++-- datafusion/sql/src/statement.rs | 7 +- 5 files changed, 205 insertions(+), 51 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d84ef0c898313..c43228cce9d26 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -67,10 +67,10 @@ use datafusion_catalog::{ use datafusion_common::config::{ConfigField, ConfigOptions}; use datafusion_common::metadata::ScalarAndMetadata; use datafusion_common::{ - DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference, + DFSchema, DataFusionError, ParamValues, SchemaError, SchemaReference, TableReference, config::{ConfigExtension, TableOptions}, exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err, - plan_datafusion_err, plan_err, + plan_datafusion_err, plan_err, schema_err, tree_node::{TreeNodeRecursion, TreeNodeVisitor}, }; pub use datafusion_execution::TaskContext; @@ -886,6 +886,7 @@ impl SessionContext { match (if_not_exists, or_replace, table) { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { + Self::ensure_unique_column_names(input.schema())?; self.deregister_table(name.clone())?; let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); @@ -905,6 +906,7 @@ impl SessionContext { exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") } (_, _, Err(_)) => { + Self::ensure_unique_column_names(input.schema())?; let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); @@ -950,14 +952,16 @@ impl SessionContext { match (or_replace, view) { (true, Ok(_)) => { - self.deregister_table(name.clone())?; let input = Self::apply_type_coercion(Arc::unwrap_or_clone(input))?; + Self::ensure_unique_column_names(input.schema())?; + self.deregister_table(name.clone())?; let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() } (_, Err(_)) => { let input = Self::apply_type_coercion(Arc::unwrap_or_clone(input))?; + Self::ensure_unique_column_names(input.schema())?; let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() @@ -966,6 +970,21 @@ impl SessionContext { } } + fn ensure_unique_column_names(schema: &DFSchema) -> Result<()> { + // DFSchema name checks allow duplicate unqualified names as long as their + // qualifiers differ. DDL persistence drops qualifiers, so this helper must + // enforce uniqueness on the final unqualified names instead. + let mut seen = HashSet::with_capacity(schema.fields().len()); + for field in schema.fields() { + if !seen.insert(field.name().as_str()) { + return schema_err!(SchemaError::DuplicateUnqualifiedField { + name: field.name().to_string(), + }); + } + } + Ok(()) + } + async fn create_catalog_schema(&self, cmd: CreateCatalogSchema) -> Result { let CreateCatalogSchema { schema_name, diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 1418dcad1150e..b4d69f54b1e48 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -75,10 +75,11 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::expr::{GroupingSet, NullTreatment, Sort, WindowFunction}; use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ - Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, LogicalPlanBuilder, - ScalarFunctionImplementation, SortExpr, TableType, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, cast, col, create_udf, exists, - in_subquery, lit, out_ref_col, placeholder, scalar_subquery, when, wildcard, + CreateMemoryTable, CreateView, DdlStatement, Expr, ExprFunctionExt, ExprSchemable, + LogicalPlan, LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, TableType, + WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, cast, col, + create_udf, exists, in_subquery, lit, out_ref_col, placeholder, scalar_subquery, + when, wildcard, }; use datafusion_physical_expr::Partitioning; use datafusion_physical_expr::aggregate::AggregateExprBuilder; @@ -6750,6 +6751,169 @@ async fn test_copy_to_preserves_order() -> Result<()> { Ok(()) } +fn duplicate_unqualified_name_batch() -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![ + "abcDEF", + "abc123", + "CBAdef", + "123AbcDef", + ])), + Arc::new(Int32Array::from(vec![1, 10, 10, 100])), + ], + ) + .map_err(Into::into) +} + +async fn duplicate_unqualified_name_input( + ctx: &SessionContext, + name: &str, +) -> Result { + ctx.register_batch(name, duplicate_unqualified_name_batch()?)?; + + let left = ctx.table(name).await?; + let right = left.clone().alias("t2")?; + Ok(left + .join(right, JoinType::Inner, &["b"], &["b"], None)? + .into_unoptimized_plan()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_create_memory_table() +-> Result<()> { + let ctx = SessionContext::new(); + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + + let plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + name: TableReference::bare("dup_out"), + constraints: Constraints::default(), + input: Arc::new(input), + if_not_exists: false, + or_replace: false, + temporary: false, + column_defaults: vec![], + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + assert!(ctx.table("dup_out").await.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_replace_table() +-> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE dup_out AS VALUES ('seed', 0)") + .await? + .collect() + .await?; + + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + name: TableReference::bare("dup_out"), + constraints: Constraints::default(), + input: Arc::new(input), + if_not_exists: false, + or_replace: true, + temporary: false, + column_defaults: vec![], + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + + let rows = ctx.sql("SELECT * FROM dup_out").await?.collect().await?; + assert_batches_eq!( + [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| seed | 0 |", + "+---------+---------+", + ], + &rows + ); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_create_view() +-> Result<()> { + let ctx = SessionContext::new(); + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name: TableReference::bare("dup_view"), + input: Arc::new(input), + or_replace: false, + definition: None, + temporary: false, + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + assert!(ctx.table("dup_view").await.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_replace_view() +-> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE VIEW dup_view AS SELECT 'seed' AS column1, 0 AS column2") + .await? + .collect() + .await?; + + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name: TableReference::bare("dup_view"), + input: Arc::new(input), + or_replace: true, + definition: None, + temporary: false, + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + + let rows = ctx.sql("SELECT * FROM dup_view").await?.collect().await?; + assert_batches_eq!( + [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| seed | 0 |", + "+---------+---------+", + ], + &rows + ); + + Ok(()) +} + #[tokio::test] async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index ac7f866bf9e23..df0e68eab67e4 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -16,7 +16,7 @@ // under the License. //! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::vec; @@ -30,7 +30,7 @@ use datafusion_common::error::add_possible_columns_to_diag; use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err}; use datafusion_common::{ DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err, - plan_datafusion_err, schema_err, + plan_datafusion_err, }; use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder}; pub use datafusion_expr::planner::ContextProvider; @@ -40,24 +40,6 @@ use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo}; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias}; -/// Ensure a logical plan's output schema has no duplicate (unqualified) column -/// names. `CREATE TABLE` / `CREATE VIEW` / `SELECT INTO` persist only -/// unqualified names, so a schema with duplicates produces an object that -/// cannot be selected from (see -/// , -/// ). -pub(crate) fn ensure_unique_column_names(schema: &DFSchema) -> Result<()> { - let mut seen = HashSet::with_capacity(schema.fields().len()); - for field in schema.fields() { - if !seen.insert(field.name().as_str()) { - return schema_err!(SchemaError::DuplicateUnqualifiedField { - name: field.name().to_string(), - }); - } - } - Ok(()) -} - /// SQL parser options #[derive(Debug, Clone, Copy)] pub struct ParserOptions { diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 037405b57efff..e320d2ee6e9c1 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -17,9 +17,7 @@ use std::sync::Arc; -use crate::planner::{ - ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names, -}; +use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; use datafusion_common::{Constraints, DFSchema, Result, not_impl_err}; @@ -366,21 +364,17 @@ impl SqlToRel<'_, S> { select_into: Option, ) -> Result { match select_into { - Some(into) => { - ensure_unique_column_names(plan.schema())?; - - Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(into.name)?, - constraints: Constraints::default(), - input: Arc::new(plan), - if_not_exists: false, - or_replace: false, - temporary: false, - column_defaults: vec![], - }, - ))) - } + Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( + CreateMemoryTable { + name: self.object_name_to_table_reference(into.name)?, + constraints: Constraints::default(), + input: Arc::new(plan), + if_not_exists: false, + or_replace: false, + temporary: false, + column_defaults: vec![], + }, + ))), _ => Ok(plan), } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index b7efbc3cbec44..587ed02d13188 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -25,8 +25,7 @@ use crate::parser::{ LexOrdering, ResetStatement, Statement as DFStatement, }; use crate::planner::{ - ContextProvider, PlannerContext, SqlToRel, ensure_unique_column_names, - object_name_to_qualifier, + ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier, }; use crate::utils::normalize_ident; @@ -546,8 +545,6 @@ impl SqlToRel<'_, S> { plan }; - ensure_unique_column_names(plan.schema())?; - let constraints = self.new_constraint_from_table_constraints( &all_constraints, plan.schema(), @@ -676,8 +673,6 @@ impl SqlToRel<'_, S> { let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?; plan = self.apply_expr_alias(plan, columns)?; - ensure_unique_column_names(plan.schema())?; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { name: self.object_name_to_table_reference(name)?, input: Arc::new(plan),