Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 192 additions & 3 deletions rust/lance-graph/src/datafusion_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,30 @@ impl DataFusionPlanner {
.build()
.unwrap())
}
LogicalOperator::Sort { input, .. } => {
// Schema-less placeholder: skip sort for now
self.build_operator(ctx, input)
LogicalOperator::Sort { input, sort_items } => {
use datafusion::logical_expr::SortExpr;

let input_plan = self.build_operator(ctx, input)?;

// Convert sort items to DataFusion sort expressions
let sort_exprs: Vec<SortExpr> = sort_items
.iter()
.map(|item| {
let expr = self.to_df_value_expr(&item.expression);
let asc = matches!(item.direction, crate::ast::SortDirection::Ascending);
SortExpr {
expr,
asc,
nulls_first: true,
}
})
.collect();

Ok(LogicalPlanBuilder::from(input_plan)
.sort(sort_exprs)
.unwrap()
.build()
.unwrap())
}
LogicalOperator::Limit { input, count } => {
let input_plan = self.build_operator(ctx, input)?;
Expand Down Expand Up @@ -1611,4 +1632,172 @@ mod tests {
// Third call should fail (no more instances)
assert!(ctx.next_relationship_instance("KNOWS").is_err());
}

#[test]
fn test_order_by_single_column_asc() {
use crate::ast::{PropertyRef, SortDirection, ValueExpression};
use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());

// Build: Project -> Sort
let scan = LogicalOperator::ScanByLabel {
variable: "n".to_string(),
label: "Person".to_string(),
properties: Default::default(),
};

let project = LogicalOperator::Project {
input: Box::new(scan),
projections: vec![ProjectionItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
alias: None,
}],
};

let sort = LogicalOperator::Sort {
input: Box::new(project),
sort_items: vec![SortItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
direction: SortDirection::Ascending,
}],
};

let df_plan = planner.plan(&sort).unwrap();
let s = format!("{:?}", df_plan);

// Should contain Sort operator
println!("Plan: {}", s);
assert!(s.contains("Sort") || s.contains("sort"));
assert!(s.contains("n__name"));
}

#[test]
fn test_order_by_multiple_columns() {
use crate::ast::{PropertyRef, SortDirection, ValueExpression};
use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());

let scan = LogicalOperator::ScanByLabel {
variable: "n".to_string(),
label: "Person".to_string(),
properties: Default::default(),
};

let project = LogicalOperator::Project {
input: Box::new(scan),
projections: vec![
ProjectionItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
alias: None,
},
ProjectionItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "age".to_string(),
}),
alias: None,
},
],
};

let sort = LogicalOperator::Sort {
input: Box::new(project),
sort_items: vec![
SortItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "age".to_string(),
}),
direction: SortDirection::Descending,
},
SortItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
direction: SortDirection::Ascending,
},
],
};

let df_plan = planner.plan(&sort).unwrap();
let s = format!("{:?}", df_plan);

// Should contain Sort with both columns
assert!(s.contains("Sort") || s.contains("sort"));
assert!(s.contains("n__age"));
assert!(s.contains("n__name"));
}

#[test]
fn test_order_by_with_limit() {
use crate::ast::{PropertyRef, SortDirection, ValueExpression};
use crate::logical_plan::{LogicalOperator, ProjectionItem, SortItem};

let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());

let scan = LogicalOperator::ScanByLabel {
variable: "n".to_string(),
label: "Person".to_string(),
properties: Default::default(),
};

let project = LogicalOperator::Project {
input: Box::new(scan),
projections: vec![ProjectionItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
alias: None,
}],
};

let sort = LogicalOperator::Sort {
input: Box::new(project),
sort_items: vec![SortItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".to_string(),
property: "name".to_string(),
}),
direction: SortDirection::Ascending,
}],
};

let limit = LogicalOperator::Limit {
input: Box::new(sort),
count: 10,
};

let df_plan = planner.plan(&limit).unwrap();
let s = format!("{:?}", df_plan);

// Should contain both Limit and Sort
assert!(s.contains("Limit") || s.contains("limit"));
assert!(s.contains("Sort") || s.contains("sort"));
assert!(s.contains("n__name"));
}
}
Loading
Loading