@@ -16,7 +16,7 @@ use crate::config::{NodeMapping, RelationshipMapping};
1616use crate :: error:: Result ;
1717use crate :: source_catalog:: GraphSourceCatalog ;
1818use datafusion:: logical_expr:: {
19- col, BinaryExpr , Expr , JoinType , LogicalPlan , LogicalPlanBuilder , Operator ,
19+ col, BinaryExpr , Expr , JoinType , LogicalPlan , LogicalPlanBuilder , Operator , TableSource ,
2020} ;
2121use std:: collections:: HashMap ;
2222use std:: sync:: Arc ;
@@ -41,6 +41,97 @@ pub(crate) struct TargetJoinParams<'a> {
4141}
4242
4343impl DataFusionPlanner {
44+ /// Build a target node scan with property filters and qualified column names
45+ fn build_target_scan (
46+ & self ,
47+ target_source : Arc < dyn TableSource > ,
48+ target_label : & str ,
49+ target_variable : & str ,
50+ target_properties : & HashMap < String , PropertyValue > ,
51+ ) -> Result < LogicalPlan > {
52+ let target_schema = target_source. schema ( ) ;
53+ let normalized_target_label = target_label. to_lowercase ( ) ;
54+ let mut target_builder =
55+ LogicalPlanBuilder :: scan ( & normalized_target_label, target_source, None ) . map_err (
56+ |e| self . plan_error ( & format ! ( "Failed to scan target node '{}'" , target_label) , e) ,
57+ ) ?;
58+
59+ // Apply target property filters (e.g., (b {age: 30}))
60+ for ( k, v) in target_properties. iter ( ) {
61+ let lit_expr = super :: expression:: to_df_value_expr (
62+ & crate :: ast:: ValueExpression :: Literal ( v. clone ( ) ) ,
63+ ) ;
64+ let filter_expr = Expr :: BinaryExpr ( BinaryExpr {
65+ left : Box :: new ( col ( k. to_lowercase ( ) ) ) ,
66+ op : Operator :: Eq ,
67+ right : Box :: new ( lit_expr) ,
68+ } ) ;
69+ target_builder = target_builder. filter ( filter_expr) . map_err ( |e| {
70+ self . plan_error ( & format ! ( "Failed to apply target filter on '{}'" , k) , e)
71+ } ) ?;
72+ }
73+
74+ let target_qualified_exprs: Vec < Expr > = target_schema
75+ . fields ( )
76+ . iter ( )
77+ . map ( |field| {
78+ let qualified_name = qualify_column ( target_variable, field. name ( ) ) ;
79+ col ( field. name ( ) ) . alias ( & qualified_name)
80+ } )
81+ . collect ( ) ;
82+
83+ target_builder
84+ . project ( target_qualified_exprs)
85+ . map_err ( |e| self . plan_error ( "Failed to project target columns" , e) ) ?
86+ . build ( )
87+ . map_err ( |e| self . plan_error ( "Failed to build target scan" , e) )
88+ }
89+
90+ /// Handle variable reuse case by adding a filter constraint instead of joining
91+ fn handle_variable_reuse_filter (
92+ & self ,
93+ mut builder : LogicalPlanBuilder ,
94+ params : & TargetJoinParams ,
95+ qualified_target_key : String ,
96+ ) -> Result < LogicalPlan > {
97+ // Determine the relationship target key based on direction
98+ let target_key = match params. direction {
99+ RelationshipDirection :: Outgoing => & params. rel_map . target_id_field ,
100+ RelationshipDirection :: Incoming => & params. rel_map . source_id_field ,
101+ RelationshipDirection :: Undirected => & params. rel_map . target_id_field ,
102+ } ;
103+
104+ let qualified_rel_target_key = qualify_column ( params. rel_qualifier , target_key) ;
105+
106+ // Create a filter expression: rel_target_key = target_key
107+ let join_condition = Expr :: BinaryExpr ( BinaryExpr {
108+ left : Box :: new ( col ( & qualified_rel_target_key) ) ,
109+ op : Operator :: Eq ,
110+ right : Box :: new ( col ( & qualified_target_key) ) ,
111+ } ) ;
112+
113+ // Apply filter instead of join
114+ builder = builder
115+ . filter ( join_condition)
116+ . map_err ( |e| self . plan_error ( "Failed to apply variable reuse filter" , e) ) ?;
117+
118+ // TODO: Handle target_properties filters when variable is reused
119+ // For now, we'll skip them since the variable already exists
120+ if !params. target_properties . is_empty ( ) {
121+ return Err ( crate :: error:: GraphError :: PlanError {
122+ message : format ! (
123+ "Property filters on reused variable '{}' are not yet supported" ,
124+ params. target_variable
125+ ) ,
126+ location : snafu:: Location :: new ( file ! ( ) , line ! ( ) , column ! ( ) ) ,
127+ } ) ;
128+ }
129+
130+ builder
131+ . build ( )
132+ . map_err ( |e| self . plan_error ( "Failed to build plan with variable reuse" , e) )
133+ }
134+
44135 /// Join source node plan with relationship scan
45136 pub ( crate ) fn join_source_to_relationship (
46137 & self ,
@@ -94,43 +185,35 @@ impl DataFusionPlanner {
94185 . map_err ( |e| self . plan_error ( "Failed to build plan (no target source)" , e) ) ;
95186 } ;
96187
97- // Create target node scan with qualified column aliases and property filters
98- let target_schema = target_source . schema ( ) ;
99- let normalized_target_label = target_label . to_lowercase ( ) ;
100- let mut target_builder =
101- LogicalPlanBuilder :: scan ( & normalized_target_label , target_source , None ) . map_err (
102- |e| self . plan_error ( & format ! ( "Failed to scan target node '{}' " , target_label ) , e ) ,
103- ) ? ;
188+ // Check if target variable already exists in the schema (variable reuse)
189+ // Build a temporary plan to inspect the current schema
190+ let current_plan = builder
191+ . clone ( )
192+ . build ( )
193+ . map_err ( |e| self . plan_error ( "Failed to build temp plan for schema check " , e ) ) ? ;
194+ let current_schema = current_plan . schema ( ) ;
104195
105- // Apply target property filters (e.g., (b {age: 30}))
106- for ( k, v) in params. target_properties . iter ( ) {
107- let lit_expr = super :: expression:: to_df_value_expr (
108- & crate :: ast:: ValueExpression :: Literal ( v. clone ( ) ) ,
109- ) ;
110- let filter_expr = Expr :: BinaryExpr ( BinaryExpr {
111- left : Box :: new ( col ( k. to_lowercase ( ) ) ) ,
112- op : Operator :: Eq ,
113- right : Box :: new ( lit_expr) ,
114- } ) ;
115- target_builder = target_builder. filter ( filter_expr) . map_err ( |e| {
116- self . plan_error ( & format ! ( "Failed to apply target filter on '{}'" , k) , e)
117- } ) ?;
196+ // Check if the target variable's ID column already exists
197+ let qualified_target_key =
198+ qualify_column ( params. target_variable , & params. node_map . id_field ) ;
199+ let target_exists = current_schema
200+ . field_with_unqualified_name ( & qualified_target_key)
201+ . is_ok ( ) ;
202+
203+ if target_exists {
204+ // Variable reuse: target node columns already in schema
205+ // Skip creating new scan, just add filter constraint
206+ return self . handle_variable_reuse_filter ( builder, params, qualified_target_key) ;
118207 }
119208
120- let target_qualified_exprs: Vec < Expr > = target_schema
121- . fields ( )
122- . iter ( )
123- . map ( |field| {
124- let qualified_name = qualify_column ( params. target_variable , field. name ( ) ) ;
125- col ( field. name ( ) ) . alias ( & qualified_name)
126- } )
127- . collect ( ) ;
128-
129- let target_scan = target_builder
130- . project ( target_qualified_exprs)
131- . map_err ( |e| self . plan_error ( "Failed to project target columns" , e) ) ?
132- . build ( )
133- . map_err ( |e| self . plan_error ( "Failed to build target scan" , e) ) ?;
209+ // Normal case: target variable doesn't exist yet
210+ // Create target node scan with qualified column aliases and property filters
211+ let target_scan = self . build_target_scan (
212+ target_source,
213+ & target_label,
214+ params. target_variable ,
215+ params. target_properties ,
216+ ) ?;
134217
135218 // Determine target join keys
136219 let target_key = match params. direction {
@@ -140,8 +223,6 @@ impl DataFusionPlanner {
140223 } ;
141224
142225 let qualified_rel_target_key = qualify_column ( params. rel_qualifier , target_key) ;
143- let qualified_target_key =
144- qualify_column ( params. target_variable , & params. node_map . id_field ) ;
145226
146227 builder = builder
147228 . join (
0 commit comments