diff --git a/rust/lance-graph/src/datafusion_planner.rs b/rust/lance-graph/src/datafusion_planner.rs index 56c038d7..b20b351a 100644 --- a/rust/lance-graph/src/datafusion_planner.rs +++ b/rust/lance-graph/src/datafusion_planner.rs @@ -337,9 +337,30 @@ impl DataFusionPlanner { .build() .unwrap()) } - LogicalOperator::Sort { input, .. } => { - // Schema-less placeholder: skip sort for now - self.build_operator(ctx, input) + LogicalOperator::Sort { input, sort_items } => { + use datafusion::logical_expr::SortExpr; + + let input_plan = self.build_operator(ctx, input)?; + + // Convert sort items to DataFusion sort expressions + let sort_exprs: Vec = sort_items + .iter() + .map(|item| { + let expr = self.to_df_value_expr(&item.expression); + let asc = matches!(item.direction, crate::ast::SortDirection::Ascending); + SortExpr { + expr, + asc, + nulls_first: true, + } + }) + .collect(); + + Ok(LogicalPlanBuilder::from(input_plan) + .sort(sort_exprs) + .unwrap() + .build() + .unwrap()) } LogicalOperator::Limit { input, count } => { let input_plan = self.build_operator(ctx, input)?; @@ -1611,4 +1632,172 @@ mod tests { // Third call should fail (no more instances) assert!(ctx.next_relationship_instance("KNOWS").is_err()); } + + #[test] + fn test_order_by_single_column_asc() { + use crate::ast::{PropertyRef, SortDirection, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + // Build: Project -> Sort + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + alias: None, + }], + }; + + let sort = LogicalOperator::Sort { + input: Box::new(project), + sort_items: vec![SortItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + direction: SortDirection::Ascending, + }], + }; + + let df_plan = planner.plan(&sort).unwrap(); + let s = format!("{:?}", df_plan); + + // Should contain Sort operator + println!("Plan: {}", s); + assert!(s.contains("Sort") || s.contains("sort")); + assert!(s.contains("n__name")); + } + + #[test] + fn test_order_by_multiple_columns() { + use crate::ast::{PropertyRef, SortDirection, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + alias: None, + }, + ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "age".to_string(), + }), + alias: None, + }, + ], + }; + + let sort = LogicalOperator::Sort { + input: Box::new(project), + sort_items: vec![ + SortItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "age".to_string(), + }), + direction: SortDirection::Descending, + }, + SortItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + direction: SortDirection::Ascending, + }, + ], + }; + + let df_plan = planner.plan(&sort).unwrap(); + let s = format!("{:?}", df_plan); + + // Should contain Sort with both columns + assert!(s.contains("Sort") || s.contains("sort")); + assert!(s.contains("n__age")); + assert!(s.contains("n__name")); + } + + #[test] + fn test_order_by_with_limit() { + use crate::ast::{PropertyRef, SortDirection, ValueExpression}; + use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem}; + + let cfg = crate::config::GraphConfig::builder() + .with_node_label("Person", "id") + .build() + .unwrap(); + let planner = DataFusionPlanner::with_catalog(cfg, make_catalog()); + + let scan = LogicalOperator::ScanByLabel { + variable: "n".to_string(), + label: "Person".to_string(), + properties: Default::default(), + }; + + let project = LogicalOperator::Project { + input: Box::new(scan), + projections: vec![ProjectionItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + alias: None, + }], + }; + + let sort = LogicalOperator::Sort { + input: Box::new(project), + sort_items: vec![SortItem { + expression: ValueExpression::Property(PropertyRef { + variable: "n".to_string(), + property: "name".to_string(), + }), + direction: SortDirection::Ascending, + }], + }; + + let limit = LogicalOperator::Limit { + input: Box::new(sort), + count: 10, + }; + + let df_plan = planner.plan(&limit).unwrap(); + let s = format!("{:?}", df_plan); + + // Should contain both Limit and Sort + assert!(s.contains("Limit") || s.contains("limit")); + assert!(s.contains("Sort") || s.contains("sort")); + assert!(s.contains("n__name")); + } } diff --git a/rust/lance-graph/tests/integration_datafusion_pipeline.rs b/rust/lance-graph/tests/integration_datafusion_pipeline.rs index 203969c9..97a00427 100644 --- a/rust/lance-graph/tests/integration_datafusion_pipeline.rs +++ b/rust/lance-graph/tests/integration_datafusion_pipeline.rs @@ -115,6 +115,10 @@ fn create_graph_config() -> GraphConfig { .unwrap() } +// ============================================================================ +// Basic Node Query Tests +// ============================================================================ + #[tokio::test] async fn test_datafusion_simple_node_scan() { let config = create_graph_config(); @@ -225,6 +229,10 @@ async fn test_datafusion_multiple_conditions() { assert_eq!(name_set, expected); } +// ============================================================================ +// Basic Relationship Query Tests +// ============================================================================ + #[tokio::test] async fn test_datafusion_relationship_traversal() { let config = create_graph_config(); @@ -656,6 +664,10 @@ async fn test_datafusion_one_hop_filtered_source_age_strict() { assert_eq!(set, expected); } +// ============================================================================ +// Two-Hop Path Query Tests +// ============================================================================ + #[tokio::test] async fn test_datafusion_two_hop_basic() { let config = create_graph_config(); @@ -953,3 +965,689 @@ async fn test_datafusion_two_hop_no_results() { // Eve has no outgoing edges, so no two-hop paths assert_eq!(out.num_rows(), 0); } + +// ============================================================================ +// Complex Query Tests (Advanced Filtering & Multi-Condition) +// ============================================================================ + +#[tokio::test] +async fn test_datafusion_two_hop_with_multiple_filters() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Two-hop with filters on source, intermediate, and target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + WHERE a.age < 30 AND b.age >= 30 AND c.age > 25 \ + RETURN a.name, b.name, c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // a.age < 30: Alice(25), Eve(28) + // b.age >= 30: Bob(35), Charlie(30), David(40) + // c.age > 25: Bob(35), Charlie(30), David(40), Eve(28) + // Paths from Alice: Alice->Bob->Charlie, Alice->Charlie->David + // Valid: Alice(25)->Bob(35)->Charlie(30), Alice(25)->Charlie(30)->David(40) + assert_eq!(out.num_rows(), 2); + + let a_names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let b_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let c_names = out + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut paths = Vec::new(); + for i in 0..out.num_rows() { + paths.push(( + a_names.value(i).to_string(), + b_names.value(i).to_string(), + c_names.value(i).to_string(), + )); + } + + assert!(paths.contains(&( + "Alice".to_string(), + "Bob".to_string(), + "Charlie".to_string() + ))); + assert!(paths.contains(&( + "Alice".to_string(), + "Charlie".to_string(), + "David".to_string() + ))); +} + +#[tokio::test] +async fn test_datafusion_two_hop_return_relationship_properties() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Return relationship properties from two-hop path + let query = CypherQuery::new( + "MATCH (a:Person)-[r1:KNOWS]->(b:Person)-[r2:KNOWS]->(c:Person) \ + RETURN a.name, c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + assert_eq!(out.num_columns(), 2); + assert_eq!(out.num_rows(), 4); +} + +#[tokio::test] +async fn test_datafusion_one_hop_with_city_filter() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Filter targets by city (David has NULL city, should be excluded by comparison) + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) WHERE b.city = 'Seattle' RETURN b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Only Eve has city = 'Seattle' and is reachable (David->Eve) + assert_eq!(out.num_rows(), 1); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(names.value(0), "Eve"); +} + +#[tokio::test] +async fn test_datafusion_two_hop_with_limit() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Two-hop with LIMIT + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + RETURN c.name LIMIT 2", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Should return only 2 rows (limited from 4 total paths) + assert_eq!(out.num_rows(), 2); +} + +#[tokio::test] +async fn test_datafusion_complex_boolean_expression() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Complex boolean expression with AND/OR + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) \ + WHERE (a.age > 30 AND b.age < 35) OR (a.name = 'Alice' AND b.name = 'Bob') \ + RETURN a.name, b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Matches: + // - Bob(35)->Charlie(30): age > 30 AND age < 35 + // - David(40)->Eve(28): age > 30 AND age < 35 + // - Alice(25)->Bob(35): name = 'Alice' AND name = 'Bob' + assert_eq!(out.num_rows(), 3); + + let a_names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let b_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut pairs = Vec::new(); + for i in 0..out.num_rows() { + pairs.push((a_names.value(i).to_string(), b_names.value(i).to_string())); + } + + assert!(pairs.contains(&("Alice".to_string(), "Bob".to_string()))); + assert!(pairs.contains(&("Bob".to_string(), "Charlie".to_string()))); + assert!(pairs.contains(&("David".to_string(), "Eve".to_string()))); +} + +#[tokio::test] +async fn test_datafusion_two_hop_same_intermediate_node() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Find paths through Charlie specifically + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + WHERE b.name = 'Charlie' \ + RETURN a.name, c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Paths through Charlie: Bob->Charlie->David, Alice->Charlie->David + assert_eq!(out.num_rows(), 2); + + let a_names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let c_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut pairs = Vec::new(); + for i in 0..out.num_rows() { + pairs.push((a_names.value(i).to_string(), c_names.value(i).to_string())); + } + + assert!(pairs.contains(&("Bob".to_string(), "David".to_string()))); + assert!(pairs.contains(&("Alice".to_string(), "David".to_string()))); +} + +#[tokio::test] +async fn test_datafusion_one_hop_multiple_properties() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Return multiple properties from both source and target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) \ + RETURN a.name, a.age, b.name, b.age", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_columns(), 4); + assert_eq!(out.num_rows(), 5); + + let a_names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let a_ages = out.column(1).as_any().downcast_ref::().unwrap(); + let b_names = out + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + let b_ages = out.column(3).as_any().downcast_ref::().unwrap(); + + // Verify at least one row has correct data + let mut found_alice_bob = false; + for i in 0..out.num_rows() { + if a_names.value(i) == "Alice" && b_names.value(i) == "Bob" { + assert_eq!(a_ages.value(i), 25); + assert_eq!(b_ages.value(i), 35); + found_alice_bob = true; + } + } + assert!(found_alice_bob); +} + +#[tokio::test] +async fn test_datafusion_two_hop_count_paths_per_source() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Count two-hop paths from Alice + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + WHERE a.name = 'Alice' \ + RETURN c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Alice's two-hop paths: Alice->Bob->Charlie, Alice->Charlie->David + assert_eq!(out.num_rows(), 2); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut counts = HashMap::::new(); + for i in 0..out.num_rows() { + *counts.entry(names.value(i).to_string()).or_insert(0) += 1; + } + + assert_eq!(counts.get("Charlie"), Some(&1)); + assert_eq!(counts.get("David"), Some(&1)); +} + +#[tokio::test] +async fn test_datafusion_filter_on_both_nodes_and_edges() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Filter on both node properties and relationship existence + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) \ + WHERE a.age >= 25 AND a.age <= 30 AND b.age > 30 \ + RETURN a.name, b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // a: age 25-30 = Alice(25), Charlie(30), Eve(28) + // b: age > 30 = Bob(35), David(40) + // Edges: Alice->Bob, Charlie->David + assert_eq!(out.num_rows(), 2); + + let a_names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let b_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut pairs = Vec::new(); + for i in 0..out.num_rows() { + pairs.push((a_names.value(i).to_string(), b_names.value(i).to_string())); + } + + assert!(pairs.contains(&("Alice".to_string(), "Bob".to_string()))); + assert!(pairs.contains(&("Charlie".to_string(), "David".to_string()))); +} + +#[tokio::test] +async fn test_datafusion_distinct_with_two_hop() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Get distinct source nodes that have two-hop paths + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + RETURN DISTINCT a.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Sources with two-hop paths: Alice, Bob, Charlie + assert_eq!(out.num_rows(), 3); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let result_set: std::collections::HashSet = (0..out.num_rows()) + .map(|i| names.value(i).to_string()) + .collect(); + + let expected: std::collections::HashSet = ["Alice", "Bob", "Charlie"] + .into_iter() + .map(|s| s.to_string()) + .collect(); + + assert_eq!(result_set, expected); +} + +// ============================================================================ +// ORDER BY Tests +// ============================================================================ + +#[tokio::test] +async fn test_datafusion_order_by_single_column_asc() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: ORDER BY name ascending + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name ORDER BY p.name") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 5); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify alphabetical order: Alice, Bob, Charlie, David, Eve + assert_eq!(names.value(0), "Alice"); + assert_eq!(names.value(1), "Bob"); + assert_eq!(names.value(2), "Charlie"); + assert_eq!(names.value(3), "David"); + assert_eq!(names.value(4), "Eve"); +} + +#[tokio::test] +async fn test_datafusion_order_by_single_column_desc() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: ORDER BY age descending + let query = CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 5); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ages = out.column(1).as_any().downcast_ref::().unwrap(); + + // Verify descending age order: David(40), Bob(35), Charlie(30), Eve(28), Alice(25) + assert_eq!(names.value(0), "David"); + assert_eq!(ages.value(0), 40); + assert_eq!(names.value(1), "Bob"); + assert_eq!(ages.value(1), 35); + assert_eq!(names.value(2), "Charlie"); + assert_eq!(ages.value(2), 30); + assert_eq!(names.value(3), "Eve"); + assert_eq!(ages.value(3), 28); + assert_eq!(names.value(4), "Alice"); + assert_eq!(ages.value(4), 25); +} + +#[tokio::test] +async fn test_datafusion_order_by_multiple_columns() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: ORDER BY age DESC, name ASC (secondary sort by name) + let query = + CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC, p.name ASC") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 5); + + let _names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ages = out.column(1).as_any().downcast_ref::().unwrap(); + + // First by age DESC, then by name ASC + assert_eq!(ages.value(0), 40); // David + assert_eq!(ages.value(1), 35); // Bob + assert_eq!(ages.value(2), 30); // Charlie + assert_eq!(ages.value(3), 28); // Eve + assert_eq!(ages.value(4), 25); // Alice +} + +#[tokio::test] +async fn test_datafusion_order_by_with_limit() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: ORDER BY age DESC LIMIT 3 (top 3 oldest) + let query = + CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC LIMIT 3") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Should only return 3 rows + assert_eq!(out.num_rows(), 3); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ages = out.column(1).as_any().downcast_ref::().unwrap(); + + // Top 3 oldest: David(40), Bob(35), Charlie(30) + assert_eq!(names.value(0), "David"); + assert_eq!(ages.value(0), 40); + assert_eq!(names.value(1), "Bob"); + assert_eq!(ages.value(1), 35); + assert_eq!(names.value(2), "Charlie"); + assert_eq!(ages.value(2), 30); +} + +#[tokio::test] +async fn test_datafusion_order_by_with_filter() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + + // Query: Filter then order + let query = + CypherQuery::new("MATCH (p:Person) WHERE p.age >= 30 RETURN p.name ORDER BY p.name") + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Age >= 30: Bob(35), Charlie(30), David(40) + assert_eq!(out.num_rows(), 3); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Alphabetical: Bob, Charlie, David + assert_eq!(names.value(0), "Bob"); + assert_eq!(names.value(1), "Charlie"); + assert_eq!(names.value(2), "David"); +} + +#[tokio::test] +async fn test_datafusion_order_by_relationship_query() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Order relationship results by target name + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name ORDER BY b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 5); + + let b_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Targets ordered: Bob, Charlie(x2), David, Eve + assert_eq!(b_names.value(0), "Bob"); + assert_eq!(b_names.value(1), "Charlie"); + assert_eq!(b_names.value(2), "Charlie"); + assert_eq!(b_names.value(3), "David"); + assert_eq!(b_names.value(4), "Eve"); +} + +#[tokio::test] +async fn test_datafusion_order_by_two_hop_query() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: Two-hop with ORDER BY on final target + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \ + RETURN a.name, c.name ORDER BY c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + assert_eq!(out.num_rows(), 4); + + let c_names = out + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + // Final targets ordered: Charlie, David(x2), Eve + assert_eq!(c_names.value(0), "Charlie"); + assert_eq!(c_names.value(1), "David"); + assert_eq!(c_names.value(2), "David"); + assert_eq!(c_names.value(3), "Eve"); +} + +#[tokio::test] +async fn test_datafusion_order_by_with_distinct() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Query: DISTINCT with ORDER BY + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN DISTINCT b.name ORDER BY b.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let out = query.execute_datafusion(datasets).await.unwrap(); + + // Distinct targets: Bob, Charlie, David, Eve + assert_eq!(out.num_rows(), 4); + + let names = out + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Alphabetical order + assert_eq!(names.value(0), "Bob"); + assert_eq!(names.value(1), "Charlie"); + assert_eq!(names.value(2), "David"); + assert_eq!(names.value(3), "Eve"); +}