Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 134 additions & 1 deletion rust/lance-graph/src/datafusion_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,15 @@ impl DataFusionPlanner {
let input_plan = self.build_operator(ctx, input)?;
let exprs: Vec<Expr> = projections
.iter()
.map(|p| self.to_df_value_expr(&p.expression))
.map(|p| {
let expr = self.to_df_value_expr(&p.expression);
// Apply alias if provided
if let Some(alias) = &p.alias {
expr.alias(alias)
} else {
expr
}
})
.collect();
Ok(LogicalPlanBuilder::from(input_plan)
.project(exprs)
Expand Down Expand Up @@ -1800,4 +1808,129 @@ mod tests {
assert!(s.contains("Sort") || s.contains("sort"));
assert!(s.contains("n__name"));
}

#[test]
fn test_project_with_alias() {
    use crate::ast::{PropertyRef, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem};

    // Planner over a graph config that declares a single `Person` label keyed by `id`.
    let graph_config = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(graph_config, make_catalog());

    // Label scan binding `Person` nodes to the variable `n`.
    let person_scan = LogicalOperator::ScanByLabel {
        variable: "n".to_string(),
        label: "Person".to_string(),
        properties: Default::default(),
    };

    // `n.name` projected under the alias `person_name`.
    let aliased_name = ProjectionItem {
        expression: ValueExpression::Property(PropertyRef {
            variable: "n".to_string(),
            property: "name".to_string(),
        }),
        alias: Some("person_name".to_string()),
    };

    let projection = LogicalOperator::Project {
        input: Box::new(person_scan),
        projections: vec![aliased_name],
    };

    // The debug rendering of the resulting plan must mention the alias.
    let rendered = format!("{:?}", planner.plan(&projection).unwrap());
    assert!(rendered.contains("person_name"));
}

/// Verifies that every aliased projection item is rendered as an alias in the
/// planned DataFusion logical plan when several items carry aliases.
#[test]
fn test_project_with_multiple_aliases() {
    use crate::ast::{PropertyRef, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem};

    let cfg = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());

    let scan = LogicalOperator::ScanByLabel {
        variable: "p".to_string(),
        label: "Person".to_string(),
        properties: Default::default(),
    };

    let project = LogicalOperator::Project {
        input: Box::new(scan),
        projections: vec![
            ProjectionItem {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "p".to_string(),
                    property: "name".to_string(),
                }),
                alias: Some("name".to_string()),
            },
            ProjectionItem {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "p".to_string(),
                    property: "age".to_string(),
                }),
                alias: Some("age".to_string()),
            },
        ],
    };

    let df_plan = planner.plan(&project).unwrap();
    let s = format!("{:?}", df_plan);

    // The aliases `name` / `age` are substrings of the unaliased qualified
    // column names (`p__name`, `p__age`), so a bare `contains("name")` would
    // pass even if aliasing were silently dropped. Match the rendered
    // `AS <alias>` form instead so the test actually exercises the aliasing.
    // NOTE(review): relies on the plan's Debug output rendering aliased
    // expressions as `<expr> AS <alias>` — confirm against the DataFusion
    // version in use.
    assert!(s.contains("AS name"), "expected `AS name` in plan: {}", s);
    assert!(s.contains("AS age"), "expected `AS age` in plan: {}", s);
}

#[test]
fn test_project_mixed_with_and_without_alias() {
    use crate::ast::{PropertyRef, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem};

    // Planner over a graph config that declares a single `Person` label keyed by `id`.
    let graph_config = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(graph_config, make_catalog());

    let person_scan = LogicalOperator::ScanByLabel {
        variable: "p".to_string(),
        label: "Person".to_string(),
        properties: Default::default(),
    };

    // First item: `p.name` under the alias `full_name`.
    let aliased_name = ProjectionItem {
        expression: ValueExpression::Property(PropertyRef {
            variable: "p".to_string(),
            property: "name".to_string(),
        }),
        alias: Some("full_name".to_string()),
    };

    // Second item: `p.age` with no alias, so the qualified name is expected.
    let bare_age = ProjectionItem {
        expression: ValueExpression::Property(PropertyRef {
            variable: "p".to_string(),
            property: "age".to_string(),
        }),
        alias: None,
    };

    let projection = LogicalOperator::Project {
        input: Box::new(person_scan),
        projections: vec![aliased_name, bare_age],
    };

    let rendered = format!("{:?}", planner.plan(&projection).unwrap());

    // Both the explicit alias and the qualified fallback name must show up.
    assert!(rendered.contains("full_name"));
    assert!(rendered.contains("p__age"));
}
}
178 changes: 178 additions & 0 deletions rust/lance-graph/tests/integration_datafusion_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1651,3 +1651,181 @@ async fn test_datafusion_order_by_with_distinct() {
assert_eq!(names.value(2), "David");
assert_eq!(names.value(3), "Eve");
}

// ============================================================================
// Column Alias Tests
// ============================================================================

#[tokio::test]
async fn test_datafusion_return_with_single_alias() {
    let graph_config = create_graph_config();
    let people = create_person_dataset();

    // A single RETURN item carrying an alias.
    let cypher = CypherQuery::new("MATCH (p:Person) RETURN p.name AS person_name")
        .unwrap()
        .with_config(graph_config);

    let datasets = HashMap::from([("Person".to_string(), people)]);
    let result = cypher.execute_datafusion(datasets).await.unwrap();

    assert_eq!(result.num_rows(), 5);

    // The only output column must carry the alias rather than "p__name".
    let result_schema = result.schema();
    assert_eq!(result_schema.fields().len(), 1);
    assert_eq!(result_schema.field(0).name(), "person_name");

    // The aliased column still holds string data.
    let name_col = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(!name_col.value(0).is_empty());
}

#[tokio::test]
async fn test_datafusion_return_with_multiple_aliases() {
    let graph_config = create_graph_config();
    let people = create_person_dataset();

    // Two RETURN items, each with its own alias, behind a WHERE filter.
    let cypher =
        CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name AS name, p.age AS age")
            .unwrap()
            .with_config(graph_config);

    let datasets = HashMap::from([("Person".to_string(), people)]);
    let result = cypher.execute_datafusion(datasets).await.unwrap();

    // Only Bob (35) and David (40) pass `age > 30`; Charlie (30) is excluded.
    assert_eq!(result.num_rows(), 2);

    // Output columns are named by their aliases, in RETURN order.
    let result_schema = result.schema();
    assert_eq!(result_schema.fields().len(), 2);
    assert_eq!(result_schema.field(0).name(), "name");
    assert_eq!(result_schema.field(1).name(), "age");

    let name_col = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let age_col = result
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Collect (name, age) rows and order them by age so the comparison does
    // not depend on the engine's output order.
    let mut rows: Vec<(String, i64)> = Vec::with_capacity(result.num_rows());
    for row in 0..result.num_rows() {
        rows.push((name_col.value(row).to_string(), age_col.value(row)));
    }
    rows.sort_by_key(|(_, age)| *age);

    assert_eq!(rows[0], ("Bob".to_string(), 35));
    assert_eq!(rows[1], ("David".to_string(), 40));
}

#[tokio::test]
async fn test_datafusion_return_mixed_with_and_without_alias() {
    let graph_config = create_graph_config();
    let people = create_person_dataset();

    // One aliased RETURN item next to one plain property reference.
    let cypher = CypherQuery::new("MATCH (p:Person) RETURN p.name AS full_name, p.age LIMIT 3")
        .unwrap()
        .with_config(graph_config);

    let datasets = HashMap::from([("Person".to_string(), people)]);
    let result = cypher.execute_datafusion(datasets).await.unwrap();

    assert_eq!(result.num_rows(), 3);

    let result_schema = result.schema();
    assert_eq!(result_schema.fields().len(), 2);
    // The aliased item takes the alias ...
    assert_eq!(result_schema.field(0).name(), "full_name");
    // ... while the plain item keeps its qualified name.
    assert_eq!(result_schema.field(1).name(), "p__age");
}

#[tokio::test]
async fn test_datafusion_return_alias_with_relationship() {
    let graph_config = create_graph_config();
    let people = create_person_dataset();
    let knows = create_knows_dataset();

    // Aliases on both endpoints of a relationship pattern, reused in ORDER BY.
    let cypher = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(b:Person) \
         RETURN a.name AS source, b.name AS target \
         ORDER BY source, target \
         LIMIT 3",
    )
    .unwrap()
    .with_config(graph_config);

    let datasets = HashMap::from([
        ("Person".to_string(), people),
        ("KNOWS".to_string(), knows),
    ]);
    let result = cypher.execute_datafusion(datasets).await.unwrap();

    assert_eq!(result.num_rows(), 3);

    // Both output columns carry their aliases.
    let result_schema = result.schema();
    assert_eq!(result_schema.field(0).name(), "source");
    assert_eq!(result_schema.field(1).name(), "target");

    let source_col = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let target_col = result
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    // Only the first row of the (source, target) ordering is pinned.
    assert_eq!(source_col.value(0), "Alice");
    assert_eq!(target_col.value(0), "Bob");
}

#[tokio::test]
async fn test_datafusion_return_alias_with_order_by() {
    let graph_config = create_graph_config();
    let people = create_person_dataset();

    // The RETURN item is aliased while ORDER BY refers to a different,
    // unaliased property (`p.age`).
    let cypher =
        CypherQuery::new("MATCH (p:Person) RETURN p.name AS name ORDER BY p.age DESC LIMIT 2")
            .unwrap()
            .with_config(graph_config);

    let datasets = HashMap::from([("Person".to_string(), people)]);
    let result = cypher.execute_datafusion(datasets).await.unwrap();

    assert_eq!(result.num_rows(), 2);

    // The first output column carries the alias.
    let result_schema = result.schema();
    assert_eq!(result_schema.field(0).name(), "name");

    let name_col = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    // Descending by age: David (40) first, then Bob (35).
    assert_eq!(name_col.value(0), "David");
    assert_eq!(name_col.value(1), "Bob");
}
Loading