@@ -89,14 +89,15 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
8989use datafusion_physical_plan:: execution_plan:: InvariantLevel ;
9090use datafusion_physical_plan:: placeholder_row:: PlaceholderRowExec ;
9191use datafusion_physical_plan:: unnest:: ListUnnest ;
92+ use datafusion_sql:: TableReference ;
93+ use sqlparser:: ast:: NullTreatment ;
9294
9395use crate :: schema_equivalence:: schema_satisfied_by;
9496use async_trait:: async_trait;
9597use datafusion_datasource:: file_groups:: FileGroup ;
9698use futures:: { StreamExt , TryStreamExt } ;
9799use itertools:: { multiunzip, Itertools } ;
98100use log:: { debug, trace} ;
99- use sqlparser:: ast:: NullTreatment ;
100101use tokio:: sync:: Mutex ;
101102
102103/// Physical query planner that converts a `LogicalPlan` to an
@@ -890,8 +891,8 @@ impl DefaultPhysicalPlanner {
890891
891892 // 2 Children
892893 LogicalPlan :: Join ( Join {
893- left,
894- right,
894+ left : original_left ,
895+ right : original_right ,
895896 on : keys,
896897 filter,
897898 join_type,
@@ -916,23 +917,25 @@ impl DefaultPhysicalPlanner {
916917 let ( left, left_col_keys, left_projected) =
917918 wrap_projection_for_join_if_necessary (
918919 & left_keys,
919- left . as_ref ( ) . clone ( ) ,
920+ original_left . as_ref ( ) . clone ( ) ,
920921 ) ?;
921922 let ( right, right_col_keys, right_projected) =
922923 wrap_projection_for_join_if_necessary (
923924 & right_keys,
924- right . as_ref ( ) . clone ( ) ,
925+ original_right . as_ref ( ) . clone ( ) ,
925926 ) ?;
926927 let column_on = ( left_col_keys, right_col_keys) ;
927928
928929 let left = Arc :: new ( left) ;
929930 let right = Arc :: new ( right) ;
930- let new_join = LogicalPlan :: Join ( Join :: try_new_with_project_input (
931+ let ( new_join, requalified ) = Join :: try_new_with_project_input (
931932 node,
932933 Arc :: clone ( & left) ,
933934 Arc :: clone ( & right) ,
934935 column_on,
935- ) ?) ;
936+ ) ?;
937+
938+ let new_join = LogicalPlan :: Join ( new_join) ;
936939
937940 // If inputs were projected then create ExecutionPlan for these new
938941 // LogicalPlan nodes.
@@ -965,8 +968,24 @@ impl DefaultPhysicalPlanner {
965968
966969 // Remove temporary projected columns
967970 if left_projected || right_projected {
968- let final_join_result =
969- join_schema. iter ( ) . map ( Expr :: from) . collect :: < Vec < _ > > ( ) ;
971+ // Re-qualify the join schema only if the inputs were previously requalified in
972+ // `try_new_with_project_input`. This ensures that when building the Projection
973+ // it can correctly resolve field nullability and data types
974+ // by disambiguating fields from the left and right sides of the join.
975+ let qualified_join_schema = if requalified {
976+ Arc :: new ( qualify_join_schema_sides (
977+ join_schema,
978+ original_left,
979+ original_right,
980+ ) ?)
981+ } else {
982+ Arc :: clone ( join_schema)
983+ } ;
984+
985+ let final_join_result = qualified_join_schema
986+ . iter ( )
987+ . map ( Expr :: from)
988+ . collect :: < Vec < _ > > ( ) ;
970989 let projection = LogicalPlan :: Projection ( Projection :: try_new (
971990 final_join_result,
972991 Arc :: new ( new_join) ,
@@ -1463,6 +1482,64 @@ fn get_null_physical_expr_pair(
14631482 Ok ( ( Arc :: new ( null_value) , physical_name) )
14641483}
14651484
1485+ /// Qualifies the fields in a join schema with "left" and "right" qualifiers
1486+ /// without mutating the original schema. This function should only be used when
1487+ /// the join inputs have already been requalified earlier in `try_new_with_project_input`.
1488+ ///
1489+ /// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
1490+ /// when converting expressions to fields.
1491+ fn qualify_join_schema_sides (
1492+ join_schema : & DFSchema ,
1493+ left : & LogicalPlan ,
1494+ right : & LogicalPlan ,
1495+ ) -> Result < DFSchema > {
1496+ let left_fields = left. schema ( ) . fields ( ) ;
1497+ let right_fields = right. schema ( ) . fields ( ) ;
1498+ let join_fields = join_schema. fields ( ) ;
1499+
1500+ // Validate lengths
1501+ if join_fields. len ( ) != left_fields. len ( ) + right_fields. len ( ) {
1502+ return internal_err ! (
1503+ "Join schema field count must match left and right field count."
1504+ ) ;
1505+ }
1506+
1507+ // Validate field names match
1508+ for ( i, ( field, expected) ) in join_fields
1509+ . iter ( )
1510+ . zip ( left_fields. iter ( ) . chain ( right_fields. iter ( ) ) )
1511+ . enumerate ( )
1512+ {
1513+ if field. name ( ) != expected. name ( ) {
1514+ return internal_err ! (
1515+ "Field name mismatch at index {}: expected '{}', found '{}'" ,
1516+ i,
1517+ expected. name( ) ,
1518+ field. name( )
1519+ ) ;
1520+ }
1521+ }
1522+
1523+ // qualify sides
1524+ let qualifiers = join_fields
1525+ . iter ( )
1526+ . enumerate ( )
1527+ . map ( |( i, _) | {
1528+ if i < left_fields. len ( ) {
1529+ Some ( TableReference :: Bare {
1530+ table : Arc :: from ( "left" ) ,
1531+ } )
1532+ } else {
1533+ Some ( TableReference :: Bare {
1534+ table : Arc :: from ( "right" ) ,
1535+ } )
1536+ }
1537+ } )
1538+ . collect ( ) ;
1539+
1540+ join_schema. with_field_specific_qualified_schema ( qualifiers)
1541+ }
1542+
14661543fn get_physical_expr_pair (
14671544 expr : & Expr ,
14681545 input_dfschema : & DFSchema ,
0 commit comments