@@ -28,6 +28,7 @@ use datafusion_expr::utils::disjunction;
2828use datafusion_expr:: {
2929 Distinct , Expr , Filter , LogicalPlan , Projection , SubqueryAlias , Union ,
3030} ;
31+ use log:: debug;
3132use std:: collections:: HashMap ;
3233use std:: sync:: Arc ;
3334
@@ -80,10 +81,15 @@ impl OptimizerRule for UnionsToFilter {
8081
8182fn try_rewrite_distinct_union ( plan : LogicalPlan ) -> Result < Option < LogicalPlan > > {
8283 let LogicalPlan :: Union ( Union { inputs, schema } ) = plan else {
84+ debug ! ( "unions_to_filter skipped: input is not a UNION" ) ;
8385 return Ok ( None ) ;
8486 } ;
8587
8688 if inputs. len ( ) < 2 {
89+ debug ! (
90+ "unions_to_filter skipped: UNION has {} input(s), need at least 2" ,
91+ inputs. len( )
92+ ) ;
8793 return Ok ( None ) ;
8894 }
8995
@@ -110,6 +116,7 @@ fn try_rewrite_distinct_union(plan: LogicalPlan) -> Result<Option<LogicalPlan>>
110116 }
111117
112118 if !transformed {
119+ debug ! ( "unions_to_filter skipped: no branch groups could be merged" ) ;
113120 return Ok ( None ) ;
114121 }
115122
@@ -138,20 +145,23 @@ fn try_rewrite_distinct_union(plan: LogicalPlan) -> Result<Option<LogicalPlan>>
138145 Ok ( Some ( LogicalPlan :: Distinct ( Distinct :: All ( Arc :: new ( union) ) ) ) )
139146}
140147
// One UNION input as decomposed by `extract_branch`: the remaining source
// plan, the predicate attached to it, and the wrapper nodes that
// `peel_wrappers` removed from the top of the branch.
// This change renames the struct from `Branch` to `UnionBranch`.
141- struct Branch {
148+ struct UnionBranch {
// Plan that remains after `strip_passthrough_nodes` is applied: either the
// input of the node that carried the predicate, or the whole peeled plan
// when the branch has no predicate of its own.
142149 source : LogicalPlan ,
// Predicate taken from the branch; `extract_branch` substitutes a literal
// boolean `true` when the peeled plan carries no predicate.
143150 predicate : Expr ,
// Wrapper nodes returned by `peel_wrappers` for this branch.
144151 wrappers : Vec < Wrapper > ,
145152}
146153
147- fn extract_branch ( plan : LogicalPlan ) -> Result < Option < Branch > > {
154+ fn extract_branch ( plan : LogicalPlan ) -> Result < Option < UnionBranch > > {
148155 let ( wrappers, plan) = peel_wrappers ( plan) ;
149156
150157 // Volatile or subquery expressions in the projection must not be merged:
151158 // they are evaluated once per branch in the original plan but would be
152159 // evaluated once per combined row after the rewrite, which can change the
153160 // output row set.
154161 if !wrapper_projections_are_safe ( & wrappers) {
162+ debug ! (
163+ "unions_to_filter skipped: projection wrapper contains volatile expression or subquery"
164+ ) ;
155165 return Ok ( None ) ;
156166 }
157167
@@ -160,9 +170,12 @@ fn extract_branch(plan: LogicalPlan) -> Result<Option<Branch>> {
160170 predicate, input, ..
161171 } ) => {
162172 if !is_mergeable_predicate ( & predicate) {
173+ debug ! (
174+ "unions_to_filter skipped: branch predicate contains volatility or a subquery"
175+ ) ;
163176 return Ok ( None ) ;
164177 }
165- Ok ( Some ( Branch {
178+ Ok ( Some ( UnionBranch {
166179 source : strip_passthrough_nodes ( Arc :: unwrap_or_clone ( input) ) ,
167180 predicate,
168181 wrappers,
@@ -172,8 +185,15 @@ fn extract_branch(plan: LogicalPlan) -> Result<Option<Branch>> {
172185 // Merging two such branches into one would silently drop the per-branch
173186 // row restriction (LIMIT) or rely on an order guarantee that UNION does
174187 // not preserve (ORDER BY). Bail out to leave the UNION unchanged.
175- LogicalPlan :: Limit ( _) | LogicalPlan :: Sort ( _) => Ok ( None ) ,
176- other => Ok ( Some ( Branch {
188+ LogicalPlan :: Limit ( _) => {
189+ debug ! ( "unions_to_filter skipped: branch contains LIMIT" ) ;
190+ Ok ( None )
191+ }
192+ LogicalPlan :: Sort ( _) => {
193+ debug ! ( "unions_to_filter skipped: branch contains ORDER BY / SORT" ) ;
194+ Ok ( None )
195+ }
196+ other => Ok ( Some ( UnionBranch {
177197 source : strip_passthrough_nodes ( other. clone ( ) ) ,
178198 predicate : Expr :: Literal (
179199 datafusion_common:: ScalarValue :: Boolean ( Some ( true ) ) ,
0 commit comments