diff --git a/rust/lance-graph/src/datafusion_planner.rs b/rust/lance-graph/src/datafusion_planner.rs index c2aeba3e..3db58125 100644 --- a/rust/lance-graph/src/datafusion_planner.rs +++ b/rust/lance-graph/src/datafusion_planner.rs @@ -3,25 +3,16 @@ //! DataFusion-based physical planner for graph queries //! -//! This module translates graph logical plans into DataFusion logical plans for execution. -//! It implements a two-phase planning approach: +//! Translates graph logical plans into DataFusion logical plans using a two-phase approach: //! //! ## Phase 1: Analysis -//! - Extracts metadata from the graph logical plan (from `logical_plan.rs`) -//! - Assigns unique IDs to relationship instances to avoid column name conflicts +//! - Assigns unique IDs to relationship instances to avoid column conflicts //! - Collects variable-to-label mappings and required datasets //! //! ## Phase 2: Plan Building -//! - Converts graph operations to relational operations -//! - Nodes as Tables: Each node label becomes a table scan -//! - Relationships as Tables: Each relationship type becomes a linking table -//! - Graph traversals become SQL joins with qualified column names -//! -//! ## Key Design Decisions -//! - **Unique relationship aliases**: Each relationship expansion gets a unique alias -//! (e.g., `knows_1`, `knows_2`) to support multi-hop queries without column conflicts -//! - **Relationship variables**: User-specified variables (e.g., `[r:KNOWS]`) take precedence -//! - **Column qualification**: All columns are qualified as `{variable}__{column}` to avoid ambiguity +//! - Nodes → Table scans, Relationships → Linking tables, Traversals → Joins +//! - Variable-length paths (`*1..3`) use unrolling: generate fixed-length plans + UNION +//! - All columns qualified as `{variable}__{column}` to avoid ambiguity use crate::ast::RelationshipDirection; use crate::error::Result; @@ -102,12 +93,12 @@ struct SourceJoinParams<'a> { /// Parameters for joining relationship to target node struct TargetJoinParams<'a> { - source_variable: &'a str, target_variable: &'a str, rel_qualifier: &'a str, node_map: &'a crate::config::NodeMapping, rel_map: &'a crate::config::RelationshipMapping, direction: &'a RelationshipDirection, + target_properties: &'a HashMap, } /// Planning context that tracks state during plan building @@ -227,15 +218,7 @@ fn analyze_operator( input, source_variable, target_variable, - relationship_types, - direction, - relationship_variable, - .. - } - | LogicalOperator::VariableLengthExpand { - input, - source_variable, - target_variable, + target_label, relationship_types, direction, relationship_variable, @@ -244,13 +227,10 @@ fn analyze_operator( // Recursively analyze input first analyze_operator(input, analysis, rel_counter)?; - // Infer target variable's label from source variable - // For (a:Person)-[:KNOWS]->(b), b also gets label Person - if let Some(source_label) = analysis.var_to_label.get(source_variable).cloned() { - analysis - .var_to_label - .insert(target_variable.clone(), source_label); - } + // Register the target variable with its label from the logical plan + analysis + .var_to_label + .insert(target_variable.clone(), target_label.clone()); // Assign unique instance ID for this relationship if let Some(rel_type) = relationship_types.first() { @@ -278,6 +258,63 @@ fn analyze_operator( analysis.required_datasets.insert(rel_type.clone()); } } + LogicalOperator::VariableLengthExpand { + input, + source_variable, + target_variable, + relationship_types, + direction, + relationship_variable, + min_length, + max_length, + .. + } => { + // Recursively analyze input first + analyze_operator(input, analysis, rel_counter)?; + + // Infer target variable's label from source variable + // For (a:Person)-[:KNOWS]->(b), b also gets label Person + if let Some(source_label) = analysis.var_to_label.get(source_variable).cloned() { + analysis + .var_to_label + .insert(target_variable.clone(), source_label); + } + + // For variable-length paths, register multiple instances (one per hop) + // We need to register instances for all possible hop counts + if let Some(rel_type) = relationship_types.first() { + let max_hops = max_length.unwrap_or(crate::MAX_VARIABLE_LENGTH_HOPS); + let min_hops = min_length.unwrap_or(1).max(1); + + // Register instances for each hop count we'll generate + for hop_count in min_hops..=max_hops { + for _ in 0..hop_count { + let instance_id = rel_counter + .entry(rel_type.clone()) + .and_modify(|c| *c += 1) + .or_insert(1); + + // Use relationship variable if provided, otherwise use type_instanceId + let alias = if let Some(rel_var) = relationship_variable { + format!("{}_{}", rel_var, instance_id) + } else { + format!("{}_{}", rel_type.to_lowercase(), instance_id) + }; + + analysis.relationship_instances.push(RelationshipInstance { + id: *instance_id, + rel_type: rel_type.clone(), + source_var: source_variable.clone(), + target_var: target_variable.clone(), + direction: direction.clone(), + alias, + }); + } + } + + analysis.required_datasets.insert(rel_type.clone()); + } + } LogicalOperator::Filter { input, .. } | LogicalOperator::Project { input, .. } | LogicalOperator::Sort { input, .. } @@ -390,24 +427,43 @@ impl DataFusionPlanner { input, source_variable, target_variable, + target_label, relationship_types, direction, + properties, + target_properties, .. - } - | LogicalOperator::VariableLengthExpand { + } => self.build_expand( + ctx, + input, + source_variable, + target_variable, + target_label, + relationship_types, + direction, + properties, + target_properties, + ), + LogicalOperator::VariableLengthExpand { input, source_variable, target_variable, relationship_types, direction, + min_length, + max_length, + target_properties, .. - } => self.build_expand( + } => self.build_variable_length_expand( ctx, input, source_variable, target_variable, relationship_types, direction, + *min_length, + *max_length, + target_properties, ), LogicalOperator::Join { left, .. } => { // Not yet implemented: explicit join. For now, use left branch @@ -482,14 +538,18 @@ impl DataFusionPlanner { } /// Build a relationship expansion (graph traversal) as a series of joins + #[allow(clippy::too_many_arguments)] fn build_expand( &self, ctx: &mut PlanningContext, input: &LogicalOperator, source_variable: &str, target_variable: &str, + target_label: &str, relationship_types: &[String], direction: &RelationshipDirection, + relationship_properties: &HashMap, + target_properties: &HashMap, ) -> Result { let left_plan = self.build_operator(ctx, input)?; @@ -520,8 +580,9 @@ impl DataFusionPlanner { return Ok(left_plan); }; - // Build relationship scan with qualified columns - let rel_scan = self.build_relationship_scan(&rel_instance, rel_source)?; + // Build relationship scan with qualified columns and property filters + let rel_scan = + self.build_relationship_scan(&rel_instance, rel_source, relationship_properties)?; // Join source node with relationship let source_params = SourceJoinParams { @@ -533,28 +594,47 @@ impl DataFusionPlanner { }; let builder = self.join_source_to_relationship(left_plan, rel_scan, &source_params)?; - // Join relationship with target node + // Join relationship with target node using the explicit target_label + let target_node_map = self.config.node_mappings.get(target_label).ok_or_else(|| { + crate::error::GraphError::ConfigError { + message: format!("No mapping found for target label: {}", target_label), + location: snafu::Location::new(file!(), line!(), column!()), + } + })?; + let target_params = TargetJoinParams { - source_variable, target_variable, rel_qualifier: &rel_instance.alias, - node_map, + node_map: target_node_map, rel_map, direction, + target_properties, }; self.join_relationship_to_target(builder, cat, ctx, &target_params) } - /// Build a qualified relationship scan + /// Build a qualified relationship scan with property filters fn build_relationship_scan( &self, rel_instance: &RelationshipInstance, rel_source: Arc, + relationship_properties: &HashMap, ) -> Result { let rel_schema = rel_source.schema(); - let rel_builder = + let mut rel_builder = LogicalPlanBuilder::scan(&rel_instance.rel_type, rel_source, None).unwrap(); + // Apply relationship property filters (e.g., -[r {since: 2020}]->) + for (k, v) in relationship_properties.iter() { + let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone())); + let filter_expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(col(k)), + op: Operator::Eq, + right: Box::new(lit_expr), + }); + rel_builder = rel_builder.filter(filter_expr).unwrap(); + } + // Use unique alias from rel_instance to avoid column conflicts let rel_qualified_exprs: Vec = rel_schema .fields() @@ -607,11 +687,11 @@ impl DataFusionPlanner { ctx: &PlanningContext, params: &TargetJoinParams, ) -> Result { - // For now, assume target has same label as source (simplified) + // Get the target label from the analysis (which now has the correct label from Expand) let Some(target_label) = ctx .analysis .var_to_label - .get(params.source_variable) + .get(params.target_variable) .cloned() else { return Ok(builder.build().unwrap()); @@ -621,9 +701,21 @@ impl DataFusionPlanner { return Ok(builder.build().unwrap()); }; - // Create target node scan with qualified column aliases + // Create target node scan with qualified column aliases and property filters let target_schema = target_source.schema(); - let target_builder = LogicalPlanBuilder::scan(&target_label, target_source, None).unwrap(); + let mut target_builder = + LogicalPlanBuilder::scan(&target_label, target_source, None).unwrap(); + + // Apply target property filters (e.g., (b {age: 30})) + for (k, v) in params.target_properties.iter() { + let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone())); + let filter_expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(col(k)), + op: Operator::Eq, + right: Box::new(lit_expr), + }); + target_builder = target_builder.filter(filter_expr).unwrap(); + } let target_qualified_exprs: Vec = target_schema .fields() @@ -663,174 +755,815 @@ impl DataFusionPlanner { Ok(builder.build().unwrap()) } - // ============================================================================ - // Expression Translators - // ============================================================================ + /// Get the expected qualified column names for variable-length path results + /// + /// Derives the column set from actual source and target node schemas rather than + /// using fragile prefix matching. This prevents accidentally including intermediate + /// node columns or missing renamed properties. + fn get_expected_varlength_columns( + &self, + ctx: &PlanningContext, + source_variable: &str, + target_variable: &str, + ) -> Result> { + use std::collections::HashSet; - fn to_df_boolean_expr(&self, expr: &crate::ast::BooleanExpression) -> Expr { - use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO}; - match expr { - BE::Comparison { - left, - operator, - right, - } => { - let l = self.to_df_value_expr(left); - let r = self.to_df_value_expr(right); - let op = match operator { - CO::Equal => Operator::Eq, - CO::NotEqual => Operator::NotEq, - CO::LessThan => Operator::Lt, - CO::LessThanOrEqual => Operator::LtEq, - CO::GreaterThan => Operator::Gt, - CO::GreaterThanOrEqual => Operator::GtEq, - }; - Expr::BinaryExpr(BinaryExpr { - left: Box::new(l), - op, - right: Box::new(r), - }) - } - BE::In { expression, list } => { - use datafusion::logical_expr::expr::InList as DFInList; - let expr = self.to_df_value_expr(expression); - let list_exprs = list - .iter() - .map(|item| self.to_df_value_expr(item)) - .collect::>(); - Expr::InList(DFInList::new(Box::new(expr), list_exprs, false)) + let mut expected = HashSet::new(); + + let Some(cat) = &self.catalog else { + return Ok(expected); + }; + + // Get source node label and schema + if let Some(source_label) = ctx.analysis.var_to_label.get(source_variable) { + if let Some(source) = cat.node_source(source_label) { + let source_schema = source.schema(); + for field in source_schema.fields() { + let qualified_name = format!("{}__{}", source_variable, field.name()); + expected.insert(qualified_name); + } } - BE::And(l, r) => Expr::BinaryExpr(BinaryExpr { - left: Box::new(self.to_df_boolean_expr(l)), - op: Operator::And, - right: Box::new(self.to_df_boolean_expr(r)), - }), - BE::Or(l, r) => Expr::BinaryExpr(BinaryExpr { - left: Box::new(self.to_df_boolean_expr(l)), - op: Operator::Or, - right: Box::new(self.to_df_boolean_expr(r)), - }), - BE::Not(inner) => Expr::Not(Box::new(self.to_df_boolean_expr(inner))), - BE::Exists(prop) => Expr::IsNotNull(Box::new( - self.to_df_value_expr(&crate::ast::ValueExpression::Property(prop.clone())), - )), - _ => lit(true), } - } - fn to_df_value_expr(&self, expr: &crate::ast::ValueExpression) -> Expr { - use crate::ast::{PropertyValue as PV, ValueExpression as VE}; - match expr { - VE::Property(prop) => { - // Create qualified column name: variable__property - let qualified_name = format!("{}__{}", prop.variable, prop.property); - col(&qualified_name) - } - VE::Variable(v) => col(v), - VE::Literal(PV::String(s)) => lit(s.clone()), - VE::Literal(PV::Integer(i)) => lit(*i), - VE::Literal(PV::Float(f)) => lit(*f), - VE::Literal(PV::Boolean(b)) => lit(*b), - VE::Literal(PV::Null) => { - datafusion::logical_expr::Expr::Literal(datafusion::scalar::ScalarValue::Null, None) - } - VE::Literal(PV::Parameter(_)) => lit(0), - VE::Literal(PV::Property(prop)) => { - // Create qualified column name: variable__property - let qualified_name = format!("{}__{}", prop.variable, prop.property); - col(&qualified_name) + // Get target node label and schema + if let Some(target_label) = ctx.analysis.var_to_label.get(target_variable) { + if let Some(target) = cat.node_source(target_label) { + let target_schema = target.schema(); + for field in target_schema.fields() { + let qualified_name = format!("{}__{}", target_variable, field.name()); + expected.insert(qualified_name); + } } - VE::Function { .. } | VE::Arithmetic { .. } => lit(0), } + + Ok(expected) } -} -#[cfg(test)] -mod tests { - use super::*; - use crate::ast::{ - BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, ValueExpression, - }; - use crate::logical_plan::{LogicalOperator, ProjectionItem}; - use crate::source_catalog::{InMemoryCatalog, SimpleTableSource}; - use arrow_schema::{DataType, Field, Schema}; - use std::sync::Arc; + /// Build variable-length path expansion using unrolling + UNION strategy + /// + /// For a query like: (a)-[:KNOWS*1..3]->(b) + /// This generates: + /// 1-hop plan UNION 2-hop plan UNION 3-hop plan + #[allow(clippy::too_many_arguments)] + fn build_variable_length_expand( + &self, + ctx: &mut PlanningContext, + input: &LogicalOperator, + source_variable: &str, + target_variable: &str, + relationship_types: &[String], + direction: &RelationshipDirection, + min_length: Option, + max_length: Option, + target_properties: &HashMap, + ) -> Result { + let min_hops = min_length.unwrap_or(1).max(1); + let max_hops = max_length.unwrap_or(crate::MAX_VARIABLE_LENGTH_HOPS); + + // Validate range + if min_hops > max_hops { + return Err(crate::error::GraphError::InvalidPattern { + message: format!( + "Invalid variable-length range: min {} > max {}", + min_hops, max_hops + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } - fn person_schema() -> Arc { - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int64, false), - Field::new("name", DataType::Utf8, true), - Field::new("age", DataType::Int64, true), - ])) - } + if max_hops > crate::MAX_VARIABLE_LENGTH_HOPS { + return Err(crate::error::GraphError::UnsupportedFeature { + feature: format!( + "Variable-length paths with max length > {} (got {})", + crate::MAX_VARIABLE_LENGTH_HOPS, + max_hops + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } - fn make_catalog() -> Arc { - let person_src = Arc::new(SimpleTableSource::new(person_schema())); - let knows_schema = Arc::new(Schema::new(vec![ - Field::new("src_person_id", DataType::Int64, false), - Field::new("dst_person_id", DataType::Int64, false), - ])); - let knows_src = Arc::new(SimpleTableSource::new(knows_schema)); - Arc::new( - InMemoryCatalog::new() - .with_node_source("Person", person_src) - .with_relationship_source("KNOWS", knows_src), - ) - } + // Build the input plan (source node scan) + let input_plan = self.build_operator(ctx, input)?; - #[test] - fn test_df_boolean_expr_in_list() { - let cfg = crate::config::GraphConfig::builder().build().unwrap(); - let planner = DataFusionPlanner::new(cfg); - let expr = BooleanExpression::In { - expression: ValueExpression::Property(PropertyRef { - variable: "rel".into(), - property: "relationship_type".into(), - }), - list: vec![ - ValueExpression::Literal(PropertyValue::String("WORKS_FOR".into())), - ValueExpression::Literal(PropertyValue::String("PART_OF".into())), - ], - }; + // Derive expected column names from source and target node schemas + // This ensures we only project columns that actually belong to source/target nodes + let expected_columns = + self.get_expected_varlength_columns(ctx, source_variable, target_variable)?; - if let Expr::InList(in_list) = planner.to_df_boolean_expr(&expr) { - assert!(!in_list.negated); - assert_eq!(in_list.list.len(), 2); - match *in_list.expr { - Expr::Column(ref col_expr) => { - assert_eq!(col_expr.name(), "rel__relationship_type"); - } - other => panic!("Expected column expression, got {:?}", other), - } + // Generate a plan for each hop count and UNION them + let mut plans = Vec::new(); + + for hop_count in min_hops..=max_hops { + let mut plan = self.build_fixed_length_path( + ctx, + input_plan.clone(), + source_variable, + target_variable, + relationship_types, + direction, + hop_count, + target_properties, + )?; + + // Project only source and target columns to ensure consistent schema for UNION + // This removes intermediate node columns that vary by hop count + // Use the pre-computed expected column set derived from actual node schemas + let projection: Vec = plan + .schema() + .fields() + .iter() + .filter(|f| expected_columns.contains(f.name().as_str())) + .map(|f| col(f.name())) + .collect(); + + plan = LogicalPlanBuilder::from(plan) + .project(projection) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to project for UNION: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })? + .build() + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to build projection: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + plans.push(plan); + } + + // UNION all plans together + if plans.len() == 1 { + Ok(plans.into_iter().next().unwrap()) } else { - panic!("Expected InList expression"); + // Build UNION of all plans + let mut union_plan = plans[0].clone(); + for plan in plans.into_iter().skip(1) { + union_plan = LogicalPlanBuilder::from(union_plan) + .union(plan) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to UNION variable-length paths: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })? + .build() + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to build UNION plan: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + } + Ok(union_plan) } } - #[test] - fn test_df_planner_scan_filter_project() { - let scan = LogicalOperator::ScanByLabel { - variable: "n".to_string(), - label: "Person".to_string(), - properties: Default::default(), - }; + /// Build a fixed-length path of N hops + /// + /// For hop_count=3: (a)-[:KNOWS]->(temp1)-[:KNOWS]->(temp2)-[:KNOWS]->(b) + #[allow(clippy::too_many_arguments)] + fn build_fixed_length_path( + &self, + ctx: &mut PlanningContext, + input_plan: LogicalPlan, + source_variable: &str, + target_variable: &str, + relationship_types: &[String], + direction: &RelationshipDirection, + hop_count: u32, + target_properties: &HashMap, + ) -> Result { + let mut current_plan = input_plan; + let mut current_source = source_variable.to_string(); + + for hop_index in 0..hop_count { + let is_last_hop = hop_index == hop_count - 1; + + // Target variable: use actual target on last hop, temp variable otherwise + let current_target = if is_last_hop { + target_variable.to_string() + } else { + format!("_temp_{}_{}", source_variable, hop_index + 1) + }; + + // Build the expansion on top of current plan + // Apply target properties only on the last hop + let props_to_apply = if is_last_hop { + target_properties + } else { + &HashMap::new() + }; + + current_plan = self.build_expand_on_plan( + ctx, + current_plan, + ¤t_source, + ¤t_target, + relationship_types, + direction, + props_to_apply, + )?; - let pred = BooleanExpression::Comparison { - left: ValueExpression::Property(PropertyRef { - variable: "n".to_string(), - property: "age".to_string(), - }), - operator: ComparisonOperator::GreaterThan, - right: ValueExpression::Literal(PropertyValue::Integer(30)), - }; + // Move to next hop + current_source = current_target; + } - let filter = LogicalOperator::Filter { - input: Box::new(scan), - predicate: pred, + Ok(current_plan) + } + + /// Build a single-hop expansion on top of an existing plan + #[allow(clippy::too_many_arguments)] + fn build_expand_on_plan( + &self, + ctx: &mut PlanningContext, + input_plan: LogicalPlan, + source_variable: &str, + target_variable: &str, + relationship_types: &[String], + direction: &RelationshipDirection, + target_properties: &HashMap, + ) -> Result { + let rel_type = + relationship_types + .first() + .ok_or_else(|| crate::error::GraphError::InvalidPattern { + message: "Expand requires at least one relationship type".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let rel_instance = ctx.next_relationship_instance(rel_type)?; + let rel_map = self.get_relationship_mapping(rel_type)?; + let (target_label, node_map) = self.get_target_node_mapping(ctx, target_variable)?; + let catalog = self.get_catalog()?; + + // Build relationship scan and join + let rel_scan = self.build_qualified_relationship_scan(catalog, &rel_instance)?; + let mut builder = self.join_relationship_to_input( + input_plan, + rel_scan, + source_variable, + &rel_instance, + rel_map, + node_map, + direction, + )?; + + // Build target node scan and join + let target_scan = self.build_qualified_target_scan( + catalog, + &target_label, + target_variable, + target_properties, + )?; + builder = self.join_target_to_builder( + builder, + target_scan, + target_variable, + &rel_instance, + rel_map, + node_map, + direction, + )?; + + builder + .build() + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to build expansion plan: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Get relationship mapping from config + fn get_relationship_mapping( + &self, + rel_type: &str, + ) -> Result<&crate::config::RelationshipMapping> { + self.config + .relationship_mappings + .get(rel_type) + .ok_or_else(|| crate::error::GraphError::ConfigError { + message: format!("No mapping found for relationship type: {}", rel_type), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Get target node mapping from context + fn get_target_node_mapping( + &self, + ctx: &PlanningContext, + target_variable: &str, + ) -> Result<(String, &crate::config::NodeMapping)> { + let target_label = ctx + .analysis + .var_to_label + .get(target_variable) + .or_else(|| self.config.node_mappings.keys().next()) + .ok_or_else(|| crate::error::GraphError::ConfigError { + message: format!( + "Cannot infer target label for variable: {}", + target_variable + ), + location: snafu::Location::new(file!(), line!(), column!()), + })? + .clone(); + + let node_map = self + .config + .node_mappings + .get(&target_label) + .ok_or_else(|| crate::error::GraphError::ConfigError { + message: format!("No mapping found for node label: {}", target_label), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + Ok((target_label, node_map)) + } + + /// Get catalog reference + fn get_catalog(&self) -> Result<&Arc> { + self.catalog + .as_ref() + .ok_or_else(|| crate::error::GraphError::ConfigError { + message: "Catalog not available".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Build a qualified relationship scan for expansion + fn build_qualified_relationship_scan( + &self, + catalog: &Arc, + rel_instance: &RelationshipInstance, + ) -> Result { + let rel_source = catalog + .relationship_source(&rel_instance.rel_type) + .ok_or_else(|| crate::error::GraphError::ConfigError { + message: format!( + "No table source found for relationship: {}", + rel_instance.rel_type + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let rel_schema = rel_source.schema(); + let rel_builder = LogicalPlanBuilder::scan(&rel_instance.rel_type, rel_source, None) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to scan relationship: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let rel_qualified_exprs: Vec = rel_schema + .fields() + .iter() + .map(|field| { + let qualified_name = format!("{}__{}", rel_instance.alias, field.name()); + col(field.name()).alias(&qualified_name) + }) + .collect(); + + rel_builder + .project(rel_qualified_exprs) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to project relationship: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })? + .build() + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to build relationship scan: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Get relationship join key based on direction (source side) + fn get_source_join_key<'a>( + direction: &RelationshipDirection, + rel_map: &'a crate::config::RelationshipMapping, + ) -> &'a str { + match direction { + RelationshipDirection::Outgoing => &rel_map.source_id_field, + RelationshipDirection::Incoming => &rel_map.target_id_field, + RelationshipDirection::Undirected => &rel_map.source_id_field, + } + } + + /// Get relationship join key based on direction (target side) + fn get_target_join_key<'a>( + direction: &RelationshipDirection, + rel_map: &'a crate::config::RelationshipMapping, + ) -> &'a str { + match direction { + RelationshipDirection::Outgoing => &rel_map.target_id_field, + RelationshipDirection::Incoming => &rel_map.source_id_field, + RelationshipDirection::Undirected => &rel_map.target_id_field, + } + } + + /// Join input plan with relationship scan + #[allow(clippy::too_many_arguments)] + fn join_relationship_to_input( + &self, + input_plan: LogicalPlan, + rel_scan: LogicalPlan, + source_variable: &str, + rel_instance: &RelationshipInstance, + rel_map: &crate::config::RelationshipMapping, + node_map: &crate::config::NodeMapping, + direction: &RelationshipDirection, + ) -> Result { + let source_key = Self::get_source_join_key(direction, rel_map); + let qualified_source_key = format!("{}__{}", source_variable, &node_map.id_field); + let qualified_rel_source_key = format!("{}__{}", rel_instance.alias, source_key); + + LogicalPlanBuilder::from(input_plan) + .join( + rel_scan, + JoinType::Inner, + (vec![qualified_source_key], vec![qualified_rel_source_key]), + None, + ) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to join with relationship: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Build a qualified target node scan with property filters + fn build_qualified_target_scan( + &self, + catalog: &Arc, + target_label: &str, + target_variable: &str, + target_properties: &HashMap, + ) -> Result { + let target_source = catalog.node_source(target_label).ok_or_else(|| { + crate::error::GraphError::ConfigError { + message: format!("No table source found for node label: {}", target_label), + location: snafu::Location::new(file!(), line!(), column!()), + } + })?; + + let target_schema = target_source.schema(); + let mut target_builder = LogicalPlanBuilder::scan(target_label, target_source, None) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to scan target node: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + // Apply target property filters + for (k, v) in target_properties.iter() { + let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone())); + let filter_expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(col(k)), + op: Operator::Eq, + right: Box::new(lit_expr), + }); + target_builder = target_builder.filter(filter_expr).map_err(|e| { + crate::error::GraphError::PlanError { + message: format!("Failed to apply target property filter: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + } + })?; + } + + let target_qualified_exprs: Vec = target_schema + .fields() + .iter() + .map(|field| { + let qualified_name = format!("{}__{}", target_variable, field.name()); + col(field.name()).alias(&qualified_name) + }) + .collect(); + + target_builder + .project(target_qualified_exprs) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to project target node: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + })? + .build() + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to build target scan: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + /// Join builder with target node scan + #[allow(clippy::too_many_arguments)] + fn join_target_to_builder( + &self, + builder: LogicalPlanBuilder, + target_scan: LogicalPlan, + target_variable: &str, + rel_instance: &RelationshipInstance, + rel_map: &crate::config::RelationshipMapping, + node_map: &crate::config::NodeMapping, + direction: &RelationshipDirection, + ) -> Result { + let target_key = Self::get_target_join_key(direction, rel_map); + let qualified_rel_target_key = format!("{}__{}", rel_instance.alias, target_key); + let qualified_target_key = format!("{}__{}", target_variable, &node_map.id_field); + + builder + .join( + target_scan, + JoinType::Inner, + (vec![qualified_rel_target_key], vec![qualified_target_key]), + None, + ) + .map_err(|e| crate::error::GraphError::PlanError { + message: format!("Failed to join with target node: {}", e), + location: snafu::Location::new(file!(), line!(), column!()), + }) + } + + // ============================================================================ + // Expression Translators + // ============================================================================ + + fn to_df_boolean_expr(&self, expr: &crate::ast::BooleanExpression) -> Expr { + use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO}; + match expr { + BE::Comparison { + left, + operator, + right, + } => { + let l = self.to_df_value_expr(left); + let r = self.to_df_value_expr(right); + let op = match operator { + CO::Equal => Operator::Eq, + CO::NotEqual => Operator::NotEq, + CO::LessThan => Operator::Lt, + CO::LessThanOrEqual => Operator::LtEq, + CO::GreaterThan => Operator::Gt, + CO::GreaterThanOrEqual => Operator::GtEq, + }; + Expr::BinaryExpr(BinaryExpr { + left: Box::new(l), + op, + right: Box::new(r), + }) + } + BE::In { expression, list } => { + use datafusion::logical_expr::expr::InList as DFInList; + let expr = self.to_df_value_expr(expression); + let list_exprs = list + .iter() + .map(|item| self.to_df_value_expr(item)) + .collect::>(); + Expr::InList(DFInList::new(Box::new(expr), list_exprs, false)) + } + BE::And(l, r) => Expr::BinaryExpr(BinaryExpr { + left: Box::new(self.to_df_boolean_expr(l)), + op: Operator::And, + right: Box::new(self.to_df_boolean_expr(r)), + }), + BE::Or(l, r) => Expr::BinaryExpr(BinaryExpr { + left: Box::new(self.to_df_boolean_expr(l)), + op: Operator::Or, + right: Box::new(self.to_df_boolean_expr(r)), + }), + BE::Not(inner) => Expr::Not(Box::new(self.to_df_boolean_expr(inner))), + BE::Exists(prop) => Expr::IsNotNull(Box::new( + self.to_df_value_expr(&crate::ast::ValueExpression::Property(prop.clone())), + )), + _ => lit(true), + } + } + + fn to_df_value_expr(&self, expr: &crate::ast::ValueExpression) -> Expr { + use crate::ast::{PropertyValue as PV, ValueExpression as VE}; + match expr { + VE::Property(prop) => { + // Create qualified column name: variable__property + let qualified_name = format!("{}__{}", prop.variable, prop.property); + col(&qualified_name) + } + VE::Variable(v) => col(v), + VE::Literal(PV::String(s)) => lit(s.clone()), + VE::Literal(PV::Integer(i)) => lit(*i), + VE::Literal(PV::Float(f)) => lit(*f), + VE::Literal(PV::Boolean(b)) => lit(*b), + VE::Literal(PV::Null) => { + datafusion::logical_expr::Expr::Literal(datafusion::scalar::ScalarValue::Null, None) + } + VE::Literal(PV::Parameter(_)) => lit(0), + VE::Literal(PV::Property(prop)) => { + // Create qualified column name: variable__property + let qualified_name = format!("{}__{}", prop.variable, prop.property); + col(&qualified_name) + } + VE::Function { .. } | VE::Arithmetic { .. } => lit(0), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ast::{ + BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, ValueExpression, + }; + use crate::logical_plan::{LogicalOperator, ProjectionItem}; + use crate::source_catalog::{InMemoryCatalog, SimpleTableSource}; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + + fn person_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("age", DataType::Int64, true), + ])) + } + + fn make_catalog() -> Arc { + let person_src = Arc::new(SimpleTableSource::new(person_schema())); + let knows_schema = Arc::new(Schema::new(vec![ + Field::new("src_person_id", DataType::Int64, false), + Field::new("dst_person_id", DataType::Int64, false), + ])); + let knows_src = Arc::new(SimpleTableSource::new(knows_schema)); + Arc::new( + InMemoryCatalog::new() + .with_node_source("Person", person_src) + .with_relationship_source("KNOWS", knows_src), + ) + } + + #[test] + fn test_df_boolean_expr_in_list() { + let cfg = crate::config::GraphConfig::builder().build().unwrap(); + let planner = DataFusionPlanner::new(cfg); + let expr = BooleanExpression::In { + expression: ValueExpression::Property(PropertyRef { + variable: "rel".into(), + property: "relationship_type".into(), + }), + list: vec![ + ValueExpression::Literal(PropertyValue::String("WORKS_FOR".into())), + ValueExpression::Literal(PropertyValue::String("PART_OF".into())), + ], + }; + + if let Expr::InList(in_list) = planner.to_df_boolean_expr(&expr) { + assert!(!in_list.negated); + assert_eq!(in_list.list.len(), 2); + match *in_list.expr { + Expr::Column(ref col_expr) => { + assert_eq!(col_expr.name(), "rel__relationship_type"); + } + other => panic!("Expected column expression, got {:?}", other), + } + } else { + panic!("Expected InList expression"); + } + } + + #[test] + fn test_df_planner_scan_filter_project() { + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let pred = BooleanExpression::Comparison { + left: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "age".to_string(), + }), + operator: ComparisonOperator::GreaterThan, + right: ValueExpression::Literal(PropertyValue::Integer(30)), + }; + + let filter = LogicalOperator::Filter { + input: Box::new(scan), + predicate: pred, + }; + + let project = LogicalOperator::Project { + input: Box::new(filter), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".into(), + property: "name".into(), + }), + alias: None, + }], + }; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&project).unwrap(); + + let s = format!("{:?}", df_plan); + assert!(s.contains("Projection"), "plan missing Projection: {}", s); + assert!(s.contains("Filter"), "plan missing Filter: {}", s); + assert!(s.contains("TableScan"), "plan missing TableScan: {}", s); + assert!( + s.contains("Person") || s.contains("person"), + "plan missing table name: {}", + s + ); + } + + #[test] + fn test_df_planner_property_pushdown_filter() { + let mut props = std::collections::HashMap::new(); + props.insert( + "name".to_string(), + PropertyValue::String("Alice".to_string()), + ); + + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: props, + }; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&scan).unwrap(); + + let s = format!("{:?}", df_plan); + assert!(s.contains("Filter"), "plan missing Filter: {}", s); + assert!(s.contains("TableScan"), "plan missing TableScan: {}", s); + assert!( + s.contains("Person") || s.contains("person"), + "plan missing table name: {}", + s + ); + } + + #[test] + fn test_df_planner_expand_creates_join_filter() { + // MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN b.name + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + properties: Default::default(), + target_properties: Default::default(), + }; + let project = LogicalOperator::Project { + input: Box::new(expand), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "b".into(), + property: "name".into(), + }), + alias: None, + }], }; + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&project).unwrap(); + + let s = format!("{:?}", df_plan); + assert!( + s.contains("Join(") && s.contains("Inner"), + "plan missing Inner Join: {}", + s + ); + assert!( + s.contains("TableScan") && s.contains("person"), + "plan missing person scan: {}", + s + ); + assert!( + s.contains("TableScan") && (s.contains("KNOWS") || s.contains("knows")), + "plan missing relationship scan: {}", + s + ); + } + + #[test] + fn test_scan_aliasing_projects_variable_prefixed_columns() { + // MATCH (n:Person) RETURN n.name + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; let project = LogicalOperator::Project { - input: Box::new(filter), + input: Box::new(scan), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { variable: "n".into(), @@ -849,49 +1582,111 @@ mod tests { let s = format!("{:?}", df_plan); assert!(s.contains("Projection"), "plan missing Projection: {}", s); - assert!(s.contains("Filter"), "plan missing Filter: {}", s); - assert!(s.contains("TableScan"), "plan missing TableScan: {}", s); assert!( - s.contains("Person") || s.contains("person"), - "plan missing table name: {}", + s.contains("n__name"), + "missing qualified projected column n__name: {}", s ); } #[test] - fn test_df_planner_property_pushdown_filter() { - let mut props = std::collections::HashMap::new(); - props.insert( - "name".to_string(), - PropertyValue::String("Alice".to_string()), + fn test_expand_uses_qualified_join_keys_with_type_alias() { + // MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + properties: Default::default(), + target_properties: Default::default(), + }; + let project = LogicalOperator::Project { + input: Box::new(expand), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "a".into(), + property: "name".into(), + }), + alias: None, + }], + }; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&project).unwrap(); + let s = format!("{:?}", df_plan); + assert!( + s.contains("a__id"), + "missing qualified node id in join: {}", + s + ); + assert!( + s.contains("knows_1__src_person_id"), + "missing qualified rel key in join: {}", + s ); + } - let scan = LogicalOperator::ScanByLabel { - variable: "n".to_string(), + #[test] + fn test_expand_uses_relationship_variable_for_alias() { + // MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN r.src_person_id + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), label: "Person".to_string(), - properties: props, + properties: Default::default(), + }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: Some("r".to_string()), + properties: Default::default(), + target_properties: Default::default(), + }; + let project = LogicalOperator::Project { + input: Box::new(expand), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "r".into(), + property: "src_person_id".into(), + }), + alias: None, + }], }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&scan).unwrap(); - + let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!(s.contains("Filter"), "plan missing Filter: {}", s); - assert!(s.contains("TableScan"), "plan missing TableScan: {}", s); assert!( - s.contains("Person") || s.contains("person"), - "plan missing table name: {}", + s.contains("r__src_person_id"), + "missing rel-var qualified column: {}", s ); } #[test] - fn test_df_planner_expand_creates_join_filter() { - // MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN b.name + fn test_where_on_relationship_property_with_rel_var() { + // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE r.src_person_id = 1 RETURN a.name let scan_a = LogicalOperator::ScanByLabel { variable: "a".to_string(), label: "Person".to_string(), @@ -901,16 +1696,29 @@ mod tests { input: Box::new(scan_a), source_variable: "a".to_string(), target_variable: "b".to_string(), + target_label: "Person".to_string(), relationship_types: vec!["KNOWS".to_string()], direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: None, + relationship_variable: Some("r".to_string()), properties: Default::default(), + target_properties: Default::default(), }; - let project = LogicalOperator::Project { + let filter = LogicalOperator::Filter { input: Box::new(expand), + predicate: BooleanExpression::Comparison { + left: ValueExpression::Property(PropertyRef { + variable: "r".into(), + property: "src_person_id".into(), + }), + operator: ComparisonOperator::Equal, + right: ValueExpression::Literal(PropertyValue::Integer(1)), + }, + }; + let project = LogicalOperator::Project { + input: Box::new(filter), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "b".into(), + variable: "a".into(), property: "name".into(), }), alias: None, @@ -924,63 +1732,153 @@ mod tests { .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); - let s = format!("{:?}", df_plan); + assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( - s.contains("Join(") && s.contains("Inner"), - "plan missing Inner Join: {}", + s.contains("r__src_person_id"), + "missing qualified rel column in filter: {}", s ); + } + + #[test] + fn test_exists_on_relationship_property_is_qualified() { + // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE EXISTS(r.src_person_id) RETURN a.name + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: Some("r".to_string()), + properties: Default::default(), + target_properties: Default::default(), + }; + let pred = BooleanExpression::Exists(PropertyRef { + variable: "r".into(), + property: "src_person_id".into(), + }); + let filter = LogicalOperator::Filter { + input: Box::new(expand), + predicate: pred, + }; + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&filter).unwrap(); + let s = format!("{:?}", df_plan); + assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( - s.contains("TableScan") && s.contains("person"), - "plan missing person scan: {}", + s.contains("r__src_person_id") || s.contains("IsNotNull"), + "missing qualified rel column or IsNotNull in filter: {}", s ); + } + + #[test] + fn test_in_list_on_relationship_property_is_qualified() { + // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE r.src_person_id IN [1,2] RETURN a.name + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: Some("r".to_string()), + properties: Default::default(), + target_properties: Default::default(), + }; + let filter = LogicalOperator::Filter { + input: Box::new(expand), + predicate: BooleanExpression::In { + expression: ValueExpression::Property(PropertyRef { + variable: "r".into(), + property: "src_person_id".into(), + }), + list: vec![ + ValueExpression::Literal(PropertyValue::Integer(1)), + ValueExpression::Literal(PropertyValue::Integer(2)), + ], + }, + }; + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&filter).unwrap(); + let s = format!("{:?}", df_plan); + assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( - s.contains("TableScan") && (s.contains("KNOWS") || s.contains("knows")), - "plan missing relationship scan: {}", + s.contains("r__src_person_id"), + "missing qualified rel column in IN list filter: {}", s ); } #[test] - fn test_scan_aliasing_projects_variable_prefixed_columns() { - // MATCH (n:Person) RETURN n.name - let scan = LogicalOperator::ScanByLabel { - variable: "n".to_string(), + fn test_incoming_join_qualified_keys() { + // MATCH (a:Person)<-[:KNOWS]-(b:Person) RETURN a.name + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".to_string(), label: "Person".to_string(), properties: Default::default(), }; + let expand = LogicalOperator::Expand { + input: Box::new(scan_a), + source_variable: "a".to_string(), + target_variable: "b".to_string(), + target_label: "Person".to_string(), + relationship_types: vec!["KNOWS".to_string()], + direction: crate::ast::RelationshipDirection::Incoming, + relationship_variable: None, + properties: Default::default(), + target_properties: Default::default(), + }; let project = LogicalOperator::Project { - input: Box::new(scan), + input: Box::new(expand), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "n".into(), + variable: "a".into(), property: "name".into(), }), alias: None, }], }; - let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); - let s = format!("{:?}", df_plan); - assert!(s.contains("Projection"), "plan missing Projection: {}", s); assert!( - s.contains("n__name"), - "missing qualified projected column n__name: {}", + s.contains("knows_1__dst_person_id"), + "incoming join should use dst key: {}", s ); } #[test] - fn test_expand_uses_qualified_join_keys_with_type_alias() { - // MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name + fn test_undirected_join_qualified_keys() { + // MATCH (a:Person)-[:KNOWS]-(b:Person) RETURN a.name let scan_a = LogicalOperator::ScanByLabel { variable: "a".to_string(), label: "Person".to_string(), @@ -990,10 +1888,12 @@ mod tests { input: Box::new(scan_a), source_variable: "a".to_string(), target_variable: "b".to_string(), + target_label: "Person".to_string(), relationship_types: vec!["KNOWS".to_string()], - direction: crate::ast::RelationshipDirection::Outgoing, + direction: crate::ast::RelationshipDirection::Undirected, relationship_variable: None, properties: Default::default(), + target_properties: Default::default(), }; let project = LogicalOperator::Project { input: Box::new(expand), @@ -1005,7 +1905,6 @@ mod tests { alias: None, }], }; - let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") .with_relationship("KNOWS", "src_person_id", "dst_person_id") @@ -1014,144 +1913,171 @@ mod tests { let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!( - s.contains("a__id"), - "missing qualified node id in join: {}", - s - ); assert!( s.contains("knows_1__src_person_id"), - "missing qualified rel key in join: {}", + "undirected uses src key side for predicate: {}", s ); } #[test] - fn test_expand_uses_relationship_variable_for_alias() { - // MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN r.src_person_id - let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), + fn test_distinct_and_order_with_qualified_columns() { + // ORDER is currently skipped in physical planner; just ensure Distinct appears and plan builds + let scan = LogicalOperator::ScanByLabel { + variable: "n".into(), + label: "Person".into(), properties: Default::default(), }; - let expand = LogicalOperator::Expand { - input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], - direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".to_string()), + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".into(), + property: "name".into(), + }), + alias: None, + }], + }; + let distinct = LogicalOperator::Distinct { + input: Box::new(project), + }; + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + let df_plan = planner.plan(&distinct).unwrap(); + let s = format!("{:?}", df_plan); + assert!(s.contains("Distinct"), "missing Distinct in plan: {}", s); + } + + #[test] + fn test_skip_limit_after_aliasing() { + let scan = LogicalOperator::ScanByLabel { + variable: "n".into(), + label: "Person".into(), properties: Default::default(), }; let project = LogicalOperator::Project { - input: Box::new(expand), + input: Box::new(scan), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "r".into(), - property: "src_person_id".into(), + variable: "n".into(), + property: "name".into(), }), alias: None, }], }; - + let offset = LogicalOperator::Offset { + input: Box::new(project), + offset: 5, + }; + let limit = LogicalOperator::Limit { + input: Box::new(offset), + count: 10, + }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") - .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&project).unwrap(); + let df_plan = planner.plan(&limit).unwrap(); let s = format!("{:?}", df_plan); - assert!( - s.contains("r__src_person_id"), - "missing rel-var qualified column: {}", - s - ); + assert!(s.contains("Limit"), "missing Limit in plan: {}", s); } #[test] - fn test_where_on_relationship_property_with_rel_var() { - // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE r.src_person_id = 1 RETURN a.name + fn test_where_rel_and_node_properties() { + // WHERE r.src_person_id = 1 AND a.age > 30 let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), + variable: "a".into(), + label: "Person".into(), properties: Default::default(), }; let expand = LogicalOperator::Expand { input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], + source_variable: "a".into(), + target_variable: "b".into(), + target_label: "Person".into(), + relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".to_string()), + relationship_variable: Some("r".into()), properties: Default::default(), + target_properties: Default::default(), }; - let filter = LogicalOperator::Filter { - input: Box::new(expand), - predicate: BooleanExpression::Comparison { + let pred = BooleanExpression::And( + Box::new(BooleanExpression::Comparison { left: ValueExpression::Property(PropertyRef { variable: "r".into(), property: "src_person_id".into(), }), operator: ComparisonOperator::Equal, right: ValueExpression::Literal(PropertyValue::Integer(1)), - }, - }; - let project = LogicalOperator::Project { - input: Box::new(filter), - projections: vec![ProjectionItem { - expression: ValueExpression::Property(PropertyRef { + }), + Box::new(BooleanExpression::Comparison { + left: ValueExpression::Property(PropertyRef { variable: "a".into(), - property: "name".into(), + property: "age".into(), }), - alias: None, - }], + operator: ComparisonOperator::GreaterThan, + right: ValueExpression::Literal(PropertyValue::Integer(30)), + }), + ); + let filter = LogicalOperator::Filter { + input: Box::new(expand), + predicate: pred, }; - let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&project).unwrap(); + let df_plan = planner.plan(&filter).unwrap(); let s = format!("{:?}", df_plan); assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( s.contains("r__src_person_id"), - "missing qualified rel column in filter: {}", + "missing qualified rel filter: {}", + s + ); + assert!( + s.contains("a__age") || s.contains("age"), + "missing node age filter: {}", s ); } #[test] - fn test_exists_on_relationship_property_is_qualified() { - // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE EXISTS(r.src_person_id) RETURN a.name + fn test_exists_and_in_on_node_props_materialized() { + // EXISTS(a.name) and a.age IN [20,30] let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), - properties: Default::default(), - }; - let expand = LogicalOperator::Expand { - input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], - direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".to_string()), + variable: "a".into(), + label: "Person".into(), properties: Default::default(), }; - let pred = BooleanExpression::Exists(PropertyRef { - variable: "r".into(), - property: "src_person_id".into(), - }); + let pred = BooleanExpression::And( + Box::new(BooleanExpression::Exists(PropertyRef { + variable: "a".into(), + property: "name".into(), + })), + Box::new(BooleanExpression::In { + expression: ValueExpression::Property(PropertyRef { + variable: "a".into(), + property: "age".into(), + }), + list: vec![ + ValueExpression::Literal(PropertyValue::Integer(20)), + ValueExpression::Literal(PropertyValue::Integer(30)), + ], + }), + ); let filter = LogicalOperator::Filter { - input: Box::new(expand), + input: Box::new(scan_a), predicate: pred, }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") - .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); @@ -1159,41 +2085,45 @@ mod tests { let s = format!("{:?}", df_plan); assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( - s.contains("r__src_person_id") || s.contains("IsNotNull"), - "missing qualified rel column or IsNotNull in filter: {}", + s.contains("a__name") || s.contains("IsNotNull"), + "missing EXISTS on a__name: {}", + s + ); + assert!( + s.contains("a__age") || s.contains("age"), + "missing IN on a.age: {}", s ); } #[test] - fn test_in_list_on_relationship_property_is_qualified() { - // MATCH (a:Person)-[r:KNOWS]->(b:Person) WHERE r.src_person_id IN [1,2] RETURN a.name + fn test_varlength_expand_placeholder_builds() { + // MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) RETURN a.name let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), + variable: "a".into(), + label: "Person".into(), properties: Default::default(), }; - let expand = LogicalOperator::Expand { + let vlexpand = LogicalOperator::VariableLengthExpand { input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".to_string()), - properties: Default::default(), + relationship_variable: Some("r".into()), + min_length: Some(1), + max_length: Some(2), + target_properties: HashMap::new(), }; - let filter = LogicalOperator::Filter { - input: Box::new(expand), - predicate: BooleanExpression::In { + let project = LogicalOperator::Project { + input: Box::new(vlexpand), + projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "r".into(), - property: "src_person_id".into(), + variable: "a".into(), + property: "name".into(), }), - list: vec![ - ValueExpression::Literal(PropertyValue::Integer(1)), - ValueExpression::Literal(PropertyValue::Integer(2)), - ], - }, + alias: None, + }], }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") @@ -1201,38 +2131,39 @@ mod tests { .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&filter).unwrap(); + let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!(s.contains("Filter"), "missing Filter: {}", s); assert!( - s.contains("r__src_person_id"), - "missing qualified rel column in IN list filter: {}", + s.contains("Join(") && s.contains("Inner"), + "missing Inner Join: {}", s ); } #[test] - fn test_incoming_join_qualified_keys() { - // MATCH (a:Person)<-[:KNOWS]-(b:Person) RETURN a.name + fn test_varlength_expand_single_hop() { + // MATCH (a:Person)-[:KNOWS*1..1]->(b:Person) - equivalent to single hop let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), + variable: "a".into(), + label: "Person".into(), properties: Default::default(), }; - let expand = LogicalOperator::Expand { + let vlexpand = LogicalOperator::VariableLengthExpand { input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], - direction: crate::ast::RelationshipDirection::Incoming, + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, relationship_variable: None, - properties: Default::default(), + min_length: Some(1), + max_length: Some(1), + target_properties: HashMap::new(), }; let project = LogicalOperator::Project { - input: Box::new(expand), + input: Box::new(vlexpand), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "a".into(), + variable: "b".into(), property: "name".into(), }), alias: None, @@ -1246,35 +2177,37 @@ mod tests { let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!( - s.contains("knows_1__dst_person_id"), - "incoming join should use dst key: {}", - s - ); + + // Should have joins but no UNION (only 1 hop) + assert!(s.contains("Join(")); + // Single hop shouldn't have Union + assert!(!s.contains("Union")); } #[test] - fn test_undirected_join_qualified_keys() { - // MATCH (a:Person)-[:KNOWS]-(b:Person) RETURN a.name + fn test_varlength_expand_with_union() { + // MATCH (a:Person)-[:KNOWS*2..3]->(b:Person) - should have UNION let scan_a = LogicalOperator::ScanByLabel { - variable: "a".to_string(), - label: "Person".to_string(), + variable: "a".into(), + label: "Person".into(), properties: Default::default(), }; - let expand = LogicalOperator::Expand { + let vlexpand = LogicalOperator::VariableLengthExpand { input: Box::new(scan_a), - source_variable: "a".to_string(), - target_variable: "b".to_string(), - relationship_types: vec!["KNOWS".to_string()], - direction: crate::ast::RelationshipDirection::Undirected, + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, relationship_variable: None, - properties: Default::default(), + min_length: Some(2), + max_length: Some(3), + target_properties: HashMap::new(), }; let project = LogicalOperator::Project { - input: Box::new(expand), + input: Box::new(vlexpand), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "a".into(), + variable: "b".into(), property: "name".into(), }), alias: None, @@ -1288,117 +2221,125 @@ mod tests { let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!( - s.contains("knows_1__src_person_id"), - "undirected uses src key side for predicate: {}", - s - ); + + // Should have UNION for multiple hop counts + assert!(s.contains("Union") || s.contains("union")); + assert!(s.contains("Join(")); } #[test] - fn test_distinct_and_order_with_qualified_columns() { - // ORDER is currently skipped in physical planner; just ensure Distinct appears and plan builds - let scan = LogicalOperator::ScanByLabel { - variable: "n".into(), + fn test_varlength_expand_default_min() { + // MATCH (a:Person)-[:KNOWS*..3]->(b:Person) - min defaults to 1 + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".into(), label: "Person".into(), properties: Default::default(), }; + let vlexpand = LogicalOperator::VariableLengthExpand { + input: Box::new(scan_a), + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + min_length: None, // Should default to 1 + max_length: Some(3), + target_properties: HashMap::new(), + }; let project = LogicalOperator::Project { - input: Box::new(scan), + input: Box::new(vlexpand), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "n".into(), + variable: "b".into(), property: "name".into(), }), alias: None, }], }; - let distinct = LogicalOperator::Distinct { - input: Box::new(project), - }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&distinct).unwrap(); + let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!(s.contains("Distinct"), "missing Distinct in plan: {}", s); + + // Should build successfully with default min + assert!(s.contains("Join(")); } #[test] - fn test_skip_limit_after_aliasing() { - let scan = LogicalOperator::ScanByLabel { - variable: "n".into(), + fn test_varlength_expand_default_max() { + // MATCH (a:Person)-[:KNOWS*2..]->(b:Person) - max defaults to 20 + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".into(), label: "Person".into(), properties: Default::default(), }; + let vlexpand = LogicalOperator::VariableLengthExpand { + input: Box::new(scan_a), + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + min_length: Some(2), + max_length: None, // Should default to MAX_VARIABLE_LENGTH_HOPS (20) + target_properties: HashMap::new(), + }; let project = LogicalOperator::Project { - input: Box::new(scan), + input: Box::new(vlexpand), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "n".into(), + variable: "b".into(), property: "name".into(), }), alias: None, }], }; - let offset = LogicalOperator::Offset { - input: Box::new(project), - offset: 5, - }; - let limit = LogicalOperator::Limit { - input: Box::new(offset), - count: 10, - }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&limit).unwrap(); + let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!(s.contains("Limit"), "missing Limit in plan: {}", s); + + // Should build successfully with default max + assert!(s.contains("Union") || s.contains("union")); + assert!(s.contains("Join(")); } #[test] - fn test_where_rel_and_node_properties() { - // WHERE r.src_person_id = 1 AND a.age > 30 + fn test_varlength_expand_invalid_range() { + // MATCH (a:Person)-[:KNOWS*3..2]->(b:Person) - min > max should error let scan_a = LogicalOperator::ScanByLabel { variable: "a".into(), label: "Person".into(), properties: Default::default(), }; - let expand = LogicalOperator::Expand { + let vlexpand = LogicalOperator::VariableLengthExpand { input: Box::new(scan_a), source_variable: "a".into(), target_variable: "b".into(), relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".into()), - properties: Default::default(), + relationship_variable: None, + min_length: Some(3), + max_length: Some(2), // Invalid: min > max + target_properties: HashMap::new(), }; - let pred = BooleanExpression::And( - Box::new(BooleanExpression::Comparison { - left: ValueExpression::Property(PropertyRef { - variable: "r".into(), - property: "src_person_id".into(), - }), - operator: ComparisonOperator::Equal, - right: ValueExpression::Literal(PropertyValue::Integer(1)), - }), - Box::new(BooleanExpression::Comparison { - left: ValueExpression::Property(PropertyRef { - variable: "a".into(), - property: "age".into(), + let project = LogicalOperator::Project { + input: Box::new(vlexpand), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "b".into(), + property: "name".into(), }), - operator: ComparisonOperator::GreaterThan, - right: ValueExpression::Literal(PropertyValue::Integer(30)), - }), - ); - let filter = LogicalOperator::Filter { - input: Box::new(expand), - predicate: pred, + alias: None, + }], }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") @@ -1406,72 +2347,60 @@ mod tests { .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&filter).unwrap(); - let s = format!("{:?}", df_plan); - assert!(s.contains("Filter"), "missing Filter: {}", s); - assert!( - s.contains("r__src_person_id"), - "missing qualified rel filter: {}", - s - ); - assert!( - s.contains("a__age") || s.contains("age"), - "missing node age filter: {}", - s - ); + let result = planner.plan(&project); + + // Should return error + assert!(result.is_err()); + let err_msg = format!("{:?}", result.unwrap_err()); + assert!(err_msg.contains("Invalid variable-length range")); } #[test] - fn test_exists_and_in_on_node_props_materialized() { - // EXISTS(a.name) and a.age IN [20,30] + fn test_varlength_expand_exceeds_max() { + // MATCH (a:Person)-[:KNOWS*1..25]->(b:Person) - exceeds MAX (20) let scan_a = LogicalOperator::ScanByLabel { variable: "a".into(), label: "Person".into(), properties: Default::default(), }; - let pred = BooleanExpression::And( - Box::new(BooleanExpression::Exists(PropertyRef { - variable: "a".into(), - property: "name".into(), - })), - Box::new(BooleanExpression::In { + let vlexpand = LogicalOperator::VariableLengthExpand { + input: Box::new(scan_a), + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + min_length: Some(1), + max_length: Some(25), // Exceeds MAX_VARIABLE_LENGTH_HOPS + target_properties: HashMap::new(), + }; + let project = LogicalOperator::Project { + input: Box::new(vlexpand), + projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "a".into(), - property: "age".into(), + variable: "b".into(), + property: "name".into(), }), - list: vec![ - ValueExpression::Literal(PropertyValue::Integer(20)), - ValueExpression::Literal(PropertyValue::Integer(30)), - ], - }), - ); - let filter = LogicalOperator::Filter { - input: Box::new(scan_a), - predicate: pred, + alias: None, + }], }; let cfg = crate::config::GraphConfig::builder() .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") .build() .unwrap(); let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); - let df_plan = planner.plan(&filter).unwrap(); - let s = format!("{:?}", df_plan); - assert!(s.contains("Filter"), "missing Filter: {}", s); - assert!( - s.contains("a__name") || s.contains("IsNotNull"), - "missing EXISTS on a__name: {}", - s - ); - assert!( - s.contains("a__age") || s.contains("age"), - "missing IN on a.age: {}", - s - ); + let result = planner.plan(&project); + + // Should return error + assert!(result.is_err()); + let err_msg = format!("{:?}", result.unwrap_err()); + assert!(err_msg.contains("Variable-length paths with max length > 20")); } #[test] - fn test_varlength_expand_placeholder_builds() { - // MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) RETURN a.name + fn test_varlength_expand_with_filter() { + // MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) WHERE b.age > 30 RETURN b.name let scan_a = LogicalOperator::ScanByLabel { variable: "a".into(), label: "Person".into(), @@ -1483,15 +2412,27 @@ mod tests { target_variable: "b".into(), relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, - relationship_variable: Some("r".into()), + relationship_variable: None, min_length: Some(1), max_length: Some(2), + target_properties: HashMap::new(), }; - let project = LogicalOperator::Project { + let filter = LogicalOperator::Filter { input: Box::new(vlexpand), + predicate: BooleanExpression::Comparison { + left: ValueExpression::Property(PropertyRef { + variable: "b".into(), + property: "age".into(), + }), + operator: ComparisonOperator::GreaterThan, + right: ValueExpression::Literal(PropertyValue::Integer(30)), + }, + }; + let project = LogicalOperator::Project { + input: Box::new(filter), projections: vec![ProjectionItem { expression: ValueExpression::Property(PropertyRef { - variable: "a".into(), + variable: "b".into(), property: "name".into(), }), alias: None, @@ -1505,10 +2446,51 @@ mod tests { let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); let df_plan = planner.plan(&project).unwrap(); let s = format!("{:?}", df_plan); - assert!( - s.contains("Join(") && s.contains("Inner"), - "missing Inner Join: {}", - s + + // Should have filter and joins + assert!(s.contains("Filter") || s.contains("filter")); + assert!(s.contains("Join(")); + } + + #[test] + fn test_varlength_expand_analysis_registers_instances() { + // Test that analysis phase correctly registers multiple relationship instances + let scan_a = LogicalOperator::ScanByLabel { + variable: "a".into(), + label: "Person".into(), + properties: Default::default(), + }; + let vlexpand = LogicalOperator::VariableLengthExpand { + input: Box::new(scan_a), + source_variable: "a".into(), + target_variable: "b".into(), + relationship_types: vec!["KNOWS".into()], + direction: crate::ast::RelationshipDirection::Outgoing, + relationship_variable: None, + min_length: Some(1), + max_length: Some(2), + target_properties: HashMap::new(), + }; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_person_id", "dst_person_id") + .build() + .unwrap(); + let planner = DataFusionPlanner::new(cfg); + let analysis = planner.analyze(&vlexpand).unwrap(); + + // For *1..2, should register 1 + 2 = 3 instances + let knows_instances: Vec<_> = analysis + .relationship_instances + .iter() + .filter(|r| r.rel_type == "KNOWS") + .collect(); + + assert_eq!( + knows_instances.len(), + 3, + "Should register 3 KNOWS instances for *1..2" ); } @@ -1524,10 +2506,12 @@ mod tests { input: Box::new(scan_a), source_variable: "a".into(), target_variable: "b".into(), + target_label: "Person".into(), relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, relationship_variable: None, properties: Default::default(), + target_properties: Default::default(), }; let cfg = crate::config::GraphConfig::builder() @@ -1562,19 +2546,23 @@ mod tests { input: Box::new(scan_a), source_variable: "a".into(), target_variable: "b".into(), + target_label: "Person".into(), relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, relationship_variable: None, properties: Default::default(), + target_properties: Default::default(), }; let expand2 = LogicalOperator::Expand { input: Box::new(expand1), source_variable: "b".into(), target_variable: "c".into(), + target_label: "Person".into(), relationship_types: vec!["KNOWS".into()], direction: crate::ast::RelationshipDirection::Outgoing, relationship_variable: None, properties: Default::default(), + target_properties: Default::default(), }; let cfg = crate::config::GraphConfig::builder() diff --git a/rust/lance-graph/src/logical_plan.rs b/rust/lance-graph/src/logical_plan.rs index cc6dc225..2f45eed2 100644 --- a/rust/lance-graph/src/logical_plan.rs +++ b/rust/lance-graph/src/logical_plan.rs @@ -30,26 +30,53 @@ pub enum LogicalOperator { }, /// Traverse relationships (the core graph operation) + /// + /// Represents a single-hop relationship traversal: (source)-[rel]->(target) Expand { + /// The input operator (typically a node scan or previous expand) input: Box, + /// Variable name for the source node (e.g., "a" in (a)-[]->(b)) source_variable: String, + /// Variable name for the target node (e.g., "b" in (a)-[]->(b)) target_variable: String, + /// Label of the target node (e.g., "Person", "Book") + /// This is essential for looking up the correct schema during planning + target_label: String, + /// Types of relationships to traverse (e.g., ["KNOWS", "FRIEND_OF"]) relationship_types: Vec, + /// Direction of traversal (Outgoing, Incoming, or Undirected) direction: RelationshipDirection, + /// Optional variable name for the relationship itself (e.g., "r" in -[r]->) relationship_variable: Option, + /// Property filters to apply on the relationship properties: HashMap, + /// Property filters to apply on the target node + target_properties: HashMap, }, /// Variable-length path expansion (*1..2 syntax) + /// + /// Represents multi-hop relationship traversals: (source)-[rel*min..max]->(target) + /// This is implemented by unrolling into multiple fixed-length paths and unioning them VariableLengthExpand { + /// The input operator (typically a node scan) input: Box, + /// Variable name for the source node source_variable: String, + /// Variable name for the target node (reachable in min..max hops) target_variable: String, + /// Types of relationships to traverse in each hop relationship_types: Vec, + /// Direction of traversal for each hop direction: RelationshipDirection, + /// Optional variable name for the relationship pattern relationship_variable: Option, + /// Minimum number of hops (defaults to 1 if None) min_length: Option, + /// Maximum number of hops (defaults to system max if None) max_length: Option, + /// Property filters to apply on target nodes + target_properties: HashMap, }, /// Project specific columns (RETURN clause) @@ -297,7 +324,8 @@ impl LogicalPlanner { .first() .cloned() .unwrap_or_else(|| "Node".to_string()); - self.variables.insert(target_variable.clone(), target_label); + self.variables + .insert(target_variable.clone(), target_label.clone()); // Optimize fixed-length var-length expansions (*1 or *1..1) let next_plan = match segment.relationship.length.as_ref() { @@ -308,10 +336,12 @@ impl LogicalPlanner { input: Box::new(plan), source_variable: current_src.clone(), target_variable: target_variable.clone(), + target_label: target_label.clone(), relationship_types: segment.relationship.types.clone(), direction: segment.relationship.direction.clone(), relationship_variable: segment.relationship.variable.clone(), properties: segment.relationship.properties.clone(), + target_properties: segment.end_node.properties.clone(), } } Some(length_range) => LogicalOperator::VariableLengthExpand { @@ -323,15 +353,18 @@ impl LogicalPlanner { relationship_variable: segment.relationship.variable.clone(), min_length: length_range.min, max_length: length_range.max, + target_properties: segment.end_node.properties.clone(), }, None => LogicalOperator::Expand { input: Box::new(plan), source_variable: current_src.clone(), target_variable: target_variable.clone(), + target_label: target_label.clone(), relationship_types: segment.relationship.types.clone(), direction: segment.relationship.direction.clone(), relationship_variable: segment.relationship.variable.clone(), properties: segment.relationship.properties.clone(), + target_properties: segment.end_node.properties.clone(), }, }; diff --git a/rust/lance-graph/src/sql_converter.rs b/rust/lance-graph/src/sql_converter.rs index 6180b7f8..43d45b40 100644 --- a/rust/lance-graph/src/sql_converter.rs +++ b/rust/lance-graph/src/sql_converter.rs @@ -55,6 +55,7 @@ impl<'a> LogicalPlanToSqlConverter<'a> { direction, relationship_variable, properties, + .. } => self.convert_expand( input, source_variable, diff --git a/rust/lance-graph/tests/integration_datafusion_pipeline.rs b/rust/lance-graph/tests/integration_datafusion_pipeline.rs index b1a78a3b..bdd5251b 100644 --- a/rust/lance-graph/tests/integration_datafusion_pipeline.rs +++ b/rust/lance-graph/tests/integration_datafusion_pipeline.rs @@ -1,4 +1,4 @@ -use arrow_array::{Int64Array, RecordBatch, StringArray}; +use arrow_array::{Array, Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use lance_graph::config::GraphConfig; use lance_graph::query::CypherQuery; @@ -115,6 +115,33 @@ fn create_graph_config() -> GraphConfig { .unwrap() } +// Helper function to execute a query and return results +async fn execute_test_query(cypher: &str) -> RecordBatch { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + let query = CypherQuery::new(cypher).unwrap().with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + query.execute_datafusion(datasets).await.unwrap() +} + +// Helper function to extract string column values +fn get_string_column(batch: &RecordBatch, col_idx: usize) -> Vec { + let array = batch + .column(col_idx) + .as_any() + .downcast_ref::() + .unwrap(); + (0..array.len()) + .map(|i| array.value(i).to_string()) + .collect() +} + // ============================================================================ // Basic Node Query Tests // ============================================================================ @@ -322,10 +349,12 @@ async fn test_datafusion_complex_filtering() { let person_batch = create_person_dataset(); let knows_batch = create_knows_dataset(); - let query = - CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) WHERE a.age > 30 RETURN a.name") - .unwrap() - .with_config(config); + // WHERE a.age > 30 filters source, {age: 30} filters target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person {age: 30}) WHERE a.age > 30 RETURN a.name", + ) + .unwrap() + .with_config(config); let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); @@ -334,8 +363,8 @@ async fn test_datafusion_complex_filtering() { let result = query.execute_datafusion(datasets).await.unwrap(); assert_eq!(result.num_columns(), 1); - // Bob (35) has 1 edge: 2->3, David (40) has 1 edge: 4->5 - assert_eq!(result.num_rows(), 2); + // Only Bob (35) -> Charlie (30), David doesn't connect to anyone age 30 + assert_eq!(result.num_rows(), 1); // Verify exact results let source_names = result @@ -343,14 +372,8 @@ async fn test_datafusion_complex_filtering() { .as_any() .downcast_ref::() .unwrap(); - let name_set: std::collections::HashSet = (0..result.num_rows()) - .map(|i| source_names.value(i).to_string()) - .collect(); - let expected: std::collections::HashSet = ["Bob", "David"] - .into_iter() - .map(|s| s.to_string()) - .collect(); - assert_eq!(name_set, expected); + // Should only be Bob + assert_eq!(source_names.value(0), "Bob"); } #[tokio::test] @@ -670,38 +693,23 @@ async fn test_datafusion_one_hop_filtered_source_age_strict() { #[tokio::test] async fn test_datafusion_two_hop_basic() { - let config = create_graph_config(); - let person_batch = create_person_dataset(); - let knows_batch = create_knows_dataset(); - // Query: Find friends of friends // Edges: 1->2, 2->3, 3->4, 4->5, 1->3 // Two-hop paths: 1->2->3, 2->3->4, 3->4->5, 1->3->4 - let query = CypherQuery::new( + let out = execute_test_query( "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN c.name", ) - .unwrap() - .with_config(config); - - let mut datasets = HashMap::new(); - datasets.insert("Person".to_string(), person_batch); - datasets.insert("KNOWS".to_string(), knows_batch); - - let out = query.execute_datafusion(datasets).await.unwrap(); + .await; // Should return: Charlie (from 1->2->3), David (from 2->3->4 and 1->3->4), Eve (from 3->4->5) assert_eq!(out.num_columns(), 1); assert_eq!(out.num_rows(), 4); // 4 two-hop paths - let names = out - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); + let names = get_string_column(&out, 0); let mut counts = HashMap::::new(); - for i in 0..out.num_rows() { - *counts.entry(names.value(i).to_string()).or_insert(0) += 1; + for name in names { + *counts.entry(name).or_insert(0) += 1; } // Verify counts: Charlie:1, David:2, Eve:1 @@ -905,67 +913,68 @@ async fn test_datafusion_two_hop_with_relationship_variable() { #[tokio::test] async fn test_datafusion_two_hop_distinct() { - let config = create_graph_config(); - let person_batch = create_person_dataset(); - let knows_batch = create_knows_dataset(); - // Query: Get distinct final destinations in two-hop paths - let query = CypherQuery::new( + let out = execute_test_query( "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN DISTINCT c.name", ) - .unwrap() - .with_config(config); + .await; - let mut datasets = HashMap::new(); - datasets.insert("Person".to_string(), person_batch); - datasets.insert("KNOWS".to_string(), knows_batch); - - let out = query.execute_datafusion(datasets).await.unwrap(); - - // Distinct destinations: Charlie, David, Eve + assert_eq!(out.num_columns(), 1); + // Three distinct targets: Charlie, David, Eve assert_eq!(out.num_rows(), 3); - let names = out - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let result_set: std::collections::HashSet = (0..out.num_rows()) - .map(|i| names.value(i).to_string()) - .collect(); - - let expected: std::collections::HashSet = ["Charlie", "David", "Eve"] - .into_iter() - .map(|s| s.to_string()) - .collect(); + let mut names = get_string_column(&out, 0); + names.sort(); - assert_eq!(result_set, expected); + assert_eq!(names, vec!["Charlie", "David", "Eve"]); } #[tokio::test] async fn test_datafusion_two_hop_no_results() { - let config = create_graph_config(); - let person_batch = create_person_dataset(); - let knows_batch = create_knows_dataset(); - // Query: Two-hop starting from Eve (who has no outgoing edges) - let query = CypherQuery::new( + let out = execute_test_query( "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) WHERE a.name = 'Eve' RETURN c.name" ) - .unwrap() - .with_config(config); - - let mut datasets = HashMap::new(); - datasets.insert("Person".to_string(), person_batch); - datasets.insert("KNOWS".to_string(), knows_batch); - - let out = query.execute_datafusion(datasets).await.unwrap(); + .await; // Eve has no outgoing edges, so no two-hop paths assert_eq!(out.num_rows(), 0); } +#[tokio::test] +async fn test_datafusion_varlength_projection_correctness() { + // Test that variable-length path projection correctly handles qualified column names + // and doesn't accidentally include intermediate node columns + let out = execute_test_query( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) RETURN b.name", + ) + .await; + + // Alice can reach: Bob (1-hop), Charlie (1-hop and 2-hop via Bob), David (2-hop via Charlie) + // Total: 4 results (Bob, Charlie, Charlie, David) + assert_eq!(out.num_rows(), 4); + + // Verify schema only contains source and target columns, not intermediate nodes + let schema = out.schema(); + let column_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + + // Should only have b__ prefixed columns (target), no intermediate node columns + for name in &column_names { + assert!( + name.starts_with("b__"), + "Unexpected column in variable-length result: {}", + name + ); + // Ensure no double-qualified names like "b__intermediate__prop" + let remainder = &name[3..]; // Skip "b__" + assert!( + !remainder.contains("__"), + "Column name contains nested qualifiers: {}", + name + ); + } +} + // ============================================================================ // Complex Query Tests (Advanced Filtering & Multi-Condition) // ============================================================================ @@ -1041,9 +1050,11 @@ async fn test_datafusion_two_hop_return_relationship_properties() { let person_batch = create_person_dataset(); let knows_batch = create_knows_dataset(); - // Query: Return relationship properties from two-hop path + // Query: Filter two-hop paths by relationship property on first hop + // Only paths where first relationship has since_year = 2020 + // Alice-[2020]->Bob-[2019]->Charlie is the only match let query = CypherQuery::new( - "MATCH (a:Person)-[r1:KNOWS]->(b:Person)-[r2:KNOWS]->(c:Person) \ + "MATCH (a:Person)-[r1:KNOWS {since_year: 2020}]->(b:Person)-[r2:KNOWS]->(c:Person) \ RETURN a.name, c.name", ) .unwrap() @@ -1055,7 +1066,21 @@ async fn test_datafusion_two_hop_return_relationship_properties() { let out = query.execute_datafusion(datasets).await.unwrap(); assert_eq!(out.num_columns(), 2); - assert_eq!(out.num_rows(), 4); + // Only Alice->Bob->Charlie (Alice-[2020]->Bob-[2019]->Charlie) + assert_eq!(out.num_rows(), 1); + + let sources = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let targets = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(sources.value(0), "Alice"); + assert_eq!(targets.value(0), "Charlie"); } #[tokio::test] @@ -1064,12 +1089,12 @@ async fn test_datafusion_one_hop_with_city_filter() { let person_batch = create_person_dataset(); let knows_batch = create_knows_dataset(); - // Query: Filter targets by city (David has NULL city, should be excluded by comparison) - let query = CypherQuery::new( - "MATCH (a:Person)-[:KNOWS]->(b:Person) WHERE b.city = 'Seattle' RETURN b.name", - ) - .unwrap() - .with_config(config); + // Query: Filter targets by city using inline property filter + // Tests inline property filter instead of WHERE clause + let query = + CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person {city: 'Seattle'}) RETURN b.name") + .unwrap() + .with_config(config); let mut datasets = HashMap::new(); datasets.insert("Person".to_string(), person_batch); @@ -1376,6 +1401,22 @@ async fn test_datafusion_distinct_with_two_hop() { assert_eq!(result_set, expected); } +#[tokio::test] +async fn test_datafusion_expand_with_both_relationship_and_target_filters() { + // Query: Find people Alice knows since 2018 who are age 30 + // Alice-[2020]->Bob(35), Alice-[2018]->Charlie(30) + // Only Charlie matches both filters + let out = execute_test_query( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS {since_year: 2018}]->(b:Person {age: 30}) \ + RETURN b.name", + ) + .await; + + assert_eq!(out.num_rows(), 1); + let names = get_string_column(&out, 0); + assert_eq!(names[0], "Charlie"); +} + // ============================================================================ // ORDER BY Tests // ============================================================================ @@ -1619,37 +1660,19 @@ async fn test_datafusion_order_by_two_hop_query() { #[tokio::test] async fn test_datafusion_order_by_with_distinct() { - let config = create_graph_config(); - let person_batch = create_person_dataset(); - let knows_batch = create_knows_dataset(); - // Query: DISTINCT with ORDER BY - let query = CypherQuery::new( + let out = execute_test_query( "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN DISTINCT b.name ORDER BY b.name", ) - .unwrap() - .with_config(config); - - let mut datasets = HashMap::new(); - datasets.insert("Person".to_string(), person_batch); - datasets.insert("KNOWS".to_string(), knows_batch); - - let out = query.execute_datafusion(datasets).await.unwrap(); + .await; // Distinct targets: Bob, Charlie, David, Eve assert_eq!(out.num_rows(), 4); - let names = out - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); + let names = get_string_column(&out, 0); // Alphabetical order - assert_eq!(names.value(0), "Bob"); - assert_eq!(names.value(1), "Charlie"); - assert_eq!(names.value(2), "David"); - assert_eq!(names.value(3), "Eve"); + assert_eq!(names, vec!["Bob", "Charlie", "David", "Eve"]); } // ============================================================================ @@ -1829,3 +1852,426 @@ async fn test_datafusion_return_alias_with_order_by() { assert_eq!(names.value(0), "David"); assert_eq!(names.value(1), "Bob"); } + +// ============================================================================ +// Variable-Length Path Tests +// ============================================================================ + +#[tokio::test] +async fn test_datafusion_varlength_single_hop() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: MATCH (a:Person)-[:KNOWS*1..1]->(b:Person) - equivalent to single hop + let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS*1..1]->(b:Person) RETURN b.name") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Same as single-hop: Alice→Bob, Alice→Charlie, Bob→Charlie, Charlie→David, David→Eve + assert_eq!(out.num_rows(), 5); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Collect all target names + let mut targets: Vec = (0..out.num_rows()) + .map(|i| names.value(i).to_string()) + .collect(); + targets.sort(); + + // Should have: Bob, Charlie(x2), David, Eve + assert_eq!(targets, vec!["Bob", "Charlie", "Charlie", "David", "Eve"]); +} + +#[tokio::test] +async fn test_datafusion_varlength_two_hops() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: MATCH (a:Person)-[:KNOWS*2..2]->(b:Person) - exactly 2 hops + let query = + CypherQuery::new("MATCH (a:Person)-[:KNOWS*2..2]->(b:Person) RETURN a.name, b.name") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // 2-hop paths: Alice→Bob→Charlie, Alice→Charlie→David, Bob→Charlie→David, Charlie→David→Eve + assert_eq!(out.num_rows(), 4); + + let sources = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let targets = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Collect all paths + let mut paths: Vec<(String, String)> = (0..out.num_rows()) + .map(|i| (sources.value(i).to_string(), targets.value(i).to_string())) + .collect(); + paths.sort(); + + assert_eq!( + paths, + vec![ + ("Alice".to_string(), "Charlie".to_string()), + ("Alice".to_string(), "David".to_string()), + ("Bob".to_string(), "David".to_string()), + ("Charlie".to_string(), "Eve".to_string()), + ] + ); +} + +#[tokio::test] +async fn test_datafusion_varlength_one_to_two_hops() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) - 1 or 2 hops + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) RETURN b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Alice 1-hop: Bob, Charlie + // Alice 2-hop: Charlie (via Bob), David (via Charlie) + // Total: 4 paths (Bob, Charlie x2, David) + assert_eq!(out.num_rows(), 4); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut targets: Vec = (0..out.num_rows()) + .map(|i| names.value(i).to_string()) + .collect(); + targets.sort(); + + assert_eq!(targets, vec!["Bob", "Charlie", "Charlie", "David"]); +} + +#[tokio::test] +async fn test_datafusion_varlength_with_filter() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length with filter on target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \ + WHERE b.age > 35 \ + RETURN a.name, b.name, b.age", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Only paths ending at David (age 40) + // Alice→Bob→David, Bob→David + let ages = out.column(2).as_any().downcast_ref::().unwrap(); + + for i in 0..out.num_rows() { + assert!(ages.value(i) > 35); + } +} + +#[tokio::test] +async fn test_datafusion_varlength_with_order_by() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length with ORDER BY + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \ + RETURN b.name \ + ORDER BY b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 4); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Should be ordered alphabetically: Bob, Charlie (x2), David + assert_eq!(names.value(0), "Bob"); + assert_eq!(names.value(1), "Charlie"); + assert_eq!(names.value(2), "Charlie"); + assert_eq!(names.value(3), "David"); +} + +#[tokio::test] +async fn test_datafusion_varlength_with_limit() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length with LIMIT + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \ + RETURN b.name \ + LIMIT 3", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Should limit to 3 results + assert_eq!(out.num_rows(), 3); +} + +#[tokio::test] +async fn test_datafusion_varlength_with_distinct() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length with DISTINCT + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \ + RETURN DISTINCT b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Alice reaches: Bob, Charlie, David (3 distinct people within 2 hops) + assert_eq!(out.num_rows(), 3); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut targets: Vec = (0..out.num_rows()) + .map(|i| names.value(i).to_string()) + .collect(); + targets.sort(); + + assert_eq!(targets, vec!["Bob", "Charlie", "David"]); +} + +#[tokio::test] +async fn test_datafusion_varlength_three_hops() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: MATCH (a:Person)-[:KNOWS*3..3]->(b:Person) - exactly 3 hops + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*3..3]->(b:Person) \ + RETURN b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Alice 3-hop: Alice→Bob→Charlie→David, Alice→Charlie→David→Eve + assert_eq!(out.num_rows(), 2); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut targets: Vec = (0..out.num_rows()) + .map(|i| names.value(i).to_string()) + .collect(); + targets.sort(); + + assert_eq!(targets, vec!["David", "Eve"]); +} + +#[tokio::test] +async fn test_datafusion_varlength_no_results() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length from Eve (who knows nobody) + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Eve'})-[:KNOWS*1..2]->(b:Person) \ + RETURN b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Eve has no outgoing KNOWS relationships + assert_eq!(out.num_rows(), 0); +} + +#[tokio::test] +async fn test_datafusion_varlength_with_source_filter() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Variable-length with filter on source + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \ + WHERE a.age > 30 \ + RETURN a.name, b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + let sources = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // All sources should have age > 30 (Bob: 35, David: 40) + for i in 0..out.num_rows() { + let source = sources.value(i); + assert!(source == "Bob" || source == "David"); + } +} + +#[tokio::test] +async fn test_datafusion_varlength_return_source_and_target() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Return both source and target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS*2..2]->(b:Person) \ + RETURN a.name AS source, b.name AS target \ + ORDER BY source, target", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // 2-hop paths: Alice→Bob→Charlie, Alice→Charlie→David, Bob→Charlie→David, Charlie→David→Eve + assert_eq!(out.num_rows(), 4); + + let sources = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let targets = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Ordered by source, target + assert_eq!(sources.value(0), "Alice"); + assert_eq!(targets.value(0), "Charlie"); + + assert_eq!(sources.value(1), "Alice"); + assert_eq!(targets.value(1), "David"); + + assert_eq!(sources.value(2), "Bob"); + assert_eq!(targets.value(2), "David"); + + assert_eq!(sources.value(3), "Charlie"); + assert_eq!(targets.value(3), "Eve"); +} + +#[tokio::test] +async fn test_datafusion_varlength_count() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Count variable-length paths + let query = CypherQuery::new( + "MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \ + RETURN b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Alice can reach 4 people within 2 hops + assert_eq!(out.num_rows(), 4); +}