Skip to content

Commit 40f3b9a

Browse files
authored
feat: support column alias in datafusion (#23)
1 parent dc30df2 commit 40f3b9a

2 files changed

Lines changed: 312 additions & 1 deletion

File tree

rust/lance-graph/src/datafusion_planner.rs

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,15 @@ impl DataFusionPlanner {
321321
let input_plan = self.build_operator(ctx, input)?;
322322
let exprs: Vec<Expr> = projections
323323
.iter()
324-
.map(|p| self.to_df_value_expr(&p.expression))
324+
.map(|p| {
325+
let expr = self.to_df_value_expr(&p.expression);
326+
// Apply alias if provided
327+
if let Some(alias) = &p.alias {
328+
expr.alias(alias)
329+
} else {
330+
expr
331+
}
332+
})
325333
.collect();
326334
Ok(LogicalPlanBuilder::from(input_plan)
327335
.project(exprs)
@@ -1800,4 +1808,129 @@ mod tests {
18001808
assert!(s.contains("Sort") || s.contains("sort"));
18011809
assert!(s.contains("n__name"));
18021810
}
1811+
1812+
#[test]
1813+
fn test_project_with_alias() {
1814+
use crate::ast::{PropertyRef, ValueExpression};
1815+
use crate::logical_plan::{LogicalOperator, ProjectionItem};
1816+
1817+
let cfg = crate::config::GraphConfig::builder()
1818+
.with_node_label("Person", "id")
1819+
.build()
1820+
.unwrap();
1821+
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
1822+
1823+
let scan = LogicalOperator::ScanByLabel {
1824+
variable: "n".to_string(),
1825+
label: "Person".to_string(),
1826+
properties: Default::default(),
1827+
};
1828+
1829+
let project = LogicalOperator::Project {
1830+
input: Box::new(scan),
1831+
projections: vec![ProjectionItem {
1832+
expression: ValueExpression::Property(PropertyRef {
1833+
variable: "n".to_string(),
1834+
property: "name".to_string(),
1835+
}),
1836+
alias: Some("person_name".to_string()),
1837+
}],
1838+
};
1839+
1840+
let df_plan = planner.plan(&project).unwrap();
1841+
let s = format!("{:?}", df_plan);
1842+
1843+
// Should contain the alias
1844+
assert!(s.contains("person_name"));
1845+
}
1846+
1847+
#[test]
1848+
fn test_project_with_multiple_aliases() {
1849+
use crate::ast::{PropertyRef, ValueExpression};
1850+
use crate::logical_plan::{LogicalOperator, ProjectionItem};
1851+
1852+
let cfg = crate::config::GraphConfig::builder()
1853+
.with_node_label("Person", "id")
1854+
.build()
1855+
.unwrap();
1856+
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
1857+
1858+
let scan = LogicalOperator::ScanByLabel {
1859+
variable: "p".to_string(),
1860+
label: "Person".to_string(),
1861+
properties: Default::default(),
1862+
};
1863+
1864+
let project = LogicalOperator::Project {
1865+
input: Box::new(scan),
1866+
projections: vec![
1867+
ProjectionItem {
1868+
expression: ValueExpression::Property(PropertyRef {
1869+
variable: "p".to_string(),
1870+
property: "name".to_string(),
1871+
}),
1872+
alias: Some("name".to_string()),
1873+
},
1874+
ProjectionItem {
1875+
expression: ValueExpression::Property(PropertyRef {
1876+
variable: "p".to_string(),
1877+
property: "age".to_string(),
1878+
}),
1879+
alias: Some("age".to_string()),
1880+
},
1881+
],
1882+
};
1883+
1884+
let df_plan = planner.plan(&project).unwrap();
1885+
let s = format!("{:?}", df_plan);
1886+
1887+
// Should contain both aliases
1888+
assert!(s.contains("name"));
1889+
assert!(s.contains("age"));
1890+
}
1891+
1892+
#[test]
1893+
fn test_project_mixed_with_and_without_alias() {
1894+
use crate::ast::{PropertyRef, ValueExpression};
1895+
use crate::logical_plan::{LogicalOperator, ProjectionItem};
1896+
1897+
let cfg = crate::config::GraphConfig::builder()
1898+
.with_node_label("Person", "id")
1899+
.build()
1900+
.unwrap();
1901+
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
1902+
1903+
let scan = LogicalOperator::ScanByLabel {
1904+
variable: "p".to_string(),
1905+
label: "Person".to_string(),
1906+
properties: Default::default(),
1907+
};
1908+
1909+
let project = LogicalOperator::Project {
1910+
input: Box::new(scan),
1911+
projections: vec![
1912+
ProjectionItem {
1913+
expression: ValueExpression::Property(PropertyRef {
1914+
variable: "p".to_string(),
1915+
property: "name".to_string(),
1916+
}),
1917+
alias: Some("full_name".to_string()),
1918+
},
1919+
ProjectionItem {
1920+
expression: ValueExpression::Property(PropertyRef {
1921+
variable: "p".to_string(),
1922+
property: "age".to_string(),
1923+
}),
1924+
alias: None, // No alias - should use qualified name
1925+
},
1926+
],
1927+
};
1928+
1929+
let df_plan = planner.plan(&project).unwrap();
1930+
let s = format!("{:?}", df_plan);
1931+
1932+
// Should contain the alias and the qualified name
1933+
assert!(s.contains("full_name"));
1934+
assert!(s.contains("p__age"));
1935+
}
18031936
}

rust/lance-graph/tests/integration_datafusion_pipeline.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,3 +1651,181 @@ async fn test_datafusion_order_by_with_distinct() {
16511651
assert_eq!(names.value(2), "David");
16521652
assert_eq!(names.value(3), "Eve");
16531653
}
1654+
1655+
// ============================================================================
1656+
// Column Alias Tests
1657+
// ============================================================================
1658+
1659+
#[tokio::test]
1660+
async fn test_datafusion_return_with_single_alias() {
1661+
let config = create_graph_config();
1662+
let person_batch = create_person_dataset();
1663+
1664+
// Query: RETURN with alias
1665+
let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS person_name")
1666+
.unwrap()
1667+
.with_config(config);
1668+
1669+
let mut datasets = HashMap::new();
1670+
datasets.insert("Person".to_string(), person_batch);
1671+
1672+
let out = query.execute_datafusion(datasets).await.unwrap();
1673+
1674+
assert_eq!(out.num_rows(), 5);
1675+
1676+
// Check that the column is named "person_name" not "p__name"
1677+
let schema = out.schema();
1678+
assert_eq!(schema.fields().len(), 1);
1679+
assert_eq!(schema.field(0).name(), "person_name");
1680+
1681+
let names = out
1682+
.column(0)
1683+
.as_any()
1684+
.downcast_ref::<StringArray>()
1685+
.unwrap();
1686+
assert!(!names.value(0).is_empty()); // Has data
1687+
}
1688+
1689+
#[tokio::test]
1690+
async fn test_datafusion_return_with_multiple_aliases() {
1691+
let config = create_graph_config();
1692+
let person_batch = create_person_dataset();
1693+
1694+
// Query: Multiple columns with aliases
1695+
let query =
1696+
CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name AS name, p.age AS age")
1697+
.unwrap()
1698+
.with_config(config);
1699+
1700+
let mut datasets = HashMap::new();
1701+
datasets.insert("Person".to_string(), person_batch);
1702+
1703+
let out = query.execute_datafusion(datasets).await.unwrap();
1704+
1705+
// Age > 30: Bob(35), Charlie(30 - excluded), David(40)
1706+
assert_eq!(out.num_rows(), 2);
1707+
1708+
// Check column names are aliased
1709+
let schema = out.schema();
1710+
assert_eq!(schema.fields().len(), 2);
1711+
assert_eq!(schema.field(0).name(), "name");
1712+
assert_eq!(schema.field(1).name(), "age");
1713+
1714+
let names = out
1715+
.column(0)
1716+
.as_any()
1717+
.downcast_ref::<StringArray>()
1718+
.unwrap();
1719+
let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
1720+
1721+
// Verify data
1722+
let mut results: Vec<(String, i64)> = (0..out.num_rows())
1723+
.map(|i| (names.value(i).to_string(), ages.value(i)))
1724+
.collect();
1725+
results.sort_by_key(|r| r.1);
1726+
1727+
assert_eq!(results[0], ("Bob".to_string(), 35));
1728+
assert_eq!(results[1], ("David".to_string(), 40));
1729+
}
1730+
1731+
#[tokio::test]
1732+
async fn test_datafusion_return_mixed_with_and_without_alias() {
1733+
let config = create_graph_config();
1734+
let person_batch = create_person_dataset();
1735+
1736+
// Query: Mix of aliased and non-aliased columns
1737+
let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS full_name, p.age LIMIT 3")
1738+
.unwrap()
1739+
.with_config(config);
1740+
1741+
let mut datasets = HashMap::new();
1742+
datasets.insert("Person".to_string(), person_batch);
1743+
1744+
let out = query.execute_datafusion(datasets).await.unwrap();
1745+
1746+
assert_eq!(out.num_rows(), 3);
1747+
1748+
// Check column names
1749+
let schema = out.schema();
1750+
assert_eq!(schema.fields().len(), 2);
1751+
assert_eq!(schema.field(0).name(), "full_name"); // Aliased
1752+
assert_eq!(schema.field(1).name(), "p__age"); // Not aliased - qualified name
1753+
}
1754+
1755+
#[tokio::test]
1756+
async fn test_datafusion_return_alias_with_relationship() {
1757+
let config = create_graph_config();
1758+
let person_batch = create_person_dataset();
1759+
let knows_batch = create_knows_dataset();
1760+
1761+
// Query: Alias in relationship query
1762+
let query = CypherQuery::new(
1763+
"MATCH (a:Person)-[:KNOWS]->(b:Person) \
1764+
RETURN a.name AS source, b.name AS target \
1765+
ORDER BY source, target \
1766+
LIMIT 3",
1767+
)
1768+
.unwrap()
1769+
.with_config(config);
1770+
1771+
let mut datasets = HashMap::new();
1772+
datasets.insert("Person".to_string(), person_batch);
1773+
datasets.insert("KNOWS".to_string(), knows_batch);
1774+
1775+
let out = query.execute_datafusion(datasets).await.unwrap();
1776+
1777+
assert_eq!(out.num_rows(), 3);
1778+
1779+
// Check column names are aliased
1780+
let schema = out.schema();
1781+
assert_eq!(schema.field(0).name(), "source");
1782+
assert_eq!(schema.field(1).name(), "target");
1783+
1784+
let sources = out
1785+
.column(0)
1786+
.as_any()
1787+
.downcast_ref::<StringArray>()
1788+
.unwrap();
1789+
let targets = out
1790+
.column(1)
1791+
.as_any()
1792+
.downcast_ref::<StringArray>()
1793+
.unwrap();
1794+
1795+
// First 3 ordered by source, target
1796+
assert_eq!(sources.value(0), "Alice");
1797+
assert_eq!(targets.value(0), "Bob");
1798+
}
1799+
1800+
#[tokio::test]
1801+
async fn test_datafusion_return_alias_with_order_by() {
1802+
let config = create_graph_config();
1803+
let person_batch = create_person_dataset();
1804+
1805+
// Query: Alias with ORDER BY (ORDER BY uses original property reference)
1806+
let query =
1807+
CypherQuery::new("MATCH (p:Person) RETURN p.name AS name ORDER BY p.age DESC LIMIT 2")
1808+
.unwrap()
1809+
.with_config(config);
1810+
1811+
let mut datasets = HashMap::new();
1812+
datasets.insert("Person".to_string(), person_batch);
1813+
1814+
let out = query.execute_datafusion(datasets).await.unwrap();
1815+
1816+
assert_eq!(out.num_rows(), 2);
1817+
1818+
// Check column name is aliased
1819+
let schema = out.schema();
1820+
assert_eq!(schema.field(0).name(), "name");
1821+
1822+
let names = out
1823+
.column(0)
1824+
.as_any()
1825+
.downcast_ref::<StringArray>()
1826+
.unwrap();
1827+
1828+
// Ordered by age DESC: David(40), Bob(35)
1829+
assert_eq!(names.value(0), "David");
1830+
assert_eq!(names.value(1), "Bob");
1831+
}

0 commit comments

Comments
 (0)