diff --git a/python/python/tests/test_graph.py b/python/python/tests/test_graph.py index a045abc6..da818746 100644 --- a/python/python/tests/test_graph.py +++ b/python/python/tests/test_graph.py @@ -69,18 +69,9 @@ def test_basic_node_selection(graph_env, execute_method): result = getattr(query, execute_method)({"Person": datasets["Person"]}) data = result.to_pydict() - # TODO: remove this if/else statements when the execute() also returns - # Cypher dot notation - if execute_method == "execute": - # execute() returns unqualified names for simple queries - assert set(data.keys()) == {"name", "age"} - assert len(data["name"]) == 4 - assert "Alice" in set(data["name"]) - else: - # execute_datafusion() returns Cypher dot notation - assert set(data.keys()) == {"p.name", "p.age"} - assert len(data["p.name"]) == 4 - assert "Alice" in set(data["p.name"]) + assert set(data.keys()) == {"p.name", "p.age"} + assert len(data["p.name"]) == 4 + assert "Alice" in set(data["p.name"]) @pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) @@ -92,14 +83,9 @@ def test_filtered_query(graph_env, execute_method): result = getattr(query, execute_method)({"Person": datasets["Person"]}) data = result.to_pydict() - if execute_method == "execute": - assert len(data["name"]) == 2 - assert set(data["name"]) == {"Bob", "David"} - assert all(age > 30 for age in data["age"]) - else: - assert len(data["p.name"]) == 2 - assert set(data["p.name"]) == {"Bob", "David"} - assert all(age > 30 for age in data["p.age"]) + assert len(data["p.name"]) == 2 + assert set(data["p.name"]) == {"Bob", "David"} + assert all(age > 30 for age in data["p.age"]) @pytest.mark.parametrize("execute_method", ["execute", "execute_datafusion"]) @@ -209,11 +195,5 @@ def test_distinct_clause(graph_env, execute_method): ) data = result.to_pydict() - if execute_method == "execute": - # execute() returns qualified column names for relationship queries - assert len(data["c__company_name"]) == 3 - assert set(data["c__company_name"]) == {"TechCorp", "DataInc", "CloudSoft"} - else: - # execute_datafusion() returns Cypher dot notation - assert len(data["c.company_name"]) == 3 - assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"} + assert len(data["c.company_name"]) == 3 + assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"} diff --git a/python/python/tests/test_knowledge_graph_config.py b/python/python/tests/test_knowledge_graph_config.py index 1e34bf42..0736de4e 100644 --- a/python/python/tests/test_knowledge_graph_config.py +++ b/python/python/tests/test_knowledge_graph_config.py @@ -21,7 +21,7 @@ def test_build_graph_config_from_mapping_supports_simple_nodes(): .execute({"Person": table}) .to_pydict() ) - assert data["person_id"] == [1, 2] + assert data["id"] == [1, 2] def test_build_graph_config_from_mapping_requires_id_field(): diff --git a/rust/lance-graph/src/query.rs b/rust/lance-graph/src/query.rs index 077c3e44..7306a3e8 100644 --- a/rust/lance-graph/src/query.rs +++ b/rust/lance-graph/src/query.rs @@ -15,7 +15,7 @@ use self::path_executor::PathExecutor; mod aliases; mod clauses; mod expr; -use crate::query::expr::{to_df_boolean_expr_with_vars, to_df_literal}; +mod simple_executor; /// A Cypher query that can be executed against Lance datasets #[derive(Debug, Clone)] @@ -601,14 +601,39 @@ impl CypherQuery { Ok(output) } - /// Execute this Cypher query against Lance datasets + /// Execute the query against provided in-memory datasets using the DataFusion planner /// - /// Note: This initial implementation supports a single-table projection/filter/limit - /// workflow to enable basic end-to-end execution. Multi-table/path execution will be - /// wired up via the DataFusion planner in a follow-up. + /// This is the primary execution method that uses the full DataFusion-based planner + /// for comprehensive query support including joins, aggregations, and complex patterns. + /// + /// For legacy single-table queries, use `execute_simple()` instead. pub async fn execute( &self, datasets: HashMap, + ) -> Result { + self.execute_datafusion(datasets).await + } + + /// Explain the query execution plan using the DataFusion planner + /// + /// This method provides a high-level overview of the query execution plan + /// using the DataFusion planner, which is useful for debugging and optimization. + pub async fn explain( + &self, + datasets: HashMap, + ) -> Result { + self.explain_datafusion(datasets).await + } + + /// Execute simple single-table queries (legacy implementation) + /// + /// This method supports basic projection/filter/limit workflows on a single table. + /// For full query support including joins and complex patterns, use `execute()` instead. + /// + /// Note: This implementation is retained for backward compatibility and simple use cases. + pub async fn execute_simple( + &self, + datasets: HashMap, ) -> Result { use arrow::compute::concat_batches; use datafusion::datasource::MemTable; @@ -676,7 +701,9 @@ impl CypherQuery { // Apply WHERE if present (limited support: simple comparisons on a single column) if let Some(where_clause) = &self.ast.where_clause { - if let Some(filter_expr) = to_df_boolean_expr_simple(&where_clause.expression) { + if let Some(filter_expr) = + simple_executor::to_df_boolean_expr_simple(&where_clause.expression) + { df = df.filter(filter_expr).map_err(|e| GraphError::PlanError { message: format!("Failed to apply filter: {}", e), location: snafu::Location::new(file!(), line!(), column!()), @@ -690,7 +717,7 @@ impl CypherQuery { .return_clause .items .iter() - .map(|item| to_df_value_expr_simple(&item.expression)) + .map(|item| simple_executor::to_df_value_expr_simple(&item.expression)) .collect(); if !proj_exprs.is_empty() { df = df.select(proj_exprs).map_err(|e| GraphError::PlanError { @@ -709,7 +736,7 @@ impl CypherQuery { // Apply ORDER BY if present if let Some(order_by) = &self.ast.order_by { - let sort_expr = to_df_order_by_expr_simple(&order_by.items); + let sort_expr = simple_executor::to_df_order_by_expr_simple(&order_by.items); df = df.sort(sort_expr).map_err(|e| GraphError::PlanError { message: format!("Failed to apply ORDER BY: {}", e), location: snafu::Location::new(file!(), line!(), column!()), @@ -745,28 +772,6 @@ impl CypherQuery { Ok(merged) } - /// Validate the query against the provided configuration - pub fn validate(&self) -> Result<()> { - // Check that all referenced labels exist in configuration - for match_clause in &self.ast.match_clauses { - for pattern in &match_clause.patterns { - self.validate_pattern(pattern)?; - } - } - - // Validate WHERE clause if present - if let Some(where_clause) = &self.ast.where_clause { - self.validate_boolean_expression(&where_clause.expression)?; - } - - // Validate RETURN clause - for item in &self.ast.return_clause.items { - self.validate_value_expression(&item.expression)?; - } - - Ok(()) - } - /// Get all node labels referenced in this query pub fn referenced_node_labels(&self) -> Vec { let mut labels = Vec::new(); @@ -812,58 +817,6 @@ impl CypherQuery { variables } - // Validation helper methods - - fn validate_pattern(&self, pattern: &crate::ast::GraphPattern) -> Result<()> { - match pattern { - crate::ast::GraphPattern::Node(node) => { - for label in &node.labels { - if let Some(config) = &self.config { - if config.get_node_mapping(label).is_none() { - return Err(GraphError::PlanError { - message: format!("No mapping found for node label '{}'", label), - location: snafu::Location::new(file!(), line!(), column!()), - }); - } - } - } - Ok(()) - } - crate::ast::GraphPattern::Path(path) => { - self.validate_pattern(&crate::ast::GraphPattern::Node(path.start_node.clone()))?; - for segment in &path.segments { - for rel_type in &segment.relationship.types { - if let Some(config) = &self.config { - if config.get_relationship_mapping(rel_type).is_none() { - return Err(GraphError::PlanError { - message: format!( - "No mapping found for relationship type '{}'", - rel_type - ), - location: snafu::Location::new(file!(), line!(), column!()), - }); - } - } - } - self.validate_pattern(&crate::ast::GraphPattern::Node( - segment.end_node.clone(), - ))?; - } - Ok(()) - } - } - } - - fn validate_boolean_expression(&self, _expr: &crate::ast::BooleanExpression) -> Result<()> { - // TODO: Implement validation of boolean expressions - Ok(()) - } - - fn validate_value_expression(&self, _expr: &crate::ast::ValueExpression) -> Result<()> { - // TODO: Implement validation of value expressions - Ok(()) - } - // Collection helper methods fn collect_node_labels_from_pattern( @@ -1014,521 +967,6 @@ impl CypherQuery { let df = exec.apply_return(df, &self.ast)?; Ok(Some(df)) } - - // Attempt execution for a single-path pattern using joins. - // Supports single-hop and two-hop expansions - #[allow(dead_code)] - async fn try_execute_single_hop_path( - &self, - ctx: &datafusion::prelude::SessionContext, - ) -> Result> { - use crate::ast::{GraphPattern, RelationshipDirection, ValueExpression}; - use datafusion::prelude::*; - - // Only handle a single MATCH with a single path and exactly one segment - let [mc] = self.ast.match_clauses.as_slice() else { - return Ok(None); - }; - let match_clause = mc; - let path = match match_clause.patterns.as_slice() { - [GraphPattern::Path(p)] if (p.segments.len() == 1 || p.segments.len() == 2) => p, - _ => return Ok(None), - }; - let seg = &path.segments[0]; - let rel_type = match seg.relationship.types.first() { - Some(t) => t.as_str(), - None => return Ok(None), - }; - let start_label = match path.start_node.labels.first() { - Some(l) => l.as_str(), - None => return Ok(None), - }; - let end_label = match seg.end_node.labels.first() { - Some(l) => l.as_str(), - None => return Ok(None), - }; - - let start_alias = path.start_node.variable.as_deref().unwrap_or(start_label); - let rel_alias = seg.relationship.variable.as_deref().unwrap_or(rel_type); - let end_alias = seg.end_node.variable.as_deref().unwrap_or(end_label); - - // Validate mappings - let cfg = self.require_config()?; - let start_map = cfg - .get_node_mapping(start_label) - .ok_or_else(|| GraphError::PlanError { - message: format!("No node mapping for '{}'", start_label), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let end_map = cfg - .get_node_mapping(end_label) - .ok_or_else(|| GraphError::PlanError { - message: format!("No node mapping for '{}'", end_label), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let rel_map = - cfg.get_relationship_mapping(rel_type) - .ok_or_else(|| GraphError::PlanError { - message: format!("No relationship mapping for '{}'", rel_type), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Read tables and alias - let mut left = ctx - .table(start_label) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", start_label, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - // Alias and flatten columns to '__' to avoid ambiguity - let left_schema = left.schema(); - let left_proj: Vec = left_schema - .fields() - .iter() - .map(|f| { - datafusion::logical_expr::col(f.name()).alias(format!( - "{}__{}", - start_alias, - f.name() - )) - }) - .collect(); - left = left - .alias(start_alias)? - .select(left_proj) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to alias/select '{}': {}", start_label, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - for (k, v) in &path.start_node.properties { - let expr = to_df_literal(v); - left = left - .filter(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(datafusion::logical_expr::col(format!( - "{}__{}", - start_alias, k - ))), - op: datafusion::logical_expr::Operator::Eq, - right: Box::new(expr), - }, - )) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to apply filter: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - let mut rel_df = ctx - .table(rel_type) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", rel_type, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let rel_schema = rel_df.schema(); - let rel_proj: Vec = rel_schema - .fields() - .iter() - .map(|f| { - datafusion::logical_expr::col(f.name()).alias(format!( - "{}__{}", - rel_alias, - f.name() - )) - }) - .collect(); - rel_df = rel_df - .alias(rel_alias)? - .select(rel_proj) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to alias/select '{}': {}", rel_type, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Join start -> relationship - let (left_key, right_key) = match seg.relationship.direction { - RelationshipDirection::Outgoing | RelationshipDirection::Undirected => ( - format!("{}__{}", start_alias, start_map.id_field), - format!("{}__{}", rel_alias, rel_map.source_id_field), - ), - RelationshipDirection::Incoming => ( - format!("{}__{}", start_alias, start_map.id_field), - format!("{}__{}", rel_alias, rel_map.target_id_field), - ), - }; - let mut joined = left - .join( - rel_df, - JoinType::Inner, - &[left_key.as_str()], - &[right_key.as_str()], - None, - ) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (node->rel): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Join relationship -> end (or mid, for 2-hop) - let mut right = ctx - .table(end_label) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", end_label, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let right_schema = right.schema(); - let right_proj: Vec = right_schema - .fields() - .iter() - .map(|f| { - datafusion::logical_expr::col(f.name()).alias(format!( - "{}__{}", - end_alias, - f.name() - )) - }) - .collect(); - right = right - .alias(end_alias)? - .select(right_proj) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to alias/select '{}': {}", end_label, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let (left_key2, right_key2) = match seg.relationship.direction { - RelationshipDirection::Outgoing | RelationshipDirection::Undirected => ( - format!("{}__{}", rel_alias, rel_map.target_id_field), - format!("{}__{}", end_alias, end_map.id_field), - ), - RelationshipDirection::Incoming => ( - format!("{}__{}", rel_alias, rel_map.source_id_field), - format!("{}__{}", end_alias, end_map.id_field), - ), - }; - joined = joined - .join( - right, - JoinType::Inner, - &[left_key2.as_str()], - &[right_key2.as_str()], - None, - ) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (rel->node): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // If there is a second segment (two-hop), continue chaining joins - if path.segments.len() == 2 { - let seg2 = &path.segments[1]; - let rel_type2 = match seg2.relationship.types.first() { - Some(t) => t.as_str(), - None => return Ok(None), - }; - let end2_label = match seg2.end_node.labels.first() { - Some(l) => l.as_str(), - None => return Ok(None), - }; - - let mid_alias = end_alias; // end of seg1 is the mid node - let mut rel2_alias = seg2 - .relationship - .variable - .as_deref() - .unwrap_or(rel_type2) - .to_string(); - let mut end2_alias = seg2 - .end_node - .variable - .as_deref() - .unwrap_or(end2_label) - .to_string(); - // Ensure unique aliases to avoid duplicate-qualified column names - use std::collections::HashSet; - let mut used_aliases: HashSet = [ - start_alias.to_string(), - rel_alias.to_string(), - end_alias.to_string(), - ] - .into_iter() - .collect(); - let mut uniquify = |alias: &mut String| { - if used_aliases.insert(alias.clone()) { - return; - } - let base = alias.clone(); - let mut i = 2usize; - loop { - let cand = format!("{}_{}", base, i); - if used_aliases.insert(cand.clone()) { - *alias = cand; - break; - } - i += 1; - } - }; - uniquify(&mut rel2_alias); - uniquify(&mut end2_alias); - - // Validate mappings - let _mid_map = end_map; // end of seg1 - let rel2_map = - cfg.get_relationship_mapping(rel_type2) - .ok_or_else(|| GraphError::PlanError { - message: format!("No relationship mapping for '{}'", rel_type2), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - let end2_map = - cfg.get_node_mapping(end2_label) - .ok_or_else(|| GraphError::PlanError { - message: format!("No node mapping for '{}'", end2_label), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Read rel2 and end2 - let mut rel2_df = ctx - .table(rel_type2) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", rel_type2, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - rel2_df = rel2_df.alias(&rel2_alias)?; - - // Determine the mid-equivalent column from rel1 to avoid ambiguous mid.id on the left - let mid_equiv_from_rel1 = match seg.relationship.direction { - RelationshipDirection::Outgoing | RelationshipDirection::Undirected => { - rel_map.target_id_field.as_str() - } - RelationshipDirection::Incoming => rel_map.source_id_field.as_str(), - }; - - // Join mid -> rel2 using mid-equivalent column from rel1 - let (left_key3, right_key3) = match seg2.relationship.direction { - RelationshipDirection::Outgoing | RelationshipDirection::Undirected => { - (mid_equiv_from_rel1, rel2_map.source_id_field.as_str()) - } - RelationshipDirection::Incoming => { - (mid_equiv_from_rel1, rel2_map.target_id_field.as_str()) - } - }; - joined = joined - .join(rel2_df, JoinType::Inner, &[left_key3], &[right_key3], None) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (mid->rel2): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Join rel2 -> end2 - let mut end2_df = ctx - .table(end2_label) - .await - .map_err(|e| GraphError::PlanError { - message: format!("Failed to read table '{}': {}", end2_label, e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - end2_df = end2_df.alias(&end2_alias)?; - // If left side already contains a column with the same name as the right join key, - // rename the right key to avoid ambiguous unqualified field references. - let mut right_join_key = end2_map.id_field.clone(); - { - let left_schema = joined.schema(); - if left_schema - .fields() - .iter() - .any(|f| f.name() == &right_join_key) - { - use datafusion::logical_expr::{col, Expr}; - let new_key = format!("{}__rhs", right_join_key); - let schema = end2_df.schema(); - let mut proj: Vec = Vec::with_capacity(schema.fields().len()); - for f in schema.fields() { - if f.name() == &right_join_key { - proj.push(col(f.name()).alias(&new_key)); - } else { - proj.push(col(f.name())); - } - } - end2_df = end2_df.select(proj).map_err(|e| GraphError::PlanError { - message: format!("Failed to prepare right join side: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - right_join_key = new_key; - } - } - - let (left_key4, right_key4) = match seg2.relationship.direction { - RelationshipDirection::Outgoing | RelationshipDirection::Undirected => { - (rel2_map.target_id_field.as_str(), right_join_key.as_str()) - } - RelationshipDirection::Incoming => { - (rel2_map.source_id_field.as_str(), right_join_key.as_str()) - } - }; - joined = joined - .join(end2_df, JoinType::Inner, &[left_key4], &[right_key4], None) - .map_err(|e| GraphError::PlanError { - message: format!("Join failed (rel2->end2): {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - - // Update end_alias to refer to final node for projection/WHERE qualification below - let end_alias = end2_alias.as_str(); - - // WHERE (qualified across all known aliases) - if let Some(where_clause) = &self.ast.where_clause { - if let Some(expr) = - to_df_boolean_expr_with_vars(&where_clause.expression, &|var, prop| { - let alias = if Some(var) == path.start_node.variable.as_deref() { - start_alias - } else if Some(var) == seg.relationship.variable.as_deref() { - rel_alias - } else if Some(var) == seg.end_node.variable.as_deref() { - mid_alias - } else if Some(var) == seg2.relationship.variable.as_deref() { - &rel2_alias - } else if Some(var) == seg2.end_node.variable.as_deref() { - end_alias - } else { - var - }; - format!("{}.{}", alias, prop) - }) - { - joined = joined.filter(expr).map_err(|e| GraphError::PlanError { - message: format!("Failed to apply WHERE: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - } - - // Project RETURN across aliases - let mut proj: Vec = Vec::new(); - for item in &self.ast.return_clause.items { - if let ValueExpression::Property(prop) = &item.expression { - let col_name = if Some(prop.variable.as_str()) - == path.start_node.variable.as_deref() - { - format!("{}.{}", start_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg.relationship.variable.as_deref() { - format!("{}.{}", rel_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg.end_node.variable.as_deref() { - format!("{}.{}", mid_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg2.relationship.variable.as_deref() - { - format!("{}.{}", rel2_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg2.end_node.variable.as_deref() { - format!("{}.{}", end_alias, prop.property) - } else { - format!("{}.{}", prop.variable, prop.property) - }; - let mut e = datafusion::logical_expr::col(&col_name); - if let Some(a) = &item.alias { - e = e.alias(a); - } - proj.push(e); - } - } - if !proj.is_empty() { - joined = joined.select(proj).map_err(|e| GraphError::PlanError { - message: format!("Failed to project: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - // DISTINCT and LIMIT - if self.ast.return_clause.distinct { - joined = joined.distinct().map_err(|e| GraphError::PlanError { - message: format!("Failed to apply DISTINCT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - if let Some(limit) = self.ast.limit { - joined = - joined - .limit(0, Some(limit as usize)) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to apply LIMIT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - return Ok(Some(joined)); - } - - // WHERE (qualified) - if let Some(where_clause) = &self.ast.where_clause { - if let Some(expr) = - to_df_boolean_expr_with_vars(&where_clause.expression, &|var, prop| { - let alias = if Some(var) == path.start_node.variable.as_deref() { - start_alias - } else if Some(var) == seg.relationship.variable.as_deref() { - rel_alias - } else if Some(var) == seg.end_node.variable.as_deref() { - end_alias - } else { - var - }; - format!("{}.{}", alias, prop) - }) - { - joined = joined.filter(expr).map_err(|e| GraphError::PlanError { - message: format!("Failed to apply WHERE: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - } - - // Project RETURN - let mut proj: Vec = Vec::new(); - for item in &self.ast.return_clause.items { - if let ValueExpression::Property(prop) = &item.expression { - let col_name = - if Some(prop.variable.as_str()) == path.start_node.variable.as_deref() { - format!("{}.{}", start_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg.relationship.variable.as_deref() { - format!("{}.{}", rel_alias, prop.property) - } else if Some(prop.variable.as_str()) == seg.end_node.variable.as_deref() { - format!("{}.{}", end_alias, prop.property) - } else { - format!("{}.{}", prop.variable, prop.property) - }; - let mut e = datafusion::logical_expr::col(&col_name); - if let Some(a) = &item.alias { - e = e.alias(a); - } - proj.push(e); - } - } - if !proj.is_empty() { - joined = joined.select(proj).map_err(|e| GraphError::PlanError { - message: format!("Failed to project: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - // DISTINCT and LIMIT - if self.ast.return_clause.distinct { - joined = joined.distinct().map_err(|e| GraphError::PlanError { - message: format!("Failed to apply DISTINCT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - if let Some(limit) = self.ast.limit { - joined = joined - .limit(0, Some(limit as usize)) - .map_err(|e| GraphError::PlanError { - message: format!("Failed to apply LIMIT: {}", e), - location: snafu::Location::new(file!(), line!(), column!()), - })?; - } - - Ok(Some(joined)) - } } /// Builder for constructing Cypher queries programmatically @@ -1649,104 +1087,10 @@ impl CypherQueryBuilder { parameters: self.parameters, }; - query.validate()?; Ok(query) } } -/// Minimal translator for simple boolean expressions into DataFusion Expr -fn to_df_boolean_expr_simple( - expr: &crate::ast::BooleanExpression, -) -> Option { - use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO, ValueExpression as VE}; - use datafusion::logical_expr::{col, Expr, Operator}; - match expr { - BE::Comparison { - left, - operator, - right, - } => { - // Only support property literal - let (col_name, lit_expr) = match (left, right) { - (VE::Property(prop), VE::Literal(val)) => { - (prop.property.clone(), to_df_literal(val)) - } - (VE::Literal(val), VE::Property(prop)) => { - (prop.property.clone(), to_df_literal(val)) - } - _ => return None, - }; - let op = match operator { - CO::Equal => Operator::Eq, - CO::NotEqual => Operator::NotEq, - CO::LessThan => Operator::Lt, - CO::LessThanOrEqual => Operator::LtEq, - CO::GreaterThan => Operator::Gt, - CO::GreaterThanOrEqual => Operator::GtEq, - }; - Some(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr { - left: Box::new(col(col_name)), - op, - right: Box::new(lit_expr), - })) - } - BE::And(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(to_df_boolean_expr_simple(l)?), - op: Operator::And, - right: Box::new(to_df_boolean_expr_simple(r)?), - }, - )), - BE::Or(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( - datafusion::logical_expr::BinaryExpr { - left: Box::new(to_df_boolean_expr_simple(l)?), - op: Operator::Or, - right: Box::new(to_df_boolean_expr_simple(r)?), - }, - )), - BE::Not(inner) => Some(datafusion::logical_expr::Expr::Not(Box::new( - to_df_boolean_expr_simple(inner)?, - ))), - BE::Exists(prop) => Some(datafusion::logical_expr::Expr::IsNotNull(Box::new( - datafusion::logical_expr::Expr::Column(datafusion::common::Column::from_name( - prop.property.clone(), - )), - ))), - _ => None, - } -} - -/// Build ORDER BY expressions for simple queries (single table) -fn to_df_order_by_expr_simple( - items: &[crate::ast::OrderByItem], -) -> Vec { - use datafusion::logical_expr::SortExpr; - - items - .iter() - .map(|item| { - let expr = to_df_value_expr_simple(&item.expression); - let asc = matches!(item.direction, crate::ast::SortDirection::Ascending); - SortExpr { - expr, - asc, - nulls_first: false, - } - }) - .collect() -} - -fn to_df_value_expr_simple(expr: &crate::ast::ValueExpression) -> datafusion::logical_expr::Expr { - use crate::ast::ValueExpression as VE; - use datafusion::logical_expr::{col, lit}; - match expr { - VE::Property(prop) => col(&prop.property), - VE::Variable(v) => col(v), - VE::Literal(v) => crate::query::expr::to_df_literal(v), - VE::Function { .. } | VE::Arithmetic { .. } => lit(0), - } -} - #[cfg(test)] mod tests { use super::*; @@ -1833,7 +1177,7 @@ mod tests { let mut data = HashMap::new(); data.insert("people".to_string(), batch); - let out = q.execute(data).await.unwrap(); + let out = q.execute_simple(data).await.unwrap(); assert_eq!(out.num_rows(), 2); let names = out .column(0) @@ -1902,7 +1246,7 @@ mod tests { data.insert("Person".to_string(), people); data.insert("KNOWS".to_string(), knows); - let out = q.execute(data).await.unwrap(); + let out = q.execute_simple(data).await.unwrap(); // Expect two rows: Bob, Carol (the targets) let names = out .column(0) @@ -1950,7 +1294,7 @@ mod tests { let mut data = HashMap::new(); data.insert("people".to_string(), batch); - let out = q.execute(data).await.unwrap(); + let out = q.execute_simple(data).await.unwrap(); let ages = out.column(1).as_any().downcast_ref::().unwrap(); let collected: Vec = (0..out.num_rows()).map(|i| ages.value(i)).collect(); assert_eq!(collected, vec![28, 29, 34, 42]); @@ -1989,7 +1333,7 @@ mod tests { let mut data = HashMap::new(); data.insert("people".to_string(), batch); - let out = q.execute(data).await.unwrap(); + let out = q.execute_simple(data).await.unwrap(); assert_eq!(out.num_rows(), 2); let ages = out.column(0).as_any().downcast_ref::().unwrap(); let collected: Vec = (0..out.num_rows()).map(|i| ages.value(i)).collect(); @@ -2021,7 +1365,7 @@ mod tests { let mut data = HashMap::new(); data.insert("people".to_string(), batch); - let out = q.execute(data).await.unwrap(); + let out = q.execute_simple(data).await.unwrap(); assert_eq!(out.num_rows(), 2); let ages = out.column(0).as_any().downcast_ref::().unwrap(); let collected: Vec = (0..out.num_rows()).map(|i| ages.value(i)).collect(); @@ -2084,7 +1428,7 @@ mod tests { } // For comparison, try legacy execution - let legacy_result = query.execute(datasets).await.unwrap(); + let legacy_result = query.execute_simple(datasets).await.unwrap(); println!( "Legacy result: {} rows, {} columns", legacy_result.num_rows(), diff --git a/rust/lance-graph/src/query/simple_executor.rs b/rust/lance-graph/src/query/simple_executor.rs new file mode 100644 index 00000000..5a95bf09 --- /dev/null +++ b/rust/lance-graph/src/query/simple_executor.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Helper functions for simple single-table query execution + +/// Minimal translator for simple boolean expressions into DataFusion Expr +pub(super) fn to_df_boolean_expr_simple( + expr: &crate::ast::BooleanExpression, +) -> Option { + use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO, ValueExpression as VE}; + use crate::query::expr::to_df_literal; + use datafusion::logical_expr::{col, Expr, Operator}; + match expr { + BE::Comparison { + left, + operator, + right, + } => { + let (col_name, lit_expr) = match (left, right) { + (VE::Property(prop), VE::Literal(val)) => { + (prop.property.clone(), to_df_literal(val)) + } + (VE::Literal(val), VE::Property(prop)) => { + (prop.property.clone(), to_df_literal(val)) + } + _ => return None, + }; + let op = match operator { + CO::Equal => Operator::Eq, + CO::NotEqual => Operator::NotEq, + CO::LessThan => Operator::Lt, + CO::LessThanOrEqual => Operator::LtEq, + CO::GreaterThan => Operator::Gt, + CO::GreaterThanOrEqual => Operator::GtEq, + }; + Some(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr { + left: Box::new(col(col_name)), + op, + right: Box::new(lit_expr), + })) + } + BE::And(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( + datafusion::logical_expr::BinaryExpr { + left: Box::new(to_df_boolean_expr_simple(l)?), + op: Operator::And, + right: Box::new(to_df_boolean_expr_simple(r)?), + }, + )), + BE::Or(l, r) => Some(datafusion::logical_expr::Expr::BinaryExpr( + datafusion::logical_expr::BinaryExpr { + left: Box::new(to_df_boolean_expr_simple(l)?), + op: Operator::Or, + right: Box::new(to_df_boolean_expr_simple(r)?), + }, + )), + BE::Not(inner) => Some(datafusion::logical_expr::Expr::Not(Box::new( + to_df_boolean_expr_simple(inner)?, + ))), + BE::Exists(prop) => Some(datafusion::logical_expr::Expr::IsNotNull(Box::new( + datafusion::logical_expr::Expr::Column(datafusion::common::Column::from_name( + prop.property.clone(), + )), + ))), + _ => None, + } +} + +/// Build ORDER BY expressions for simple queries +pub(super) fn to_df_order_by_expr_simple( + items: &[crate::ast::OrderByItem], +) -> Vec { + use datafusion::logical_expr::SortExpr; + items + .iter() + .map(|item| { + let expr = to_df_value_expr_simple(&item.expression); + let asc = matches!(item.direction, crate::ast::SortDirection::Ascending); + SortExpr { + expr, + asc, + nulls_first: false, + } + }) + .collect() +} + +/// Build value expressions for simple queries +pub(super) fn to_df_value_expr_simple( + expr: &crate::ast::ValueExpression, +) -> datafusion::logical_expr::Expr { + use crate::ast::ValueExpression as VE; + use crate::query::expr::to_df_literal; + use datafusion::logical_expr::{col, lit}; + match expr { + VE::Property(prop) => col(&prop.property), + VE::Variable(v) => col(v), + VE::Literal(v) => to_df_literal(v), + VE::Function { .. } | VE::Arithmetic { .. } => lit(0), + } +}