@@ -67,10 +67,10 @@ use datafusion_catalog::{
6767use datafusion_common:: config:: { ConfigField , ConfigOptions } ;
6868use datafusion_common:: metadata:: ScalarAndMetadata ;
6969use datafusion_common:: {
70- DFSchema , DataFusionError , ParamValues , SchemaReference , TableReference ,
70+ DFSchema , DataFusionError , ParamValues , SchemaError , SchemaReference , TableReference ,
7171 config:: { ConfigExtension , TableOptions } ,
7272 exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err,
73- plan_datafusion_err, plan_err,
73+ plan_datafusion_err, plan_err, schema_err ,
7474 tree_node:: { TreeNodeRecursion , TreeNodeVisitor } ,
7575} ;
7676pub use datafusion_execution:: TaskContext ;
@@ -887,6 +887,7 @@ impl SessionContext {
887887 match ( if_not_exists, or_replace, table) {
888888 ( true , false , Ok ( _) ) => self . return_empty_dataframe ( ) ,
889889 ( false , true , Ok ( _) ) => {
890+ Self :: ensure_unique_column_names ( input. schema ( ) ) ?;
890891 self . deregister_table ( name. clone ( ) ) ?;
891892 let schema = Arc :: clone ( input. schema ( ) . inner ( ) ) ;
892893 let physical = DataFrame :: new ( self . state ( ) , input) ;
@@ -906,6 +907,7 @@ impl SessionContext {
906907 exec_err ! ( "'IF NOT EXISTS' cannot coexist with 'REPLACE'" )
907908 }
908909 ( _, _, Err ( _) ) => {
910+ Self :: ensure_unique_column_names ( input. schema ( ) ) ?;
909911 let schema = Arc :: clone ( input. schema ( ) . inner ( ) ) ;
910912 let physical = DataFrame :: new ( self . state ( ) , input) ;
911913
@@ -951,14 +953,16 @@ impl SessionContext {
951953
952954 match ( or_replace, view) {
953955 ( true , Ok ( _) ) => {
954- self . deregister_table ( name. clone ( ) ) ?;
955956 let input = Self :: apply_type_coercion ( Arc :: unwrap_or_clone ( input) ) ?;
957+ Self :: ensure_unique_column_names ( input. schema ( ) ) ?;
958+ self . deregister_table ( name. clone ( ) ) ?;
956959 let table = Arc :: new ( ViewTable :: new ( input, definition) ) ;
957960 self . register_table ( name, table) ?;
958961 self . return_empty_dataframe ( )
959962 }
960963 ( _, Err ( _) ) => {
961964 let input = Self :: apply_type_coercion ( Arc :: unwrap_or_clone ( input) ) ?;
965+ Self :: ensure_unique_column_names ( input. schema ( ) ) ?;
962966 let table = Arc :: new ( ViewTable :: new ( input, definition) ) ;
963967 self . register_table ( name, table) ?;
964968 self . return_empty_dataframe ( )
@@ -967,6 +971,21 @@ impl SessionContext {
967971 }
968972 }
969973
974+ fn ensure_unique_column_names ( schema : & DFSchema ) -> Result < ( ) > {
975+ // DFSchema name checks allow duplicate unqualified names as long as their
976+ // qualifiers differ. DDL persistence drops qualifiers, so this helper must
977+ // enforce uniqueness on the final unqualified names instead.
978+ let mut seen = HashSet :: with_capacity ( schema. fields ( ) . len ( ) ) ;
979+ for field in schema. fields ( ) {
980+ if !seen. insert ( field. name ( ) . as_str ( ) ) {
981+ return schema_err ! ( SchemaError :: DuplicateUnqualifiedField {
982+ name: field. name( ) . to_string( ) ,
983+ } ) ;
984+ }
985+ }
986+ Ok ( ( ) )
987+ }
988+
970989 async fn create_catalog_schema ( & self , cmd : CreateCatalogSchema ) -> Result < DataFrame > {
971990 let CreateCatalogSchema {
972991 schema_name,
@@ -1477,6 +1496,9 @@ impl SessionContext {
14771496 RegisterFunction :: Window ( f) => {
14781497 self . state . write ( ) . register_udwf ( f) ?;
14791498 }
1499+ RegisterFunction :: HigherOrder ( f) => {
1500+ self . state . write ( ) . register_higher_order_function ( f) ?;
1501+ }
14801502 RegisterFunction :: Table ( name, f) => self . register_udtf ( & name, f) ,
14811503 } ;
14821504
@@ -1491,6 +1513,11 @@ impl SessionContext {
14911513 dropped |= self . state . write ( ) . deregister_udaf ( & stmt. name ) ?. is_some ( ) ;
14921514 dropped |= self . state . write ( ) . deregister_udwf ( & stmt. name ) ?. is_some ( ) ;
14931515 dropped |= self . state . write ( ) . deregister_udtf ( & stmt. name ) ?. is_some ( ) ;
1516+ dropped |= self
1517+ . state
1518+ . write ( )
1519+ . deregister_higher_order_function ( & stmt. name ) ?
1520+ . is_some ( ) ;
14941521
14951522 // DROP FUNCTION IF EXISTS drops the specified function only if that
14961523 // function exists and in this way, it avoids error. While the DROP FUNCTION
@@ -1590,6 +1617,20 @@ impl SessionContext {
15901617 state. register_udf ( Arc :: new ( f) ) . ok ( ) ;
15911618 }
15921619
1620+ /// Registers a higher-order function within this context.
1621+ ///
1622+ /// Note in SQL queries, function names are looked up using
1623+ /// lowercase unless the query uses quotes. For example,
1624+ ///
1625+ /// - `SELECT MY_HIGHER_ORDER_FUNC(x)...` will look for a function named `"my_higher_order_func"`
1626+ /// - `SELECT "my_HIGHER_ORDER_FUNC"(x)` will look for a function named `"my_HIGHER_ORDER_FUNC"`
1627+ ///
1628+ /// Any functions registered with the function name or its aliases will be overwritten with this new function
1629+ pub fn register_higher_order_function ( & self , f : Arc < dyn HigherOrderUDF > ) {
1630+ let mut state = self . state . write ( ) ;
1631+ state. register_higher_order_function ( f) . ok ( ) ;
1632+ }
1633+
15931634 /// Registers an aggregate UDF within this context.
15941635 ///
15951636 /// Note in SQL queries, aggregate names are looked up using
@@ -1629,6 +1670,14 @@ impl SessionContext {
16291670 self . state . write ( ) . deregister_udf ( name) . ok ( ) ;
16301671 }
16311672
1673+ /// Deregisters a higher-order function within this context.
1674+ pub fn deregister_higher_order_function ( & self , name : & str ) {
1675+ self . state
1676+ . write ( )
1677+ . deregister_higher_order_function ( name)
1678+ . ok ( ) ;
1679+ }
1680+
16321681 /// Deregisters a UDAF within this context.
16331682 pub fn deregister_udaf ( & self , name : & str ) {
16341683 self . state . write ( ) . deregister_udaf ( name) . ok ( ) ;
@@ -2171,6 +2220,8 @@ pub enum RegisterFunction {
21712220 Aggregate ( Arc < AggregateUDF > ) ,
21722221 /// Window user defined function
21732222 Window ( Arc < WindowUDF > ) ,
2223+ /// Higher-order user defined function
2224+ HigherOrder ( Arc < dyn HigherOrderUDF > ) ,
21742225 /// Table user defined function
21752226 Table ( String , Arc < dyn TableFunctionImpl > ) ,
21762227}
0 commit comments