11// SPDX-License-Identifier: Apache-2.0
22// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
4+ use std:: collections:: BTreeMap ;
45use std:: sync:: Arc ;
56
67use arrow_schema:: DataType ;
@@ -41,7 +42,6 @@ use vortex::scalar_fn::fns::binary::Binary;
4142use vortex:: scalar_fn:: fns:: like:: Like ;
4243use vortex:: scalar_fn:: fns:: like:: LikeOptions ;
4344use vortex:: scalar_fn:: fns:: operators:: Operator ;
44- use vortex_utils:: aliases:: hash_set:: HashSet ;
4545
4646use crate :: convert:: FromDataFusion ;
4747
@@ -121,12 +121,17 @@ pub trait ExpressionConvertor: Send + Sync {
121121pub struct DefaultExpressionConvertor { }
122122
123123impl DefaultExpressionConvertor {
124- fn scan_projection_for_leftover_expr (
124+ /// Builds scan inputs for a decimal arithmetic expression that stays in DataFusion.
125+ ///
126+ /// Decimal arithmetic itself is not pushed down, but schema evolution can leave
127+ /// decimal-to-decimal `CastColumnExpr`s inside the leftover expression. Preserve
128+ /// those casts in the scan projection so the leftover expression is rebound
129+ /// against the logical decimal type DataFusion planned for.
130+ fn scan_projection_for_decimal_arithmetic_leftover (
125131 & self ,
126132 expr : & Arc < dyn PhysicalExpr > ,
127133 ) -> DFResult < Vec < ( String , Expression ) > > {
128- let mut projected_names = HashSet :: new ( ) ;
129- let mut scan_projection = Vec :: new ( ) ;
134+ let mut scan_projection = BTreeMap :: new ( ) ;
130135
131136 expr. apply ( |node| {
132137 if let Some ( cast_col_expr) = node. as_any ( ) . downcast_ref :: < df_expr:: CastColumnExpr > ( )
@@ -138,24 +143,21 @@ impl DefaultExpressionConvertor {
138143 . downcast_ref :: < df_expr:: Column > ( )
139144 {
140145 let name = column. name ( ) . to_string ( ) ;
141- if projected_names. insert ( name. clone ( ) ) {
142- scan_projection. push ( ( name, self . convert ( node. as_ref ( ) ) ?) ) ;
143- }
146+ scan_projection. insert ( column. index ( ) , ( name, self . convert ( node. as_ref ( ) ) ?) ) ;
144147 return Ok ( TreeNodeRecursion :: Stop ) ;
145148 }
146149
150+ if let Some ( column) = node. as_any ( ) . downcast_ref :: < df_expr:: Column > ( ) {
151+ scan_projection. entry ( column. index ( ) ) . or_insert_with ( || {
152+ let name = column. name ( ) . to_string ( ) ;
153+ ( name. clone ( ) , get_item ( name, root ( ) ) )
154+ } ) ;
155+ }
156+
147157 Ok ( TreeNodeRecursion :: Continue )
148158 } ) ?;
149159
150- scan_projection. extend (
151- collect_columns ( expr)
152- . into_iter ( )
153- . filter ( |c| !projected_names. contains ( c. name ( ) ) )
154- . sorted_by_key ( |c| c. index ( ) )
155- . map ( |c| ( c. name ( ) . to_string ( ) , get_item ( c. name ( ) , root ( ) ) ) ) ,
156- ) ;
157-
158- Ok ( scan_projection)
160+ Ok ( scan_projection. into_values ( ) . collect ( ) )
159161 }
160162
161163 /// Attempts to convert a DataFusion ScalarFunctionExpr to a Vortex expression.
@@ -362,7 +364,8 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
362364 && ( is_decimal ( & binary_expr. left ( ) . data_type ( input_schema) ?)
363365 && is_decimal ( & binary_expr. right ( ) . data_type ( input_schema) ?) )
364366 {
365- scan_projection. extend ( self . scan_projection_for_leftover_expr ( node) ?) ;
367+ scan_projection
368+ . extend ( self . scan_projection_for_decimal_arithmetic_leftover ( node) ?) ;
366369 leftover_projection. push ( projection_expr. clone ( ) ) ;
367370 return Ok ( TreeNodeRecursion :: Stop ) ;
368371 }
0 commit comments