Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 59 additions & 30 deletions crates/lance-graph/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use std::collections::HashMap;
/// A complete Cypher query
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CypherQuery {
/// MATCH clauses
pub match_clauses: Vec<MatchClause>,
/// READING clauses (MATCH, UNWIND, etc.)
pub reading_clauses: Vec<ReadingClause>,
/// WHERE clause (optional, before WITH if present)
pub where_clause: Option<WhereClause>,
/// WITH clause (optional) - intermediate projection/aggregation
pub with_clause: Option<WithClause>,
/// MATCH clauses after WITH (optional) - query chaining
pub post_with_match_clauses: Vec<MatchClause>,
/// Post-WITH READING clauses
pub post_with_reading_clauses: Vec<ReadingClause>,
/// WHERE clause after WITH (optional) - filters the WITH results
pub post_with_where_clause: Option<WhereClause>,
/// RETURN clause
Expand All @@ -37,28 +37,31 @@ impl CypherQuery {
/// Extract all node labels referenced in the query
pub fn get_node_labels(&self) -> Vec<String> {
let mut labels = Vec::new();
for match_clause in &self.match_clauses {
for pattern in &match_clause.patterns {
match pattern {
GraphPattern::Node(node) => {
for label in &node.labels {
if !labels.contains(label) {
labels.push(label.clone());
}
}
}
GraphPattern::Path(path) => {
for label in &path.start_node.labels {
if !labels.contains(label) {
labels.push(label.clone());
// Iterate all match clauses directly
for clause in &self.reading_clauses {
if let ReadingClause::Match(match_clause) = clause {
for pattern in &match_clause.patterns {
match pattern {
GraphPattern::Node(node) => {
for label in &node.labels {
if !labels.contains(label) {
labels.push(label.clone());
}
}
}
for segment in &path.segments {
for label in &segment.end_node.labels {
GraphPattern::Path(path) => {
for label in &path.start_node.labels {
if !labels.contains(label) {
labels.push(label.clone());
}
}
for segment in &path.segments {
for label in &segment.end_node.labels {
if !labels.contains(label) {
labels.push(label.clone());
}
}
}
}
}
}
Expand All @@ -70,21 +73,38 @@ impl CypherQuery {
/// Extract all relationship types referenced in the query
pub fn get_relationship_types(&self) -> Vec<String> {
let mut types = Vec::new();
for match_clause in &self.match_clauses {
for pattern in &match_clause.patterns {
if let GraphPattern::Path(path) = pattern {
for segment in &path.segments {
for rel_type in &segment.relationship.types {
if !types.contains(rel_type) {
types.push(rel_type.clone());
}
}
}
for clause in &self.reading_clauses {
if let ReadingClause::Match(match_clause) = clause {
for pattern in &match_clause.patterns {
self.collect_relationship_types_from_pattern(pattern, &mut types);
}
}
}
types
}

fn collect_relationship_types_from_pattern(
&self,
pattern: &GraphPattern,
types: &mut Vec<String>,
) {
if let GraphPattern::Path(path) = pattern {
for segment in &path.segments {
for rel_type in &segment.relationship.types {
if !types.contains(rel_type) {
types.push(rel_type.clone());
}
}
}
}
}
}

/// A clause that reads from the graph (MATCH, UNWIND)
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ReadingClause {
Match(MatchClause),
Unwind(UnwindClause),
}

/// A MATCH clause containing graph patterns
Expand All @@ -94,6 +114,15 @@ pub struct MatchClause {
pub patterns: Vec<GraphPattern>,
}

/// An UNWIND clause
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct UnwindClause {
/// Expression to unwind
pub expression: ValueExpression,
/// Alias for the unwound values
pub alias: String,
}

/// A graph pattern (nodes and relationships)
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum GraphPattern {
Expand Down
5 changes: 5 additions & 0 deletions crates/lance-graph/src/datafusion_planner/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ fn analyze_operator(
analyze_operator(left, analysis, rel_counter)?;
analyze_operator(right, analysis, rel_counter)?;
}
LogicalOperator::Unwind { input, .. } => {
if let Some(op) = input {
analyze_operator(op, analysis, rel_counter)?;
}
}
}
Ok(())
}
Expand Down
39 changes: 39 additions & 0 deletions crates/lance-graph/src/datafusion_planner/builder/basic_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,45 @@ impl DataFusionPlanner {
.build()
.map_err(|e| self.plan_error("Failed to build plan", e))
}

pub(crate) fn build_unwind(
&self,
ctx: &mut PlanningContext,
input: &Option<Box<LogicalOperator>>,
expression: &crate::ast::ValueExpression,
alias: &str,
) -> Result<LogicalPlan> {
let input_plan = if let Some(input_op) = input {
self.build_operator(ctx, input_op)?
} else {
// Create an empty relation that produces one row for UNWIND [..]
LogicalPlanBuilder::empty(true)
.build()
.map_err(|e| self.plan_error("Failed to create empty relation", e))?
};

// Convert expression to DataFusion Expr
let df_expr = super::super::expression::to_df_value_expr(expression);

// We project the list expression first (aliased as the target alias temporarily)
// DataFusion unnest takes a column name.
let builder = LogicalPlanBuilder::from(input_plan);

let builder = builder
.project(vec![
datafusion::logical_expr::wildcard(),
datafusion::logical_expr::select_expr::SelectExpr::Expression(df_expr.alias(alias)),
])
.map_err(|e| self.plan_error("Failed to project unwind expression", e))?;

let builder = builder
.unnest_column(alias)
.map_err(|e| self.plan_error("Failed to build unnest", e))?;

builder
.build()
.map_err(|e| self.plan_error("Failed to build unwind plan", e))
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions crates/lance-graph/src/datafusion_planner/builder/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ impl DataFusionPlanner {
Self::collect_variables(left, vars);
Self::collect_variables(right, vars);
}
LogicalOperator::Unwind { input, alias, .. } => {
if let Some(op) = input {
Self::collect_variables(op, vars);
}
vars.push(alias.clone());
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/lance-graph/src/datafusion_planner/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ impl DataFusionPlanner {
right,
join_type,
} => self.build_join(ctx, left, right, join_type),
LogicalOperator::Unwind {
input,
expression,
alias,
} => self.build_unwind(ctx, input, expression, alias),
}
}
}
75 changes: 62 additions & 13 deletions crates/lance-graph/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ pub enum LogicalOperator {
properties: HashMap<String, PropertyValue>,
},

/// Unwind a list into a sequence of rows
Unwind {
/// The input operator
input: Option<Box<LogicalOperator>>,
/// The expression to unwind (must yield a list)
expression: ValueExpression,
/// The alias for the unwound element
alias: String,
},

/// Apply a filter predicate (WHERE clause)
Filter {
input: Box<LogicalOperator>,
Expand Down Expand Up @@ -153,8 +163,8 @@ impl LogicalPlanner {

/// Convert a Cypher AST to a logical plan
pub fn plan(&mut self, query: &CypherQuery) -> Result<LogicalOperator> {
// Start with the MATCH clause(s)
let mut plan = self.plan_match_clauses(&query.match_clauses)?;
// Plan main MATCH clauses
let mut plan = self.plan_reading_clauses(None, &query.reading_clauses)?;

// Apply WHERE clause if present (before WITH)
if let Some(where_clause) = &query.where_clause {
Expand All @@ -169,9 +179,9 @@ impl LogicalPlanner {
plan = self.plan_with_clause(with_clause, plan)?;
}

// Apply post-WITH MATCH clauses if present (query chaining)
for match_clause in &query.post_with_match_clauses {
plan = self.plan_match_clause_with_base(Some(plan), match_clause)?;
// Plan post-WITH MATCH clauses
if !query.post_with_reading_clauses.is_empty() {
plan = self.plan_reading_clauses(Some(plan), &query.post_with_reading_clauses)?;
}

// Apply post-WITH WHERE clause if present
Expand Down Expand Up @@ -219,25 +229,63 @@ impl LogicalPlanner {
Ok(plan)
}

/// Plan MATCH clauses - the core graph pattern matching
fn plan_match_clauses(&mut self, match_clauses: &[MatchClause]) -> Result<LogicalOperator> {
if match_clauses.is_empty() {
fn plan_reading_clauses(
&mut self,
base_plan: Option<LogicalOperator>,
reading_clauses: &[ReadingClause],
) -> Result<LogicalOperator> {
let mut plan = base_plan;

if reading_clauses.is_empty() && plan.is_none() {
return Err(GraphError::PlanError {
message: "Query must have at least one MATCH clause".to_string(),
message: "Query must have at least one MATCH or UNWIND clause".to_string(),
location: snafu::Location::new(file!(), line!(), column!()),
});
}

let plan = match_clauses.iter().try_fold(None, |plan, clause| {
self.plan_match_clause_with_base(plan, clause).map(Some)
})?;
for clause in reading_clauses {
plan = Some(self.plan_reading_clause_with_base(plan, clause)?);
}

plan.ok_or_else(|| GraphError::PlanError {
message: "Failed to plan MATCH clauses".to_string(),
message: "Failed to plan clauses".to_string(),
location: snafu::Location::new(file!(), line!(), column!()),
})
}

/// Plan a single READING clause, optionally starting from an existing base plan
fn plan_reading_clause_with_base(
&mut self,
base: Option<LogicalOperator>,
clause: &ReadingClause,
) -> Result<LogicalOperator> {
match clause {
ReadingClause::Match(match_clause) => {
self.plan_match_clause_with_base(base, match_clause)
}
ReadingClause::Unwind(unwind_clause) => {
self.plan_unwind_clause_with_base(base, unwind_clause)
}
}
}

/// Plan an UNWIND clause
fn plan_unwind_clause_with_base(
&mut self,
base: Option<LogicalOperator>,
unwind_clause: &UnwindClause,
) -> Result<LogicalOperator> {
// Register the alias variable
self.variables
.insert(unwind_clause.alias.clone(), "Unwound".to_string());

Ok(LogicalOperator::Unwind {
input: base.map(Box::new),
expression: unwind_clause.expression.clone(),
alias: unwind_clause.alias.clone(),
})
}

/// Plan a single MATCH clause, optionally starting from an existing base plan
fn plan_match_clause_with_base(
&mut self,
Expand Down Expand Up @@ -398,6 +446,7 @@ impl LogicalPlanner {
fn extract_variable_from_plan(&self, plan: &LogicalOperator) -> Result<String> {
match plan {
LogicalOperator::ScanByLabel { variable, .. } => Ok(variable.clone()),
LogicalOperator::Unwind { alias, .. } => Ok(alias.clone()),
LogicalOperator::Expand {
target_variable, ..
} => Ok(target_variable.clone()),
Expand Down
Loading
Loading