Skip to content

Commit dc30df2

Browse files
authored
feat: support ORDER BY in the datafusion planner (#22)
* feat: support ORDER BY in the datafusion planner
* test: add more complex integration tests of datafusion execution
1 parent 76966d6 commit dc30df2

2 files changed

Lines changed: 890 additions & 3 deletions

File tree

rust/lance-graph/src/datafusion_planner.rs

Lines changed: 192 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -337,9 +337,30 @@ impl DataFusionPlanner {
337337
.build()
338338
.unwrap())
339339
}
340-
LogicalOperator::Sort { input, .. } => {
341-
// Schema-less placeholder: skip sort for now
342-
self.build_operator(ctx, input)
340+
LogicalOperator::Sort { input, sort_items } => {
    use datafusion::logical_expr::SortExpr;

    // Plan the child operator first; the sort is layered on top of its output.
    let input_plan = self.build_operator(ctx, input)?;

    // Translate every ORDER BY item into a DataFusion sort expression,
    // preserving the order in which the items were written.
    let sort_exprs: Vec<SortExpr> = sort_items
        .iter()
        .map(|item| {
            let asc = matches!(item.direction, crate::ast::SortDirection::Ascending);
            SortExpr {
                expr: self.to_df_value_expr(&item.expression),
                asc,
                // Cypher's ORDER BY places nulls last for ascending sorts and
                // first for descending sorts, so null placement must follow
                // the direction rather than being fixed to "first".
                // NOTE(review): assumes this planner targets Cypher ordering
                // semantics; confirm against the query-language spec in use.
                nulls_first: !asc,
            }
        })
        .collect();

    // NOTE(review): both `unwrap` calls panic if DataFusion rejects the sort
    // (presumably e.g. when a sort expression cannot be resolved against the
    // input schema). They are kept to match the neighbouring arms, but are
    // worth converting into a returned error.
    Ok(LogicalPlanBuilder::from(input_plan)
        .sort(sort_exprs)
        .unwrap()
        .build()
        .unwrap())
}
344365
LogicalOperator::Limit { input, count } => {
345366
let input_plan = self.build_operator(ctx, input)?;
@@ -1611,4 +1632,172 @@ mod tests {
16111632
// Third call should fail (no more instances)
16121633
assert!(ctx.next_relationship_instance("KNOWS").is_err());
16131634
}
1635+
1636+
/// Plans `ScanByLabel -> Project -> Sort` with a single ascending key and checks
/// that the debug rendering of the plan mentions a sort node and the `n__name`
/// column.
#[test]
fn test_order_by_single_column_asc() {
    use crate::ast::{PropertyRef, SortDirection, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

    // Both the projection and the sort key refer to the same `n.name` property.
    let name_of_n = || {
        ValueExpression::Property(PropertyRef {
            variable: String::from("n"),
            property: String::from("name"),
        })
    };

    let graph_config = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(graph_config, make_catalog());

    // Operator tree, innermost first: ScanByLabel -> Project -> Sort.
    let person_scan = LogicalOperator::ScanByLabel {
        variable: String::from("n"),
        label: String::from("Person"),
        properties: Default::default(),
    };
    let projected = LogicalOperator::Project {
        input: Box::new(person_scan),
        projections: vec![ProjectionItem {
            expression: name_of_n(),
            alias: None,
        }],
    };
    let ordered = LogicalOperator::Sort {
        input: Box::new(projected),
        sort_items: vec![SortItem {
            expression: name_of_n(),
            direction: SortDirection::Ascending,
        }],
    };

    let rendered = format!("{:?}", planner.plan(&ordered).unwrap());

    // The rendered plan is printed so it is visible when the test output is shown.
    println!("Plan: {}", rendered);
    let mentions_sort = rendered.contains("Sort") || rendered.contains("sort");
    assert!(mentions_sort);
    assert!(rendered.contains("n__name"));
}
1684+
1685+
/// Plans a sort over two keys (`n.age` descending, then `n.name` ascending) on
/// top of a two-column projection and checks that the debug rendering of the
/// plan mentions a sort node and both column names.
#[test]
fn test_order_by_multiple_columns() {
    use crate::ast::{PropertyRef, SortDirection, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

    // Builds a reference to property `property` of the scanned variable `n`.
    let prop_of_n = |property: &str| {
        ValueExpression::Property(PropertyRef {
            variable: String::from("n"),
            property: property.to_string(),
        })
    };

    let graph_config = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(graph_config, make_catalog());

    let person_scan = LogicalOperator::ScanByLabel {
        variable: String::from("n"),
        label: String::from("Person"),
        properties: Default::default(),
    };

    // Project `name` then `age`, in that order.
    let projected = LogicalOperator::Project {
        input: Box::new(person_scan),
        projections: vec![
            ProjectionItem {
                expression: prop_of_n("name"),
                alias: None,
            },
            ProjectionItem {
                expression: prop_of_n("age"),
                alias: None,
            },
        ],
    };

    // Sort keys: `age` descending first, `name` ascending second.
    let ordered = LogicalOperator::Sort {
        input: Box::new(projected),
        sort_items: vec![
            SortItem {
                expression: prop_of_n("age"),
                direction: SortDirection::Descending,
            },
            SortItem {
                expression: prop_of_n("name"),
                direction: SortDirection::Ascending,
            },
        ],
    };

    let rendered = format!("{:?}", planner.plan(&ordered).unwrap());

    // The plan should mention a sort node and both sort columns.
    let mentions_sort = rendered.contains("Sort") || rendered.contains("sort");
    assert!(mentions_sort);
    assert!(rendered.contains("n__age"));
    assert!(rendered.contains("n__name"));
}
1750+
1751+
/// Plans `ScanByLabel -> Project -> Sort -> Limit` and checks that the debug
/// rendering of the plan mentions a limit node, a sort node, and the `n__name`
/// column.
#[test]
fn test_order_by_with_limit() {
    use crate::ast::{PropertyRef, SortDirection, ValueExpression};
    use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

    // Both the projection and the sort key refer to the same `n.name` property.
    let name_of_n = || {
        ValueExpression::Property(PropertyRef {
            variable: String::from("n"),
            property: String::from("name"),
        })
    };

    let graph_config = crate::config::GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let planner = DataFusionPlanner::with_catalog(graph_config, make_catalog());

    let person_scan = LogicalOperator::ScanByLabel {
        variable: String::from("n"),
        label: String::from("Person"),
        properties: Default::default(),
    };
    let projected = LogicalOperator::Project {
        input: Box::new(person_scan),
        projections: vec![ProjectionItem {
            expression: name_of_n(),
            alias: None,
        }],
    };
    let ordered = LogicalOperator::Sort {
        input: Box::new(projected),
        sort_items: vec![SortItem {
            expression: name_of_n(),
            direction: SortDirection::Ascending,
        }],
    };

    // The limit wraps the sort, so it is the root operator handed to the planner.
    let limited = LogicalOperator::Limit {
        input: Box::new(ordered),
        count: 10,
    };

    let rendered = format!("{:?}", planner.plan(&limited).unwrap());

    // The plan should mention both the limit and the sort, plus the sort column.
    let mentions_limit = rendered.contains("Limit") || rendered.contains("limit");
    assert!(mentions_limit);
    let mentions_sort = rendered.contains("Sort") || rendered.contains("sort");
    assert!(mentions_sort);
    assert!(rendered.contains("n__name"));
}
16141803
}

0 commit comments

Comments
 (0)