From a3406bdb0fb79bc2ec6ed064cb2cdbf0d0fa99c1 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Wed, 29 Oct 2025 19:46:11 -0700 Subject: [PATCH] feat: support column alias in datafusion --- rust/lance-graph/src/datafusion_planner.rs | 135 ++++++++++++- .../tests/integration_datafusion_pipeline.rs | 178 ++++++++++++++++++ 2 files changed, 312 insertions(+), 1 deletion(-) diff --git a/rust/lance-graph/src/datafusion_planner.rs b/rust/lance-graph/src/datafusion_planner.rs index b20b351a..c2aeba3e 100644 --- a/rust/lance-graph/src/datafusion_planner.rs +++ b/rust/lance-graph/src/datafusion_planner.rs @@ -321,7 +321,15 @@ impl DataFusionPlanner { let input_plan = self.build_operator(ctx, input)?; let exprs: Vec<Expr> = projections .iter() - .map(|p| self.to_df_value_expr(&p.expression)) + .map(|p| { + let expr = self.to_df_value_expr(&p.expression); + // Apply alias if provided + if let Some(alias) = &p.alias { + expr.alias(alias) + } else { + expr + } + }) .collect(); Ok(LogicalPlanBuilder::from(input_plan) .project(exprs) @@ -1800,4 +1808,129 @@ mod tests { assert!(s.contains("Sort") || s.contains("sort")); assert!(s.contains("n__name")); } + + #[test] + fn test_project_with_alias() { + use crate::ast::{PropertyRef, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + alias: Some("person_name".to_string()), + }], + }; + + let df_plan = planner.plan(&project).unwrap(); + let s = 
format!("{:?}", df_plan); + + // Should contain the alias + assert!(s.contains("person_name")); + } + + #[test] + fn test_project_with_multiple_aliases() { + use crate::ast::{PropertyRef, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "p".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "name".to_string(), + }), + alias: Some("name".to_string()), + }, + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "age".to_string(), + }), + alias: Some("age".to_string()), + }, + ], + }; + + let df_plan = planner.plan(&project).unwrap(); + let s = format!("{:?}", df_plan); + + // Should contain both aliases + assert!(s.contains("name")); + assert!(s.contains("age")); + } + + #[test] + fn test_project_mixed_with_and_without_alias() { + use crate::ast::{PropertyRef, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "p".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "name".to_string(), + }), + alias: 
Some("full_name".to_string()), + }, + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "p".to_string(), + property: "age".to_string(), + }), + alias: None, // No alias - should use qualified name + }, + ], + }; + + let df_plan = planner.plan(&project).unwrap(); + let s = format!("{:?}", df_plan); + + // Should contain the alias and the qualified name + assert!(s.contains("full_name")); + assert!(s.contains("p__age")); + } } diff --git a/rust/lance-graph/tests/integration_datafusion_pipeline.rs b/rust/lance-graph/tests/integration_datafusion_pipeline.rs index 97a00427..b1a78a3b 100644 --- a/rust/lance-graph/tests/integration_datafusion_pipeline.rs +++ b/rust/lance-graph/tests/integration_datafusion_pipeline.rs @@ -1651,3 +1651,181 @@ async fn test_datafusion_order_by_with_distinct() { assert_eq!(names.value(2), "David"); assert_eq!(names.value(3), "Eve"); } + +// ============================================================================ +// Column Alias Tests +// ============================================================================ + +#[tokio::test] +async fn test_datafusion_return_with_single_alias() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: RETURN with alias + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS person_name") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 5); + + // Check that the column is named "person_name" not "p__name" + let schema = out.schema(); + assert_eq!(schema.fields().len(), 1); + assert_eq!(schema.field(0).name(), "person_name"); + + let names = out + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert!(!names.value(0).is_empty()); // Has data +} + +#[tokio::test] +async fn test_datafusion_return_with_multiple_aliases() { + let config 
= create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: Multiple columns with aliases + let query = + CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name AS name, p.age AS age") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Age > 30: Bob(35), Charlie(30 - excluded), David(40) + assert_eq!(out.num_rows(), 2); + + // Check column names are aliased + let schema = out.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "name"); + assert_eq!(schema.field(1).name(), "age"); + + let names = out + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap(); + + // Verify data + let mut results: Vec<(String, i64)> = (0..out.num_rows()) + .map(|i| (names.value(i).to_string(), ages.value(i))) + .collect(); + results.sort_by_key(|r| r.1); + + assert_eq!(results[0], ("Bob".to_string(), 35)); + assert_eq!(results[1], ("David".to_string(), 40)); +} + +#[tokio::test] +async fn test_datafusion_return_mixed_with_and_without_alias() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: Mix of aliased and non-aliased columns + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS full_name, p.age LIMIT 3") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 3); + + // Check column names + let schema = out.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "full_name"); // Aliased + assert_eq!(schema.field(1).name(), "p__age"); // Not aliased - qualified name +} + +#[tokio::test] +async fn test_datafusion_return_alias_with_relationship() 
{ + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Alias in relationship query + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) \ + RETURN a.name AS source, b.name AS target \ + ORDER BY source, target \ + LIMIT 3", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 3); + + // Check column names are aliased + let schema = out.schema(); + assert_eq!(schema.field(0).name(), "source"); + assert_eq!(schema.field(1).name(), "target"); + + let sources = out + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let targets = out + .column(1) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + + // First 3 ordered by source, target + assert_eq!(sources.value(0), "Alice"); + assert_eq!(targets.value(0), "Bob"); +} + +#[tokio::test] +async fn test_datafusion_return_alias_with_order_by() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: Alias with ORDER BY (ORDER BY uses original property reference) + let query = + CypherQuery::new("MATCH (p:Person) RETURN p.name AS name ORDER BY p.age DESC LIMIT 2") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 2); + + // Check column name is aliased + let schema = out.schema(); + assert_eq!(schema.field(0).name(), "name"); + + let names = out + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + + // Ordered by age DESC: David(40), Bob(35) + assert_eq!(names.value(0), "David"); + assert_eq!(names.value(1), "Bob"); +}