@@ -22,10 +22,11 @@ use std::sync::Arc;
2222
2323use crate :: PhysicalExpr ;
2424use crate :: expressions:: { Column , Literal } ;
25+ use crate :: scalar_function:: ScalarFunctionExpr ;
2526use crate :: utils:: collect_columns;
2627
2728use arrow:: array:: { RecordBatch , RecordBatchOptions } ;
28- use arrow:: datatypes:: { Field , Schema , SchemaRef } ;
29+ use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
2930use datafusion_common:: stats:: { ColumnStatistics , Precision } ;
3031use datafusion_common:: tree_node:: { Transformed , TransformedResult , TreeNode } ;
3132use datafusion_common:: {
@@ -952,9 +953,79 @@ impl ProjectionMapping {
952953 None => Ok ( Transformed :: no ( e) ) ,
953954 } )
954955 . data ( ) ?;
955- map. entry ( source_expr)
956+ map. entry ( Arc :: clone ( & source_expr) )
956957 . or_default ( )
957- . push ( ( target_expr, expr_idx) ) ;
958+ . push ( ( Arc :: clone ( & target_expr) , expr_idx) ) ;
959+
960+ // For struct-producing functions (e.g. named_struct), decompose
961+ // into field-level mapping entries so that orderings propagate
962+ // through struct projections. For example, if the projection has
963+ // `named_struct('ticker', p.ticker, ...) AS details`, this adds:
964+ // p.ticker → get_field(col("details"), "ticker")
965+ // enabling the optimizer to know that sorting by
966+ // `details.ticker` is equivalent to sorting by `p.ticker`.
967+ if let Some ( func_expr) =
968+ source_expr. as_any ( ) . downcast_ref :: < ScalarFunctionExpr > ( )
969+ {
970+ let literal_args: Vec < Option < ScalarValue > > = func_expr
971+ . args ( )
972+ . iter ( )
973+ . map ( |arg| {
974+ arg. as_any ( )
975+ . downcast_ref :: < Literal > ( )
976+ . map ( |l| l. value ( ) . clone ( ) )
977+ } )
978+ . collect ( ) ;
979+
980+ #[ expect(
981+ clippy:: collapsible_if,
982+ reason = "readability: field_mapping and struct type are logically separate checks"
983+ ) ]
984+ if let Some ( field_mapping) =
985+ func_expr. fun ( ) . struct_field_mapping ( & literal_args)
986+ {
987+ if let DataType :: Struct ( struct_fields) = func_expr. return_type ( ) {
988+ for ( accessor_args, source_arg_idx) in & field_mapping. fields {
989+ let value_expr =
990+ Arc :: clone ( & func_expr. args ( ) [ * source_arg_idx] ) ;
991+
992+ // Build accessor args: [target_col, ...field_name_literals]
993+ let mut accessor_fn_args: Vec < Arc < dyn PhysicalExpr > > =
994+ vec ! [ Arc :: clone( & target_expr) ] ;
995+ accessor_fn_args. extend ( accessor_args. iter ( ) . map ( |sv| {
996+ Arc :: new ( Literal :: new ( sv. clone ( ) ) )
997+ as Arc < dyn PhysicalExpr >
998+ } ) ) ;
999+
1000+ // Look up the field's return type from the struct schema
1001+ let return_field = accessor_args
1002+ . first ( )
1003+ . and_then ( |sv| sv. try_as_str ( ) . flatten ( ) )
1004+ . and_then ( |field_name| {
1005+ struct_fields
1006+ . iter ( )
1007+ . find ( |f| f. name ( ) == field_name)
1008+ . cloned ( )
1009+ } ) ;
1010+
1011+ if let Some ( return_field) = return_field {
1012+ let field_access_expr = Arc :: new ( ScalarFunctionExpr :: new (
1013+ field_mapping. field_accessor . name ( ) ,
1014+ Arc :: clone ( & field_mapping. field_accessor ) ,
1015+ accessor_fn_args,
1016+ return_field,
1017+ Arc :: new ( func_expr. config_options ( ) . clone ( ) ) ,
1018+ ) )
1019+ as Arc < dyn PhysicalExpr > ;
1020+
1021+ map. entry ( value_expr)
1022+ . or_default ( )
1023+ . push ( ( field_access_expr, expr_idx) ) ;
1024+ }
1025+ }
1026+ }
1027+ }
1028+ }
9581029 }
9591030 Ok ( Self { map } )
9601031 }
@@ -1110,8 +1181,10 @@ pub(crate) mod tests {
11101181 let data_type = source. data_type ( input_schema) ?;
11111182 let nullable = source. nullable ( input_schema) ?;
11121183 for ( target, _) in targets. iter ( ) {
1184+ // Skip non-Column targets (e.g. struct field decomposition
1185+ // entries which are ScalarFunctionExpr targets).
11131186 let Some ( column) = target. as_any ( ) . downcast_ref :: < Column > ( ) else {
1114- return plan_err ! ( "Expects to have column" ) ;
1187+ continue ;
11151188 } ;
11161189 fields. push ( Field :: new ( column. name ( ) , data_type. clone ( ) , nullable) ) ;
11171190 }
0 commit comments