From fcecaef598df1557ef3b026349cf6ef27794e3ef Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 1 Nov 2025 20:45:34 -0700 Subject: [PATCH 1/4] feat: expose datafusion query execution apis --- rust/lance-graph/src/query.rs | 566 +++++++++++++++++++++++++++++++--- 1 file changed, 519 insertions(+), 47 deletions(-) diff --git a/rust/lance-graph/src/query.rs b/rust/lance-graph/src/query.rs index f5a4e187..76e869ba 100644 --- a/rust/lance-graph/src/query.rs +++ b/rust/lance-graph/src/query.rs @@ -84,52 +84,57 @@ impl CypherQuery { &self.parameters } - /// Execute using the DataFusion planner with enhanced filtering support - /// Pipeline: Semantic Analysis -> Logical Plan -> Physical Plan (DataFusion) + /// Execute using the DataFusion planner with in-memory datasets /// - /// This implementation uses DataFusion's DefaultTableSource with proper catalog - /// integration to support filtering and basic query operations. + /// # Overview + /// This convenience method creates both a catalog and session context from the provided + /// in-memory RecordBatches. It's ideal for testing and small datasets that fit in memory. /// - /// WARNING: Experimental API. Semantics (e.g., row multiplicity) and performance characteristics - /// may change as the DataFusion planner matures. Some features like ORDER BY are not yet implemented - /// in this path. Prefer the `execute` for stability, or opt into this method knowingly. + /// For production use with external data sources (CSV, Parquet, databases), use + /// `execute_with_datafusion_context` instead, which automatically builds the catalog + /// from the SessionContext. 
+ /// + /// # Arguments + /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships) + /// + /// # Returns + /// A single RecordBatch containing the query results + /// + /// # Example + /// ```ignore + /// use std::collections::HashMap; + /// use arrow::record_batch::RecordBatch; + /// use lance_graph::query::CypherQuery; + /// + /// // Create in-memory datasets + /// let mut datasets = HashMap::new(); + /// datasets.insert("Person".to_string(), person_batch); + /// datasets.insert("KNOWS".to_string(), knows_batch); + /// + /// // Parse and execute query + /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name, f.name")? + /// .with_config(config); + /// let result = query.execute_datafusion(datasets).await?; + /// ``` pub async fn execute_datafusion( &self, datasets: HashMap, ) -> Result { - use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner}; - use crate::semantic::SemanticAnalyzer; - use arrow::compute::concat_batches; + use crate::source_catalog::InMemoryCatalog; + use datafusion::datasource::{DefaultTableSource, MemTable}; use datafusion::execution::context::SessionContext; - - // Require a config for DataFusion execution - let config = self.config.as_ref().ok_or_else(|| GraphError::PlanError { - message: "Graph configuration is required for DataFusion execution".to_string(), - location: snafu::Location::new(file!(), line!(), column!()), - })?; + use std::sync::Arc; if datasets.is_empty() { - return Err(GraphError::PlanError { + return Err(GraphError::ConfigError { message: "No input datasets provided".to_string(), location: snafu::Location::new(file!(), line!(), column!()), }); } - // Phase 1: Semantic Analysis - let mut analyzer = SemanticAnalyzer::new(config.clone()); - analyzer.analyze(&self.ast)?; - - // Phase 2: Logical Planning - let mut logical_planner = LogicalPlanner::new(); - let logical_plan = logical_planner.plan(&self.ast)?; - // Create session context and catalog, register 
tables in both let ctx = SessionContext::new(); - use crate::source_catalog::InMemoryCatalog; - use datafusion::datasource::{DefaultTableSource, MemTable}; - use std::sync::Arc; - - let mut catalog = InMemoryCatalog::new(); + let mut catalog: InMemoryCatalog = InMemoryCatalog::new(); for (name, batch) in &datasets { let mem_table = Arc::new( @@ -144,8 +149,9 @@ impl CypherQuery { let table_source = Arc::new(DefaultTableSource::new(mem_table.clone())); // Register as both node and relationship source (planner will use whichever is appropriate) - catalog = catalog.with_node_source(name, table_source.clone()); - catalog = catalog.with_relationship_source(name, table_source); + catalog = catalog + .with_node_source(name, table_source.clone()) + .with_relationship_source(name, table_source.clone()); // Register in session context for execution (using the same MemTable instance) ctx.register_table(name, mem_table) @@ -155,35 +161,214 @@ impl CypherQuery { })?; } - // Use DataFusion planner with catalog that has the actual MemTables - let df_planner = DataFusionPlanner::with_catalog(config.clone(), Arc::new(catalog)); + // Delegate to common execution logic + self.execute_with_catalog_and_context(Arc::new(catalog), ctx) + .await + } + + /// Execute query with a DataFusion SessionContext, automatically building the catalog + /// + /// This is a convenience method that builds the graph catalog by querying the + /// SessionContext for table schemas. The GraphConfig determines which tables to + /// look up (node labels and relationship types). + /// + /// This method is ideal for integrating with DataFusion's rich data source ecosystem + /// (CSV, Parquet, Delta Lake, Iceberg, etc.) without manually building a catalog. 
+ /// + /// # Arguments + /// * `ctx` - DataFusion SessionContext with pre-registered tables + /// + /// # Returns + /// Query results as an Arrow RecordBatch + /// + /// # Errors + /// Returns error if: + /// - GraphConfig is not set (use `.with_config()` first) + /// - Required tables are not registered in the SessionContext + /// - Query execution fails + /// + /// # Example + /// ```ignore + /// use datafusion::execution::context::SessionContext; + /// use datafusion::prelude::CsvReadOptions; + /// use lance_graph::{CypherQuery, GraphConfig}; + /// + /// // Step 1: Create GraphConfig + /// let config = GraphConfig::builder() + /// .with_node_label("Person", "person_id") + /// .with_relationship("KNOWS", "src_id", "dst_id") + /// .build()?; + /// + /// // Step 2: Register data sources in DataFusion + /// let ctx = SessionContext::new(); + /// ctx.register_csv("Person", "data/persons.csv", CsvReadOptions::default()).await?; + /// ctx.register_parquet("KNOWS", "s3://bucket/knows.parquet", Default::default()).await?; + /// + /// // Step 3: Execute query (catalog is built automatically) + /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name")? + /// .with_config(config); + /// let result = query.execute_with_datafusion_context(ctx).await?; + /// ``` + /// + /// # Note + /// The catalog is built by querying the SessionContext for schemas of tables + /// mentioned in the GraphConfig. Table names must match between GraphConfig + /// (node labels/relationship types) and SessionContext (registered table names). 
+ pub async fn execute_with_datafusion_context( + &self, + ctx: datafusion::execution::context::SessionContext, + ) -> Result { + use crate::source_catalog::InMemoryCatalog; + use datafusion::datasource::DefaultTableSource; + use std::sync::Arc; + + // Require a config + let config = self + .config + .as_ref() + .ok_or_else(|| GraphError::ConfigError { + message: "Graph configuration is required for query execution".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + // Build catalog by querying SessionContext for table providers + let mut catalog = InMemoryCatalog::new(); + + // Register node sources + for label in config.node_mappings.keys() { + let table_provider = + ctx.table_provider(label) + .await + .map_err(|e| GraphError::ConfigError { + message: format!( + "Node label '{}' not found in SessionContext: {}", + label, e + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + catalog = catalog.with_node_source(label, table_source); + } + + // Register relationship sources + for rel_type in config.relationship_mappings.keys() { + let table_provider = + ctx.table_provider(rel_type) + .await + .map_err(|e| GraphError::ConfigError { + message: format!( + "Relationship type '{}' not found in SessionContext: {}", + rel_type, e + ), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + catalog = catalog.with_relationship_source(rel_type, table_source); + } + + // Execute using the built catalog + self.execute_with_catalog_and_context(Arc::new(catalog), ctx) + .await + } + + /// Execute query with an explicit catalog and session context + /// + /// This is the most flexible API for advanced users who want to provide their own + /// catalog implementation or have fine-grained control over both the catalog and + /// session context. 
+ /// + /// # Arguments + /// * `catalog` - Graph catalog containing node and relationship schemas for planning + /// * `ctx` - DataFusion SessionContext with registered data sources for execution + /// + /// # Returns + /// Query results as an Arrow RecordBatch + /// + /// # Errors + /// Returns error if query parsing, planning, or execution fails + /// + /// # Example + /// ```ignore + /// use std::sync::Arc; + /// use datafusion::execution::context::SessionContext; + /// use lance_graph::source_catalog::InMemoryCatalog; + /// use lance_graph::query::CypherQuery; + /// + /// // Create custom catalog + /// let catalog = InMemoryCatalog::new() + /// .with_node_source("Person", custom_table_source); + /// + /// // Create SessionContext + /// let ctx = SessionContext::new(); + /// ctx.register_table("Person", custom_table).unwrap(); + /// + /// // Execute with explicit catalog and context + /// let query = CypherQuery::parse("MATCH (p:Person) RETURN p.name")? + /// .with_config(config); + /// let result = query.execute_with_catalog_and_context(Arc::new(catalog), ctx).await?; + /// ``` + pub async fn execute_with_catalog_and_context( + &self, + catalog: std::sync::Arc, + ctx: datafusion::execution::context::SessionContext, + ) -> Result { + use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner}; + use crate::semantic::SemanticAnalyzer; + use arrow::compute::concat_batches; + + // Require a config for DataFusion execution + let config = self + .config + .as_ref() + .ok_or_else(|| GraphError::ConfigError { + message: "Graph configuration is required for DataFusion execution".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; + + // Phase 1: Semantic Analysis + let mut analyzer = SemanticAnalyzer::new(config.clone()); + analyzer.analyze(&self.ast)?; + + // Phase 2: Logical Planning + let mut logical_planner = LogicalPlanner::new(); + let logical_plan = logical_planner.plan(&self.ast)?; + + // Phase 3: DataFusion Logical 
Planning + // Convert graph logical plan to DataFusion logical plan + let df_planner = DataFusionPlanner::with_catalog(config.clone(), catalog); let df_logical_plan = df_planner.plan(&logical_plan)?; - // Execute the logical plan against the registered tables + // Phase 4: Physical Planning and Execution + // DataFusion optimizes the logical plan, creates a physical execution plan, + // and executes it against the pre-configured SessionContext let df = ctx .execute_logical_plan(df_logical_plan) .await - .map_err(|e| GraphError::PlanError { + .map_err(|e| GraphError::ExecutionError { message: format!("Failed to execute DataFusion plan: {}", e), location: snafu::Location::new(file!(), line!(), column!()), })?; + // Get schema before collecting (in case result is empty) + let result_schema = df.schema().inner().clone(); + // Collect results - let batches = df.collect().await.map_err(|e| GraphError::PlanError { - message: format!("Failed to collect DataFusion results: {}", e), + let batches = df.collect().await.map_err(|e| GraphError::ExecutionError { + message: format!("Failed to collect query results: {}", e), location: snafu::Location::new(file!(), line!(), column!()), })?; if batches.is_empty() { - // Return empty batch with schema from first dataset - let first_batch = datasets.values().next().unwrap(); - let empty_batch = arrow::record_batch::RecordBatch::new_empty(first_batch.schema()); - return Ok(empty_batch); + // Return empty batch with the schema from the DataFrame + // This preserves column structure even when there are no rows + return Ok(arrow::record_batch::RecordBatch::new_empty(result_schema)); } // Combine all batches let schema = batches[0].schema(); - concat_batches(&schema, &batches).map_err(|e| GraphError::PlanError { + concat_batches(&schema, &batches).map_err(|e| GraphError::ExecutionError { message: format!("Failed to concatenate result batches: {}", e), location: snafu::Location::new(file!(), line!(), column!()), }) @@ -204,10 +389,13 @@ impl 
CypherQuery { use std::sync::Arc; // Require a config for now, even if we don't fully exploit it yet - let _config = self.config.as_ref().ok_or_else(|| GraphError::PlanError { - message: "Graph configuration is required for query execution".to_string(), - location: snafu::Location::new(file!(), line!(), column!()), - })?; + let _config = self + .config + .as_ref() + .ok_or_else(|| GraphError::ConfigError { + message: "Graph configuration is required for query execution".to_string(), + location: snafu::Location::new(file!(), line!(), column!()), + })?; if datasets.is_empty() { return Err(GraphError::PlanError { @@ -1775,4 +1963,288 @@ mod tests { ["Alice", "Bob"].iter().map(|s| s.to_string()).collect(); assert_eq!(name_set, expected, "Should return Alice and Bob"); } + + #[tokio::test] + async fn test_execute_with_context_simple_scan() { + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion::datasource::MemTable; + use datafusion::execution::context::SessionContext; + use std::sync::Arc; + + // Create test data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])), + Arc::new(Int64Array::from(vec![28, 34, 29])), + ], + ) + .unwrap(); + + // Create SessionContext and register data source + let mem_table = + Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap()); + let ctx = SessionContext::new(); + ctx.register_table("Person", mem_table).unwrap(); + + // Create query + let cfg = GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name") + .unwrap() + .with_config(cfg); + + // Execute with 
context (catalog built automatically) + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + // Verify results + assert_eq!(result.num_rows(), 3); + assert_eq!(result.num_columns(), 1); + + let names = result + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(names.value(0), "Alice"); + assert_eq!(names.value(1), "Bob"); + assert_eq!(names.value(2), "Carol"); + } + + #[tokio::test] + async fn test_execute_with_context_with_filter() { + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion::datasource::MemTable; + use datafusion::execution::context::SessionContext; + use std::sync::Arc; + + // Create test data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])), + Arc::new(Int64Array::from(vec![28, 34, 29, 42])), + ], + ) + .unwrap(); + + // Create SessionContext + let mem_table = + Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap()); + let ctx = SessionContext::new(); + ctx.register_table("Person", mem_table).unwrap(); + + // Create query with filter + let cfg = GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age") + .unwrap() + .with_config(cfg); + + // Execute with context + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + // Verify: should return Bob (34) and David (42) + assert_eq!(result.num_rows(), 2); + assert_eq!(result.num_columns(), 2); + + let names = result + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let ages = result + .column(1) + .as_any() +
.downcast_ref::<Int64Array>() + .unwrap(); + + let results: Vec<(String, i64)> = (0..result.num_rows()) + .map(|i| (names.value(i).to_string(), ages.value(i))) + .collect(); + + assert!(results.contains(&("Bob".to_string(), 34))); + assert!(results.contains(&("David".to_string(), 42))); + } + + #[tokio::test] + async fn test_execute_with_context_relationship_traversal() { + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion::datasource::MemTable; + use datafusion::execution::context::SessionContext; + use std::sync::Arc; + + // Create Person nodes + let person_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])); + let person_batch = RecordBatch::try_new( + person_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])), + ], + ) + .unwrap(); + + // Create KNOWS relationships + let knows_schema = Arc::new(Schema::new(vec![ + Field::new("src_id", DataType::Int64, false), + Field::new("dst_id", DataType::Int64, false), + Field::new("since", DataType::Int64, false), + ])); + let knows_batch = RecordBatch::try_new( + knows_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2])), + Arc::new(Int64Array::from(vec![2, 3])), + Arc::new(Int64Array::from(vec![2020, 2021])), + ], + ) + .unwrap(); + + // Create SessionContext and register tables + let person_table = Arc::new( + MemTable::try_new(person_schema.clone(), vec![vec![person_batch.clone()]]).unwrap(), + ); + let knows_table = Arc::new( + MemTable::try_new(knows_schema.clone(), vec![vec![knows_batch.clone()]]).unwrap(), + ); + + let ctx = SessionContext::new(); + ctx.register_table("Person", person_table).unwrap(); + ctx.register_table("KNOWS", knows_table).unwrap(); + + // Create query + let cfg = GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_id", "dst_id")
+ .build() + .unwrap(); + + let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name") + .unwrap() + .with_config(cfg); + + // Execute with context + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + // Verify: should return 2 relationships (Alice->Bob, Bob->Carol) + assert_eq!(result.num_rows(), 2); + assert_eq!(result.num_columns(), 2); + + let src_names = result + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let dst_names = result + .column(1) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + + let relationships: Vec<(String, String)> = (0..result.num_rows()) + .map(|i| { + ( + src_names.value(i).to_string(), + dst_names.value(i).to_string(), + ) + }) + .collect(); + + assert!(relationships.contains(&("Alice".to_string(), "Bob".to_string()))); + assert!(relationships.contains(&("Bob".to_string(), "Carol".to_string()))); + } + + #[tokio::test] + async fn test_execute_with_context_order_by_limit() { + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion::datasource::MemTable; + use datafusion::execution::context::SessionContext; + use std::sync::Arc; + + // Create test data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("score", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])), + Arc::new(Int64Array::from(vec![85, 92, 78, 95])), + ], + ) + .unwrap(); + + // Create SessionContext + let mem_table = + Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap()); + let ctx = SessionContext::new(); + ctx.register_table("Student", mem_table).unwrap(); + + // Create query with ORDER BY and LIMIT + let cfg = GraphConfig::builder() + .with_node_label("Student",
"id") + .build() + .unwrap(); + + let query = CypherQuery::new( + "MATCH (s:Student) RETURN s.name, s.score ORDER BY s.score DESC LIMIT 2", + ) + .unwrap() + .with_config(cfg); + + // Execute with context + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + // Verify: should return top 2 scores (David: 95, Bob: 92) + assert_eq!(result.num_rows(), 2); + assert_eq!(result.num_columns(), 2); + + let names = result + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let scores = result + .column(1) + .as_any() + .downcast_ref::<Int64Array>() + .unwrap(); + + // First row should be David (95) + assert_eq!(names.value(0), "David"); + assert_eq!(scores.value(0), 95); + + // Second row should be Bob (92) + assert_eq!(names.value(1), "Bob"); + assert_eq!(scores.value(1), 92); + } } From 2f837ff2c2baea267fc0b19d38790cd1a1de6623 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 1 Nov 2025 20:47:52 -0700 Subject: [PATCH 2/4] fix: fix the unqualified column names --- rust/lance-graph/src/datafusion_planner.rs | 185 +++++++++++++++++- .../tests/test_datafusion_pipeline.rs | 24 +-- 2 files changed, 194 insertions(+), 15 deletions(-) diff --git a/rust/lance-graph/src/datafusion_planner.rs b/rust/lance-graph/src/datafusion_planner.rs index e1cf5d85..7cd5b41f 100644 --- a/rust/lance-graph/src/datafusion_planner.rs +++ b/rust/lance-graph/src/datafusion_planner.rs @@ -372,11 +372,13 @@ impl DataFusionPlanner { .iter() .map(|p| { let expr = self.to_df_value_expr(&p.expression); - // Apply alias if provided + // Apply alias if provided, otherwise use Cypher dot notation if let Some(alias) = &p.alias { expr.alias(alias) } else { - expr + // Convert to Cypher dot notation (e.g., p__name -> p.name) + let cypher_name = self.to_cypher_column_name(&p.expression); + expr.alias(cypher_name) } }) .collect(); @@ -1433,6 +1435,33 @@ impl DataFusionPlanner { VE::Function { .. } | VE::Arithmetic { .. 
} => lit(0), } } + + /// Convert a ValueExpression to Cypher dot notation for column naming + /// + /// This generates user-friendly column names following Cypher conventions: + /// - Property references: `p.name` (variable.property) + /// - Other expressions: Use the expression as-is + /// + /// This is used when no explicit alias is provided in RETURN clauses. + fn to_cypher_column_name(&self, expr: &crate::ast::ValueExpression) -> String { + use crate::ast::ValueExpression as VE; + match expr { + VE::Property(prop) => { + // Convert to Cypher dot notation: variable.property + format!("{}.{}", prop.variable, prop.property) + } + VE::Variable(v) => v.clone(), + VE::Literal(crate::ast::PropertyValue::Property(prop)) => { + // Handle nested property references + format!("{}.{}", prop.variable, prop.property) + } + _ => { + // For other expressions (literals, functions), use a generic name + // In practice, these should always have explicit aliases + "expr".to_string() + } + } + } } #[cfg(test)] @@ -1549,7 +1578,7 @@ mod tests { } #[test] - fn test_df_planner_property_pushdown_filter() { + fn test_df_planner_inline_property_filter() { let mut props = std::collections::HashMap::new(); props.insert( "name".to_string(), @@ -3042,6 +3071,156 @@ mod tests { ); } + #[test] + fn test_cypher_dot_notation_simple_property() { + // Test that projections without aliases use Cypher dot notation + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "p".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + // Project without alias - should use Cypher dot notation + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: 
"name".to_string(), + }), + alias: None, // No explicit alias + }], + }; + + let df_plan = planner.plan(&project).unwrap(); + let plan_str = format!("{:?}", df_plan); + + // Should contain Cypher dot notation "p.name", not "p__name" + assert!( + plan_str.contains("p.name"), + "Plan should contain Cypher dot notation 'p.name': {}", + plan_str + ); + assert!( + !plan_str.contains("p__name AS"), + "Plan should not contain DataFusion qualified name 'p__name AS': {}", + plan_str + ); + } + + #[test] + fn test_cypher_dot_notation_multiple_properties() { + // Test multiple properties from the same variable + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "p".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + // Project multiple properties without aliases + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "name".to_string(), + }), + alias: None, + }, + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "age".to_string(), + }), + alias: None, + }, + ], + }; + + let df_plan = planner.plan(&project).unwrap(); + let plan_str = format!("{:?}", df_plan); + + // Should contain both Cypher dot notations + assert!( + plan_str.contains("p.name"), + "Plan should contain 'p.name': {}", + plan_str + ); + assert!( + plan_str.contains("p.age"), + "Plan should contain 'p.age': {}", + plan_str + ); + } + + #[test] + fn test_cypher_dot_notation_mixed_with_and_without_alias() { + // Test mix of aliased and non-aliased projections + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let planner = 
DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "p".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "name".to_string(), + }), + alias: Some("full_name".to_string()), // Explicit alias + }, + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "age".to_string(), + }), + alias: None, // No alias - should use dot notation + }, + ], + }; + + let df_plan = planner.plan(&project).unwrap(); + let plan_str = format!("{:?}", df_plan); + + // Should contain explicit alias + assert!( + plan_str.contains("full_name"), + "Plan should contain explicit alias 'full_name': {}", + plan_str + ); + // Should contain Cypher dot notation for non-aliased property + assert!( + plan_str.contains("p.age"), + "Plan should contain Cypher dot notation 'p.age': {}", + plan_str + ); + } + // ======================================================================== // Failure Scenario Tests // ======================================================================== diff --git a/rust/lance-graph/tests/test_datafusion_pipeline.rs b/rust/lance-graph/tests/test_datafusion_pipeline.rs index 5a220170..a9b036a5 100644 --- a/rust/lance-graph/tests/test_datafusion_pipeline.rs +++ b/rust/lance-graph/tests/test_datafusion_pipeline.rs @@ -1395,22 +1395,22 @@ async fn test_datafusion_varlength_projection_correctness() { // Total: 4 results (Bob, Charlie, Charlie, David) assert_eq!(out.num_rows(), 4); - // Verify schema only contains source and target columns, not intermediate nodes + // Verify schema only contains the requested column (now in Cypher dot notation) let schema = out.schema(); let column_names: Vec<&str> = schema.fields().iter().map(|f| 
f.name().as_str()).collect(); - // Should only have b__ prefixed columns (target), no intermediate node columns + // Should only have the 'b.name' column (Cypher dot notation) + assert_eq!(column_names.len(), 1); + assert_eq!( + column_names[0], "b.name", + "Expected Cypher dot notation 'b.name' column" + ); + + // Verify no DataFusion qualified names remain (no __) for name in &column_names { assert!( - name.starts_with("b__"), - "Unexpected column in variable-length result: {}", - name - ); - // Ensure no double-qualified names like "b__intermediate__prop" - let remainder = &name[3..]; // Skip "b__" - assert!( - !remainder.contains("__"), - "Column name contains nested qualifiers: {}", + !name.contains("__"), + "Column name should not contain DataFusion qualifiers: {}", name ); } @@ -1911,7 +1911,7 @@ async fn test_datafusion_return_mixed_with_and_without_alias() { let schema = out.schema(); assert_eq!(schema.fields().len(), 2); assert_eq!(schema.field(0).name(), "full_name"); // Aliased - assert_eq!(schema.field(1).name(), "p__age"); // Not aliased - qualified name + assert_eq!(schema.field(1).name(), "p.age"); // Not aliased - Cypher dot notation } #[tokio::test] From 516e61954369f81c4b20aab044d0ff36a055bc6f Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 1 Nov 2025 20:48:08 -0700 Subject: [PATCH 3/4] test: add integration tests --- .../tests/test_datafusion_wth_context.rs | 343 ++++++++++++++++++ 1 file changed, 343 insertions(+) create mode 100644 rust/lance-graph/tests/test_datafusion_wth_context.rs diff --git a/rust/lance-graph/tests/test_datafusion_wth_context.rs b/rust/lance-graph/tests/test_datafusion_wth_context.rs new file mode 100644 index 00000000..a9516016 --- /dev/null +++ b/rust/lance-graph/tests/test_datafusion_wth_context.rs @@ -0,0 +1,343 @@ +use datafusion::execution::context::SessionContext; +use lance_graph::config::GraphConfig; +use lance_graph::query::CypherQuery; + +#[tokio::test] +async fn 
test_execute_with_context_csv_simple() { + // Create temporary CSV files for testing + let temp_dir = tempfile::tempdir().unwrap(); + let person_csv_path = temp_dir.path().join("persons.csv"); + let knows_csv_path = temp_dir.path().join("knows.csv"); + + // Write Person CSV + std::fs::write( + &person_csv_path, + "id,name,age\n\ + 1,Alice,28\n\ + 2,Bob,34\n\ + 3,Carol,29\n\ + 4,David,42\n", + ) + .unwrap(); + + // Write KNOWS relationship CSV + std::fs::write( + &knows_csv_path, + "src_id,dst_id,since\n\ + 1,2,2020\n\ + 2,3,2021\n\ + 1,3,2019\n", + ) + .unwrap(); + + // Create graph configuration + let config = GraphConfig::builder() + .with_node_label("Person", "id") + .with_relationship("KNOWS", "src_id", "dst_id") + .build() + .unwrap(); + + // Create SessionContext and register CSV files + let ctx = SessionContext::new(); + + ctx.register_csv( + "Person", + person_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + ctx.register_csv( + "KNOWS", + knows_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + // Test 1: Simple node scan with filter + // Note: No need to manually build catalog - it's automatic! 
+ let query1 = + CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age ORDER BY p.age") + .unwrap() + .with_config(config.clone()); + + let result1 = query1 + .execute_with_datafusion_context(ctx.clone()) + .await + .unwrap(); + + // Should return Bob (34) and David (42) + assert_eq!(result1.num_rows(), 2); + assert_eq!(result1.num_columns(), 2); + + // Verify column names use Cypher dot notation + assert_eq!(result1.schema().field(0).name(), "p.name"); + assert_eq!(result1.schema().field(1).name(), "p.age"); + + let names = result1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ages = result1 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify first row is Bob (34) + assert_eq!(names.value(0), "Bob"); + assert_eq!(ages.value(0), 34); + + // Verify second row is David (42) + assert_eq!(names.value(1), "David"); + assert_eq!(ages.value(1), 42); + + // Test 2: Relationship traversal + let query2 = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name ORDER BY a.name", + ) + .unwrap() + .with_config(config); + + let result2 = query2.execute_with_datafusion_context(ctx).await.unwrap(); + + // Should return 3 relationships: Alice->Bob, Alice->Carol, Bob->Carol + assert_eq!(result2.num_rows(), 3); + assert_eq!(result2.num_columns(), 2); + + // Verify column names + assert_eq!(result2.schema().field(0).name(), "a.name"); + assert_eq!(result2.schema().field(1).name(), "b.name"); + + let src_names = result2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let dst_names = result2 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Collect all relationships + let relationships: Vec<(String, String)> = (0..result2.num_rows()) + .map(|i| { + ( + src_names.value(i).to_string(), + dst_names.value(i).to_string(), + ) + }) + .collect(); + + // Verify we have the expected relationships + assert!(relationships.contains(&("Alice".to_string(), "Bob".to_string()))); + 
assert!(relationships.contains(&("Alice".to_string(), "Carol".to_string()))); + assert!(relationships.contains(&("Bob".to_string(), "Carol".to_string()))); +} + +#[tokio::test] +async fn test_execute_with_context_complex_query() { + // Create temporary CSV files + let temp_dir = tempfile::tempdir().unwrap(); + let employee_csv_path = temp_dir.path().join("employees.csv"); + let department_csv_path = temp_dir.path().join("departments.csv"); + let works_in_csv_path = temp_dir.path().join("works_in.csv"); + + // Write Employee CSV + std::fs::write( + &employee_csv_path, + "emp_id,name,salary\n\ + 101,Alice,75000\n\ + 102,Bob,85000\n\ + 103,Carol,65000\n\ + 104,David,95000\n\ + 105,Eve,72000\n", + ) + .unwrap(); + + // Write Department CSV + std::fs::write( + &department_csv_path, + "dept_id,name,budget\n\ + 1,Engineering,500000\n\ + 2,Sales,300000\n\ + 3,HR,200000\n", + ) + .unwrap(); + + // Write WORKS_IN relationship CSV + std::fs::write( + &works_in_csv_path, + "employee_id,department_id,role\n\ + 101,1,Engineer\n\ + 102,1,Senior Engineer\n\ + 103,2,Sales Rep\n\ + 104,1,Manager\n\ + 105,3,HR Specialist\n", + ) + .unwrap(); + + // Create graph configuration + let config = GraphConfig::builder() + .with_node_label("Employee", "emp_id") + .with_node_label("Department", "dept_id") + .with_relationship("WORKS_IN", "employee_id", "department_id") + .build() + .unwrap(); + + // Create SessionContext and register CSV files + let ctx = SessionContext::new(); + + ctx.register_csv( + "Employee", + employee_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + ctx.register_csv( + "Department", + department_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + ctx.register_csv( + "WORKS_IN", + works_in_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + // Query: Find high-earning employees in Engineering department + let query = CypherQuery::new( + "MATCH (e:Employee)-[:WORKS_IN]->(d:Department) \ 
+ WHERE d.name = 'Engineering' AND e.salary > 80000 \ + RETURN e.name, e.salary, d.name \ + ORDER BY e.salary DESC", + ) + .unwrap() + .with_config(config); + + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + // Should return David (95000) and Bob (85000) from Engineering + assert_eq!(result.num_rows(), 2); + assert_eq!(result.num_columns(), 3); + + // Verify column names use Cypher dot notation + assert_eq!(result.schema().field(0).name(), "e.name"); + assert_eq!(result.schema().field(1).name(), "e.salary"); + assert_eq!(result.schema().field(2).name(), "d.name"); + + let emp_names = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let salaries = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let dept_names = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + + // First row: David with highest salary + assert_eq!(emp_names.value(0), "David"); + assert_eq!(salaries.value(0), 95000); + assert_eq!(dept_names.value(0), "Engineering"); + + // Second row: Bob + assert_eq!(emp_names.value(1), "Bob"); + assert_eq!(salaries.value(1), 85000); + assert_eq!(dept_names.value(1), "Engineering"); +} + +#[tokio::test] +async fn test_execute_with_context_missing_table() { + let config = GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let ctx = SessionContext::new(); + // Note: Not registering any tables! 
+ + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name") + .unwrap() + .with_config(config); + + let result = query.execute_with_datafusion_context(ctx).await; + + // Should error because Person table is not registered + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Person") && err_msg.contains("not found"), + "Error should mention missing Person table: {}", + err_msg + ); +} + +#[tokio::test] +async fn test_execute_with_context_aliases() { + let temp_dir = tempfile::tempdir().unwrap(); + let person_csv_path = temp_dir.path().join("persons.csv"); + + std::fs::write( + &person_csv_path, + "id,name,age\n\ + 1,Alice,28\n\ + 2,Bob,34\n", + ) + .unwrap(); + + let config = GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_csv( + "Person", + person_csv_path.to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + // Query with explicit aliases + let query = CypherQuery::new( + "MATCH (p:Person) RETURN p.name AS person_name, p.age AS person_age ORDER BY p.age", + ) + .unwrap() + .with_config(config); + + let result = query.execute_with_datafusion_context(ctx).await.unwrap(); + + assert_eq!(result.num_rows(), 2); + + // Verify explicit aliases are preserved + assert_eq!(result.schema().field(0).name(), "person_name"); + assert_eq!(result.schema().field(1).name(), "person_age"); + + let names = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(names.value(0), "Alice"); + assert_eq!(names.value(1), "Bob"); +} From b10a2b569c8811e59307634d6aec88f38f3a7d19 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 1 Nov 2025 20:57:57 -0700 Subject: [PATCH 4/4] fix typo --- ..._datafusion_wth_context.rs => test_datafusion_with_context.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rust/lance-graph/tests/{test_datafusion_wth_context.rs => 
test_datafusion_with_context.rs} (100%) diff --git a/rust/lance-graph/tests/test_datafusion_wth_context.rs b/rust/lance-graph/tests/test_datafusion_with_context.rs similarity index 100% rename from rust/lance-graph/tests/test_datafusion_wth_context.rs rename to rust/lance-graph/tests/test_datafusion_with_context.rs