Skip to content

Commit c8d1069

Browse files
authored
Support Simple Unwind in lance-graph (#112)
1 parent 0d2a49b commit c8d1069

10 files changed

Lines changed: 545 additions & 118 deletions

File tree

crates/lance-graph/src/ast.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ use std::collections::HashMap;
1313
/// A complete Cypher query
1414
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1515
pub struct CypherQuery {
16-
/// MATCH clauses
17-
pub match_clauses: Vec<MatchClause>,
16+
/// READING clauses (MATCH, UNWIND, etc.)
17+
pub reading_clauses: Vec<ReadingClause>,
1818
/// WHERE clause (optional, before WITH if present)
1919
pub where_clause: Option<WhereClause>,
2020
/// WITH clause (optional) - intermediate projection/aggregation
2121
pub with_clause: Option<WithClause>,
22-
/// MATCH clauses after WITH (optional) - query chaining
23-
pub post_with_match_clauses: Vec<MatchClause>,
22+
/// Post-WITH READING clauses
23+
pub post_with_reading_clauses: Vec<ReadingClause>,
2424
/// WHERE clause after WITH (optional) - filters the WITH results
2525
pub post_with_where_clause: Option<WhereClause>,
2626
/// RETURN clause
@@ -37,28 +37,31 @@ impl CypherQuery {
3737
/// Extract all node labels referenced in the query
3838
pub fn get_node_labels(&self) -> Vec<String> {
3939
let mut labels = Vec::new();
40-
for match_clause in &self.match_clauses {
41-
for pattern in &match_clause.patterns {
42-
match pattern {
43-
GraphPattern::Node(node) => {
44-
for label in &node.labels {
45-
if !labels.contains(label) {
46-
labels.push(label.clone());
47-
}
48-
}
49-
}
50-
GraphPattern::Path(path) => {
51-
for label in &path.start_node.labels {
52-
if !labels.contains(label) {
53-
labels.push(label.clone());
40+
// Iterate all match clauses directly
41+
for clause in &self.reading_clauses {
42+
if let ReadingClause::Match(match_clause) = clause {
43+
for pattern in &match_clause.patterns {
44+
match pattern {
45+
GraphPattern::Node(node) => {
46+
for label in &node.labels {
47+
if !labels.contains(label) {
48+
labels.push(label.clone());
49+
}
5450
}
5551
}
56-
for segment in &path.segments {
57-
for label in &segment.end_node.labels {
52+
GraphPattern::Path(path) => {
53+
for label in &path.start_node.labels {
5854
if !labels.contains(label) {
5955
labels.push(label.clone());
6056
}
6157
}
58+
for segment in &path.segments {
59+
for label in &segment.end_node.labels {
60+
if !labels.contains(label) {
61+
labels.push(label.clone());
62+
}
63+
}
64+
}
6265
}
6366
}
6467
}
@@ -70,21 +73,38 @@ impl CypherQuery {
7073
/// Extract all relationship types referenced in the query
7174
pub fn get_relationship_types(&self) -> Vec<String> {
7275
let mut types = Vec::new();
73-
for match_clause in &self.match_clauses {
74-
for pattern in &match_clause.patterns {
75-
if let GraphPattern::Path(path) = pattern {
76-
for segment in &path.segments {
77-
for rel_type in &segment.relationship.types {
78-
if !types.contains(rel_type) {
79-
types.push(rel_type.clone());
80-
}
81-
}
82-
}
76+
for clause in &self.reading_clauses {
77+
if let ReadingClause::Match(match_clause) = clause {
78+
for pattern in &match_clause.patterns {
79+
self.collect_relationship_types_from_pattern(pattern, &mut types);
8380
}
8481
}
8582
}
8683
types
8784
}
85+
86+
fn collect_relationship_types_from_pattern(
87+
&self,
88+
pattern: &GraphPattern,
89+
types: &mut Vec<String>,
90+
) {
91+
if let GraphPattern::Path(path) = pattern {
92+
for segment in &path.segments {
93+
for rel_type in &segment.relationship.types {
94+
if !types.contains(rel_type) {
95+
types.push(rel_type.clone());
96+
}
97+
}
98+
}
99+
}
100+
}
101+
}
102+
103+
/// A clause that reads from the graph (MATCH, UNWIND)
104+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
105+
pub enum ReadingClause {
106+
Match(MatchClause),
107+
Unwind(UnwindClause),
88108
}
89109

90110
/// A MATCH clause containing graph patterns
@@ -94,6 +114,15 @@ pub struct MatchClause {
94114
pub patterns: Vec<GraphPattern>,
95115
}
96116

117+
/// An UNWIND clause
118+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
119+
pub struct UnwindClause {
120+
/// Expression to unwind
121+
pub expression: ValueExpression,
122+
/// Alias for the unwound values
123+
pub alias: String,
124+
}
125+
97126
/// A graph pattern (nodes and relationships)
98127
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
99128
pub enum GraphPattern {

crates/lance-graph/src/datafusion_planner/analysis.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ fn analyze_operator(
206206
analyze_operator(left, analysis, rel_counter)?;
207207
analyze_operator(right, analysis, rel_counter)?;
208208
}
209+
LogicalOperator::Unwind { input, .. } => {
210+
if let Some(op) = input {
211+
analyze_operator(op, analysis, rel_counter)?;
212+
}
213+
}
209214
}
210215
Ok(())
211216
}

crates/lance-graph/src/datafusion_planner/builder/basic_ops.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,45 @@ impl DataFusionPlanner {
141141
.build()
142142
.map_err(|e| self.plan_error("Failed to build plan", e))
143143
}
144+
145+
pub(crate) fn build_unwind(
146+
&self,
147+
ctx: &mut PlanningContext,
148+
input: &Option<Box<LogicalOperator>>,
149+
expression: &crate::ast::ValueExpression,
150+
alias: &str,
151+
) -> Result<LogicalPlan> {
152+
let input_plan = if let Some(input_op) = input {
153+
self.build_operator(ctx, input_op)?
154+
} else {
155+
// Create an empty relation that produces one row for UNWIND [..]
156+
LogicalPlanBuilder::empty(true)
157+
.build()
158+
.map_err(|e| self.plan_error("Failed to create empty relation", e))?
159+
};
160+
161+
// Convert expression to DataFusion Expr
162+
let df_expr = super::super::expression::to_df_value_expr(expression);
163+
164+
// We project the list expression first (aliased as the target alias temporarily)
165+
// DataFusion unnest takes a column name.
166+
let builder = LogicalPlanBuilder::from(input_plan);
167+
168+
let builder = builder
169+
.project(vec![
170+
datafusion::logical_expr::wildcard(),
171+
datafusion::logical_expr::select_expr::SelectExpr::Expression(df_expr.alias(alias)),
172+
])
173+
.map_err(|e| self.plan_error("Failed to project unwind expression", e))?;
174+
175+
let builder = builder
176+
.unnest_column(alias)
177+
.map_err(|e| self.plan_error("Failed to build unnest", e))?;
178+
179+
builder
180+
.build()
181+
.map_err(|e| self.plan_error("Failed to build unwind plan", e))
182+
}
144183
}
145184

146185
#[cfg(test)]

crates/lance-graph/src/datafusion_planner/builder/helpers.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ impl DataFusionPlanner {
8585
Self::collect_variables(left, vars);
8686
Self::collect_variables(right, vars);
8787
}
88+
LogicalOperator::Unwind { input, alias, .. } => {
89+
if let Some(op) = input {
90+
Self::collect_variables(op, vars);
91+
}
92+
vars.push(alias.clone());
93+
}
8894
}
8995
}
9096
}

crates/lance-graph/src/datafusion_planner/builder/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ impl DataFusionPlanner {
9696
right,
9797
join_type,
9898
} => self.build_join(ctx, left, right, join_type),
99+
LogicalOperator::Unwind {
100+
input,
101+
expression,
102+
alias,
103+
} => self.build_unwind(ctx, input, expression, alias),
99104
}
100105
}
101106
}

crates/lance-graph/src/logical_plan.rs

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ pub enum LogicalOperator {
2323
properties: HashMap<String, PropertyValue>,
2424
},
2525

26+
/// Unwind a list into a sequence of rows
27+
Unwind {
28+
/// The input operator
29+
input: Option<Box<LogicalOperator>>,
30+
/// The expression to unwind (must yield a list)
31+
expression: ValueExpression,
32+
/// The alias for the unwound element
33+
alias: String,
34+
},
35+
2636
/// Apply a filter predicate (WHERE clause)
2737
Filter {
2838
input: Box<LogicalOperator>,
@@ -153,8 +163,8 @@ impl LogicalPlanner {
153163

154164
/// Convert a Cypher AST to a logical plan
155165
pub fn plan(&mut self, query: &CypherQuery) -> Result<LogicalOperator> {
156-
// Start with the MATCH clause(s)
157-
let mut plan = self.plan_match_clauses(&query.match_clauses)?;
166+
// Plan main MATCH clauses
167+
let mut plan = self.plan_reading_clauses(None, &query.reading_clauses)?;
158168

159169
// Apply WHERE clause if present (before WITH)
160170
if let Some(where_clause) = &query.where_clause {
@@ -169,9 +179,9 @@ impl LogicalPlanner {
169179
plan = self.plan_with_clause(with_clause, plan)?;
170180
}
171181

172-
// Apply post-WITH MATCH clauses if present (query chaining)
173-
for match_clause in &query.post_with_match_clauses {
174-
plan = self.plan_match_clause_with_base(Some(plan), match_clause)?;
182+
// Plan post-WITH MATCH clauses
183+
if !query.post_with_reading_clauses.is_empty() {
184+
plan = self.plan_reading_clauses(Some(plan), &query.post_with_reading_clauses)?;
175185
}
176186

177187
// Apply post-WITH WHERE clause if present
@@ -219,25 +229,63 @@ impl LogicalPlanner {
219229
Ok(plan)
220230
}
221231

222-
/// Plan MATCH clauses - the core graph pattern matching
223-
fn plan_match_clauses(&mut self, match_clauses: &[MatchClause]) -> Result<LogicalOperator> {
224-
if match_clauses.is_empty() {
232+
fn plan_reading_clauses(
233+
&mut self,
234+
base_plan: Option<LogicalOperator>,
235+
reading_clauses: &[ReadingClause],
236+
) -> Result<LogicalOperator> {
237+
let mut plan = base_plan;
238+
239+
if reading_clauses.is_empty() && plan.is_none() {
225240
return Err(GraphError::PlanError {
226-
message: "Query must have at least one MATCH clause".to_string(),
241+
message: "Query must have at least one MATCH or UNWIND clause".to_string(),
227242
location: snafu::Location::new(file!(), line!(), column!()),
228243
});
229244
}
230245

231-
let plan = match_clauses.iter().try_fold(None, |plan, clause| {
232-
self.plan_match_clause_with_base(plan, clause).map(Some)
233-
})?;
246+
for clause in reading_clauses {
247+
plan = Some(self.plan_reading_clause_with_base(plan, clause)?);
248+
}
234249

235250
plan.ok_or_else(|| GraphError::PlanError {
236-
message: "Failed to plan MATCH clauses".to_string(),
251+
message: "Failed to plan clauses".to_string(),
237252
location: snafu::Location::new(file!(), line!(), column!()),
238253
})
239254
}
240255

256+
/// Plan a single READING clause, optionally starting from an existing base plan
257+
fn plan_reading_clause_with_base(
258+
&mut self,
259+
base: Option<LogicalOperator>,
260+
clause: &ReadingClause,
261+
) -> Result<LogicalOperator> {
262+
match clause {
263+
ReadingClause::Match(match_clause) => {
264+
self.plan_match_clause_with_base(base, match_clause)
265+
}
266+
ReadingClause::Unwind(unwind_clause) => {
267+
self.plan_unwind_clause_with_base(base, unwind_clause)
268+
}
269+
}
270+
}
271+
272+
/// Plan an UNWIND clause
273+
fn plan_unwind_clause_with_base(
274+
&mut self,
275+
base: Option<LogicalOperator>,
276+
unwind_clause: &UnwindClause,
277+
) -> Result<LogicalOperator> {
278+
// Register the alias variable
279+
self.variables
280+
.insert(unwind_clause.alias.clone(), "Unwound".to_string());
281+
282+
Ok(LogicalOperator::Unwind {
283+
input: base.map(Box::new),
284+
expression: unwind_clause.expression.clone(),
285+
alias: unwind_clause.alias.clone(),
286+
})
287+
}
288+
241289
/// Plan a single MATCH clause, optionally starting from an existing base plan
242290
fn plan_match_clause_with_base(
243291
&mut self,
@@ -398,6 +446,7 @@ impl LogicalPlanner {
398446
fn extract_variable_from_plan(&self, plan: &LogicalOperator) -> Result<String> {
399447
match plan {
400448
LogicalOperator::ScanByLabel { variable, .. } => Ok(variable.clone()),
449+
LogicalOperator::Unwind { alias, .. } => Ok(alias.clone()),
401450
LogicalOperator::Expand {
402451
target_variable, ..
403452
} => Ok(target_variable.clone()),

0 commit comments

Comments
 (0)