@@ -48,9 +48,9 @@ use datafusion_common::{
4848} ;
4949use datafusion_expr:: expr:: { OUTER_REFERENCE_COLUMN_PREFIX , UNNEST_COLUMN_PREFIX } ;
5050use datafusion_expr:: {
51- BinaryExpr , Distinct , Expr , JoinConstraint , JoinType , LogicalPlan ,
51+ Aggregate , BinaryExpr , Distinct , Expr , JoinConstraint , JoinType , LogicalPlan ,
5252 LogicalPlanBuilder , Operator , Projection , SortExpr , TableScan , Unnest ,
53- UserDefinedLogicalNode , expr:: Alias ,
53+ UserDefinedLogicalNode , Window , expr:: Alias ,
5454} ;
5555use sqlparser:: ast:: { self , Ident , OrderByKind , SetExpr , TableAliasColumnDef } ;
5656use std:: { sync:: Arc , vec} ;
@@ -478,6 +478,80 @@ impl Unparser<'_> {
478478 Ok ( false )
479479 }
480480
481+ fn project_window_output (
482+ & self ,
483+ window_expr : & [ Expr ] ,
484+ select : & mut SelectBuilder ,
485+ agg : Option < & Aggregate > ,
486+ ) -> Result < ( ) > {
487+ let mut items = if select. already_projected ( ) {
488+ select. pop_projections ( )
489+ } else {
490+ vec ! [ ast:: SelectItem :: Wildcard (
491+ ast:: WildcardAdditionalOptions :: default ( ) ,
492+ ) ]
493+ } ;
494+
495+ items. extend (
496+ window_expr
497+ . iter ( )
498+ . map ( |expr| {
499+ let expr = if let Some ( agg) = agg {
500+ unproject_agg_exprs ( expr. clone ( ) , agg, None ) ?
501+ } else {
502+ expr. clone ( )
503+ } ;
504+ self . select_item_to_sql ( & expr)
505+ } )
506+ . collect :: < Result < Vec < _ > > > ( ) ?,
507+ ) ;
508+ select. projection ( items) ;
509+
510+ Ok ( ( ) )
511+ }
512+
513+ fn window_input_requires_derived_subquery ( plan : & LogicalPlan ) -> bool {
514+ // These operators either produce a SELECT list or apply SQL clauses
515+ // that are evaluated after window functions in a single SELECT block.
516+ // Keep them below the Window node by emitting a derived table.
517+ matches ! (
518+ plan,
519+ LogicalPlan :: Projection ( _)
520+ | LogicalPlan :: Distinct ( _)
521+ | LogicalPlan :: Limit ( _)
522+ | LogicalPlan :: Sort ( _)
523+ | LogicalPlan :: Union ( _)
524+ )
525+ }
526+
527+ fn window_to_sql_with_derived_input (
528+ & self ,
529+ window : & Window ,
530+ select : & mut SelectBuilder ,
531+ relation : & mut RelationBuilder ,
532+ ) -> Result < ( ) > {
533+ let input_alias = "derived_window_input" ;
534+ self . derive (
535+ window. input . as_ref ( ) ,
536+ relation,
537+ Some ( self . new_table_alias ( input_alias. to_string ( ) , vec ! [ ] ) ) ,
538+ false ,
539+ ) ?;
540+
541+ let input_schema = window. input . schema ( ) ;
542+ let mut alias_rewriter = TableAliasRewriter {
543+ table_schema : input_schema. as_arrow ( ) ,
544+ alias_name : TableReference :: bare ( input_alias) ,
545+ } ;
546+ let window_expr = window
547+ . window_expr
548+ . iter ( )
549+ . map ( |expr| expr. clone ( ) . rewrite ( & mut alias_rewriter) . data ( ) )
550+ . collect :: < Result < Vec < _ > > > ( ) ?;
551+
552+ self . project_window_output ( & window_expr, select, None )
553+ }
554+
481555 #[ cfg_attr( feature = "recursive_protection" , recursive:: recursive) ]
482556 fn select_to_sql_recursively (
483557 & self ,
@@ -610,25 +684,28 @@ impl Unparser<'_> {
610684 self . select_to_sql_recursively ( p. input . as_ref ( ) , query, select, relation)
611685 }
612686 LogicalPlan :: Filter ( filter) => {
613- if let Some ( agg) =
614- find_agg_node_within_select ( plan, select. already_projected ( ) )
687+ let window = find_window_nodes_within_select (
688+ plan,
689+ None ,
690+ select. already_projected ( ) ,
691+ ) ;
692+ let agg = find_agg_node_within_select ( plan, select. already_projected ( ) ) ;
693+
694+ if let ( Some ( window) , true ) =
695+ ( window. as_deref ( ) , self . dialect . supports_qualify ( ) )
615696 {
697+ let mut unprojected =
698+ unproject_window_exprs ( filter. predicate . clone ( ) , window) ?;
699+ if let Some ( agg) = agg {
700+ unprojected = unproject_agg_exprs ( unprojected, agg, None ) ?;
701+ }
702+ let filter_expr = self . expr_to_sql ( & unprojected) ?;
703+ select. qualify ( Some ( filter_expr) ) ;
704+ } else if let Some ( agg) = agg {
616705 let unprojected =
617706 unproject_agg_exprs ( filter. predicate . clone ( ) , agg, None ) ?;
618707 let filter_expr = self . expr_to_sql ( & unprojected) ?;
619708 select. having ( Some ( filter_expr) ) ;
620- } else if let ( Some ( window) , true ) = (
621- find_window_nodes_within_select (
622- plan,
623- None ,
624- select. already_projected ( ) ,
625- ) ,
626- self . dialect . supports_qualify ( ) ,
627- ) {
628- let unprojected =
629- unproject_window_exprs ( filter. predicate . clone ( ) , & window) ?;
630- let filter_expr = self . expr_to_sql ( & unprojected) ?;
631- select. qualify ( Some ( filter_expr) ) ;
632709 } else {
633710 let filter_expr = self . expr_to_sql ( & filter. predicate ) ?;
634711 select. selection ( Some ( filter_expr) ) ;
@@ -1143,13 +1220,37 @@ impl Unparser<'_> {
11431220 Ok ( ( ) )
11441221 }
11451222 LogicalPlan :: Window ( window) => {
1146- // Window nodes are handled simultaneously with Projection nodes
1223+ // Window nodes are usually handled simultaneously with Projection
1224+ // nodes, where projected columns are unprojected back into their
1225+ // corresponding window expressions. Manually built plans can have
1226+ // Window nodes without an enclosing Projection, so in that case
1227+ // the Window node itself must contribute its output expressions.
1228+ let project_window_output = !select. already_projected ( ) ;
1229+ if project_window_output
1230+ && Self :: window_input_requires_derived_subquery ( window. input . as_ref ( ) )
1231+ {
1232+ return self
1233+ . window_to_sql_with_derived_input ( window, select, relation) ;
1234+ }
1235+
1236+ let agg = if project_window_output {
1237+ find_agg_node_within_select ( plan, false )
1238+ } else {
1239+ None
1240+ } ;
1241+
11471242 self . select_to_sql_recursively (
11481243 window. input . as_ref ( ) ,
11491244 query,
11501245 select,
11511246 relation,
1152- )
1247+ ) ?;
1248+
1249+ if project_window_output {
1250+ self . project_window_output ( & window. window_expr , select, agg) ?;
1251+ }
1252+
1253+ Ok ( ( ) )
11531254 }
11541255 LogicalPlan :: EmptyRelation ( _) => {
11551256 // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
0 commit comments