diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d84ef0c898313..c43228cce9d26 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -67,10 +67,10 @@ use datafusion_catalog::{ use datafusion_common::config::{ConfigField, ConfigOptions}; use datafusion_common::metadata::ScalarAndMetadata; use datafusion_common::{ - DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference, + DFSchema, DataFusionError, ParamValues, SchemaError, SchemaReference, TableReference, config::{ConfigExtension, TableOptions}, exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err, - plan_datafusion_err, plan_err, + plan_datafusion_err, plan_err, schema_err, tree_node::{TreeNodeRecursion, TreeNodeVisitor}, }; pub use datafusion_execution::TaskContext; @@ -886,6 +886,7 @@ impl SessionContext { match (if_not_exists, or_replace, table) { (true, false, Ok(_)) => self.return_empty_dataframe(), (false, true, Ok(_)) => { + Self::ensure_unique_column_names(input.schema())?; self.deregister_table(name.clone())?; let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); @@ -905,6 +906,7 @@ impl SessionContext { exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") } (_, _, Err(_)) => { + Self::ensure_unique_column_names(input.schema())?; let schema = Arc::clone(input.schema().inner()); let physical = DataFrame::new(self.state(), input); @@ -950,14 +952,16 @@ impl SessionContext { match (or_replace, view) { (true, Ok(_)) => { - self.deregister_table(name.clone())?; let input = Self::apply_type_coercion(Arc::unwrap_or_clone(input))?; + Self::ensure_unique_column_names(input.schema())?; + self.deregister_table(name.clone())?; let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() } (_, Err(_)) => { let input = Self::apply_type_coercion(Arc::unwrap_or_clone(input))?; + Self::ensure_unique_column_names(input.schema())?; let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() @@ -966,6 +970,21 @@ impl SessionContext { } } + fn ensure_unique_column_names(schema: &DFSchema) -> Result<()> { + // DFSchema name checks allow duplicate unqualified names as long as their + // qualifiers differ. DDL persistence drops qualifiers, so this helper must + // enforce uniqueness on the final unqualified names instead. + let mut seen = HashSet::with_capacity(schema.fields().len()); + for field in schema.fields() { + if !seen.insert(field.name().as_str()) { + return schema_err!(SchemaError::DuplicateUnqualifiedField { + name: field.name().to_string(), + }); + } + } + Ok(()) + } + async fn create_catalog_schema(&self, cmd: CreateCatalogSchema) -> Result { let CreateCatalogSchema { schema_name, diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 1418dcad1150e..b4d69f54b1e48 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -75,10 +75,11 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::expr::{GroupingSet, NullTreatment, Sort, WindowFunction}; use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ - Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, LogicalPlanBuilder, - ScalarFunctionImplementation, SortExpr, TableType, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, cast, col, create_udf, exists, - in_subquery, lit, out_ref_col, placeholder, scalar_subquery, when, wildcard, + CreateMemoryTable, CreateView, DdlStatement, Expr, ExprFunctionExt, ExprSchemable, + LogicalPlan, LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, TableType, + WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, cast, col, + create_udf, exists, in_subquery, lit, out_ref_col, placeholder, scalar_subquery, + when, wildcard, }; use datafusion_physical_expr::Partitioning; use datafusion_physical_expr::aggregate::AggregateExprBuilder; @@ -6750,6 +6751,169 @@ async fn test_copy_to_preserves_order() -> Result<()> { Ok(()) } +fn duplicate_unqualified_name_batch() -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![ + "abcDEF", + "abc123", + "CBAdef", + "123AbcDef", + ])), + Arc::new(Int32Array::from(vec![1, 10, 10, 100])), + ], + ) + .map_err(Into::into) +} + +async fn duplicate_unqualified_name_input( + ctx: &SessionContext, + name: &str, +) -> Result { + ctx.register_batch(name, duplicate_unqualified_name_batch()?)?; + + let left = ctx.table(name).await?; + let right = left.clone().alias("t2")?; + Ok(left + .join(right, JoinType::Inner, &["b"], &["b"], None)? + .into_unoptimized_plan()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_create_memory_table() +-> Result<()> { + let ctx = SessionContext::new(); + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + + let plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + name: TableReference::bare("dup_out"), + constraints: Constraints::default(), + input: Arc::new(input), + if_not_exists: false, + or_replace: false, + temporary: false, + column_defaults: vec![], + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + assert!(ctx.table("dup_out").await.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_replace_table() +-> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE dup_out AS VALUES ('seed', 0)") + .await? + .collect() + .await?; + + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + name: TableReference::bare("dup_out"), + constraints: Constraints::default(), + input: Arc::new(input), + if_not_exists: false, + or_replace: true, + temporary: false, + column_defaults: vec![], + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + + let rows = ctx.sql("SELECT * FROM dup_out").await?.collect().await?; + assert_batches_eq!( + [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| seed | 0 |", + "+---------+---------+", + ], + &rows + ); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_create_view() +-> Result<()> { + let ctx = SessionContext::new(); + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name: TableReference::bare("dup_view"), + input: Arc::new(input), + or_replace: false, + definition: None, + temporary: false, + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + assert!(ctx.table("dup_view").await.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn execute_logical_plan_rejects_duplicate_unqualified_names_in_replace_view() +-> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE VIEW dup_view AS SELECT 'seed' AS column1, 0 AS column2") + .await? + .collect() + .await?; + + let input = duplicate_unqualified_name_input(&ctx, "t1").await?; + let plan = LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name: TableReference::bare("dup_view"), + input: Arc::new(input), + or_replace: true, + definition: None, + temporary: false, + })); + + let err = ctx.execute_logical_plan(plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "Schema contains duplicate unqualified field name a" + ); + + let rows = ctx.sql("SELECT * FROM dup_view").await?.collect().await?; + assert_batches_eq!( + [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| seed | 0 |", + "+---------+---------+", + ], + &rows + ); + + Ok(()) +} + #[tokio::test] async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 977d2d03a1d07..82c30e9aba386 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -971,3 +971,23 @@ drop view v; statement count 0 drop table t; + +# https://github.com/apache/datafusion/issues/22167 +# https://github.com/apache/datafusion/issues/13487 +# CREATE TABLE AS / CREATE VIEW must reject queries whose result schema +# has duplicate (unqualified) column names, since the resulting object +# stores only unqualified names and would otherwise be unselectable. +statement ok +CREATE TABLE dup_src AS VALUES(1, 2); + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +CREATE TABLE dup_ctas AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +CREATE VIEW dup_view AS SELECT * FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name column1 +SELECT * INTO dup_into FROM dup_src LEFT JOIN dup_src y ON dup_src.column1 = y.column2; + +statement ok +DROP TABLE dup_src;