Skip to content

Commit c27809a

Browse files
authored
refactor: refactor the datafusion planner (#37)
1 parent d843b4b commit c27809a

9 files changed

Lines changed: 4366 additions & 3612 deletions

File tree

rust/lance-graph/src/datafusion_planner.rs

Lines changed: 0 additions & 3612 deletions
This file was deleted.
Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Query Analysis Phase
5+
//!
6+
//! Assigns unique IDs to relationship instances and collects variable-to-label mappings
7+
8+
use crate::ast::RelationshipDirection;
9+
use crate::error::Result;
10+
use crate::logical_plan::*;
11+
use std::collections::{HashMap, HashSet};
12+
13+
/// Analysis result containing all metadata needed for planning
14+
#[derive(Debug, Clone, Default)]
15+
pub struct QueryAnalysis {
16+
/// Variable → Label mappings (e.g., "n" → "Person")
17+
pub var_to_label: HashMap<String, String>,
18+
19+
/// Relationship instances with unique IDs to avoid column conflicts
20+
pub relationship_instances: Vec<RelationshipInstance>,
21+
22+
/// All datasets required for this query
23+
pub required_datasets: HashSet<String>,
24+
}
25+
26+
/// Represents a single relationship expansion with a unique instance ID
27+
#[derive(Debug, Clone)]
28+
pub struct RelationshipInstance {
29+
pub id: usize, // Unique instance number
30+
pub rel_type: String,
31+
pub source_var: String,
32+
pub target_var: String,
33+
pub direction: RelationshipDirection,
34+
pub alias: String, // e.g., "friend_of_1", "friend_of_2"
35+
}
36+
37+
/// Planning context that tracks state during plan building
38+
pub struct PlanningContext<'a> {
39+
pub analysis: &'a QueryAnalysis,
40+
pub(crate) relationship_instance_idx: HashMap<String, usize>,
41+
}
42+
43+
impl<'a> PlanningContext<'a> {
44+
pub fn new(analysis: &'a QueryAnalysis) -> Self {
45+
Self {
46+
analysis,
47+
relationship_instance_idx: HashMap::new(),
48+
}
49+
}
50+
51+
/// Get the next relationship instance for a given type (returns a clone)
52+
pub fn next_relationship_instance(&mut self, rel_type: &str) -> Result<RelationshipInstance> {
53+
let idx = self
54+
.relationship_instance_idx
55+
.entry(rel_type.to_string())
56+
.and_modify(|i| *i += 1)
57+
.or_insert(0);
58+
59+
self.analysis
60+
.relationship_instances
61+
.iter()
62+
.filter(|r| r.rel_type == rel_type)
63+
.nth(*idx)
64+
.cloned()
65+
.ok_or_else(|| crate::error::GraphError::PlanError {
66+
message: format!("No relationship instance found for: {}", rel_type),
67+
location: snafu::Location::new(file!(), line!(), column!()),
68+
})
69+
}
70+
}
71+
72+
/// Analyze the logical plan to extract metadata
73+
pub fn analyze(logical_plan: &LogicalOperator) -> Result<QueryAnalysis> {
74+
let mut analysis = QueryAnalysis::default();
75+
let mut rel_counter: HashMap<String, usize> = HashMap::new();
76+
77+
analyze_operator(logical_plan, &mut analysis, &mut rel_counter)?;
78+
Ok(analysis)
79+
}
80+
81+
/// Recursively analyze operators to build QueryAnalysis
82+
fn analyze_operator(
83+
op: &LogicalOperator,
84+
analysis: &mut QueryAnalysis,
85+
rel_counter: &mut HashMap<String, usize>,
86+
) -> Result<()> {
87+
match op {
88+
LogicalOperator::ScanByLabel {
89+
variable, label, ..
90+
} => {
91+
analysis
92+
.var_to_label
93+
.insert(variable.clone(), label.clone());
94+
analysis.required_datasets.insert(label.clone());
95+
}
96+
LogicalOperator::Expand {
97+
input,
98+
source_variable,
99+
target_variable,
100+
target_label,
101+
relationship_types,
102+
direction,
103+
relationship_variable,
104+
..
105+
} => {
106+
// Recursively analyze input first
107+
analyze_operator(input, analysis, rel_counter)?;
108+
109+
// Register the target variable with its label from the logical plan
110+
analysis
111+
.var_to_label
112+
.insert(target_variable.clone(), target_label.clone());
113+
114+
// Assign unique instance ID for this relationship
115+
if let Some(rel_type) = relationship_types.first() {
116+
let instance_id = rel_counter
117+
.entry(rel_type.clone())
118+
.and_modify(|c| *c += 1)
119+
.or_insert(1);
120+
121+
// Use relationship variable if provided, otherwise use type_instanceId
122+
let alias = if let Some(rel_var) = relationship_variable {
123+
rel_var.clone()
124+
} else {
125+
format!("{}_{}", rel_type.to_lowercase(), instance_id)
126+
};
127+
128+
analysis.relationship_instances.push(RelationshipInstance {
129+
id: *instance_id,
130+
rel_type: rel_type.clone(),
131+
source_var: source_variable.clone(),
132+
target_var: target_variable.clone(),
133+
direction: direction.clone(),
134+
alias,
135+
});
136+
137+
analysis.required_datasets.insert(rel_type.clone());
138+
}
139+
}
140+
LogicalOperator::VariableLengthExpand {
141+
input,
142+
source_variable,
143+
target_variable,
144+
relationship_types,
145+
direction,
146+
relationship_variable,
147+
min_length,
148+
max_length,
149+
..
150+
} => {
151+
// Recursively analyze input first
152+
analyze_operator(input, analysis, rel_counter)?;
153+
154+
// Infer target variable's label from source variable
155+
// For (a:Person)-[:KNOWS]->(b), b also gets label Person
156+
if let Some(source_label) = analysis.var_to_label.get(source_variable).cloned() {
157+
analysis
158+
.var_to_label
159+
.insert(target_variable.clone(), source_label);
160+
}
161+
162+
// For variable-length paths, register multiple instances (one per hop)
163+
// We need to register instances for all possible hop counts
164+
if let Some(rel_type) = relationship_types.first() {
165+
let max_hops = max_length.unwrap_or(crate::MAX_VARIABLE_LENGTH_HOPS);
166+
let min_hops = min_length.unwrap_or(1).max(1);
167+
168+
// Register instances for each hop count we'll generate
169+
for hop_count in min_hops..=max_hops {
170+
for _ in 0..hop_count {
171+
let instance_id = rel_counter
172+
.entry(rel_type.clone())
173+
.and_modify(|c| *c += 1)
174+
.or_insert(1);
175+
176+
// Use relationship variable if provided, otherwise use type_instanceId
177+
let alias = if let Some(rel_var) = relationship_variable {
178+
format!("{}_{}", rel_var, instance_id)
179+
} else {
180+
format!("{}_{}", rel_type.to_lowercase(), instance_id)
181+
};
182+
183+
analysis.relationship_instances.push(RelationshipInstance {
184+
id: *instance_id,
185+
rel_type: rel_type.clone(),
186+
source_var: source_variable.clone(),
187+
target_var: target_variable.clone(),
188+
direction: direction.clone(),
189+
alias,
190+
});
191+
}
192+
}
193+
194+
analysis.required_datasets.insert(rel_type.clone());
195+
}
196+
}
197+
LogicalOperator::Filter { input, .. }
198+
| LogicalOperator::Project { input, .. }
199+
| LogicalOperator::Sort { input, .. }
200+
| LogicalOperator::Limit { input, .. }
201+
| LogicalOperator::Offset { input, .. }
202+
| LogicalOperator::Distinct { input } => {
203+
analyze_operator(input, analysis, rel_counter)?;
204+
}
205+
LogicalOperator::Join { left, right, .. } => {
206+
analyze_operator(left, analysis, rel_counter)?;
207+
analyze_operator(right, analysis, rel_counter)?;
208+
}
209+
}
210+
Ok(())
211+
}
212+
213+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::RelationshipDirection;
    use crate::logical_plan::LogicalOperator;
    use std::collections::HashMap;

    /// Builds a scan that binds `variable` to the `Person` label.
    fn scan_person(variable: &str) -> LogicalOperator {
        LogicalOperator::ScanByLabel {
            variable: variable.to_string(),
            label: "Person".to_string(),
            properties: Default::default(),
        }
    }

    /// Builds an outgoing, unnamed `KNOWS` expansion from `source` to a `Person` bound to `target`.
    fn expand_knows(input: LogicalOperator, source: &str, target: &str) -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(input),
            source_variable: source.to_string(),
            target_variable: target.to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: None,
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    #[test]
    fn test_query_analysis_single_hop() {
        // Test that analysis correctly identifies relationship instances
        let plan = expand_knows(scan_person("a"), "a", "b");
        let analysis = analyze(&plan).unwrap();

        // Should have two variable mappings: a and b both map to Person
        assert_eq!(analysis.var_to_label.len(), 2);
        assert_eq!(analysis.var_to_label.get("a"), Some(&"Person".to_string()));
        assert_eq!(analysis.var_to_label.get("b"), Some(&"Person".to_string()));

        // Should have one relationship instance
        assert_eq!(analysis.relationship_instances.len(), 1);
        assert_eq!(analysis.relationship_instances[0].rel_type, "KNOWS");
        assert_eq!(analysis.relationship_instances[0].alias, "knows_1");
        assert_eq!(analysis.relationship_instances[0].id, 1);

        // Both the node dataset and the relationship dataset are required
        assert!(analysis.required_datasets.contains("Person"));
        assert!(analysis.required_datasets.contains("KNOWS"));
    }

    #[test]
    fn test_query_analysis_two_hop() {
        // Test that two-hop queries get unique relationship instances
        let first_hop = expand_knows(scan_person("a"), "a", "b");
        let plan = expand_knows(first_hop, "b", "c");
        let analysis = analyze(&plan).unwrap();

        // Should have two relationship instances with UNIQUE aliases
        assert_eq!(analysis.relationship_instances.len(), 2);
        assert_eq!(analysis.relationship_instances[0].alias, "knows_1");
        assert_eq!(analysis.relationship_instances[1].alias, "knows_2");

        // Both should be KNOWS but with different IDs
        assert_eq!(analysis.relationship_instances[0].rel_type, "KNOWS");
        assert_eq!(analysis.relationship_instances[1].rel_type, "KNOWS");
        assert_eq!(analysis.relationship_instances[0].id, 1);
        assert_eq!(analysis.relationship_instances[1].id, 2);
    }

    #[test]
    fn test_varlength_expand_analysis_registers_instances() {
        // Test that analysis phase correctly registers multiple relationship instances
        let vlexpand = LogicalOperator::VariableLengthExpand {
            input: Box::new(scan_person("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: None,
            min_length: Some(1),
            max_length: Some(2),
            target_properties: HashMap::new(),
        };

        let analysis = analyze(&vlexpand).unwrap();

        // For *1..2, should register 1 + 2 = 3 instances
        let knows_instances: Vec<_> = analysis
            .relationship_instances
            .iter()
            .filter(|r| r.rel_type == "KNOWS")
            .collect();

        assert_eq!(
            knows_instances.len(),
            3,
            "Expected 3 KNOWS instances (1 for 1-hop + 2 for 2-hop)"
        );

        // Every registered hop gets its own ID and alias
        let ids: Vec<usize> = knows_instances.iter().map(|r| r.id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
        let aliases: Vec<&str> = knows_instances.iter().map(|r| r.alias.as_str()).collect();
        assert_eq!(aliases, vec!["knows_1", "knows_2", "knows_3"]);

        // The target variable inherits the source variable's label
        assert_eq!(analysis.var_to_label.get("b"), Some(&"Person".to_string()));
    }

    #[test]
    fn test_planning_context_tracks_instances() {
        // Test that PlanningContext correctly iterates through instances
        let instances = vec![
            RelationshipInstance {
                id: 1,
                rel_type: "KNOWS".to_string(),
                source_var: "a".to_string(),
                target_var: "b".to_string(),
                direction: RelationshipDirection::Outgoing,
                alias: "knows_1".to_string(),
            },
            RelationshipInstance {
                id: 2,
                rel_type: "KNOWS".to_string(),
                source_var: "b".to_string(),
                target_var: "c".to_string(),
                direction: RelationshipDirection::Outgoing,
                alias: "knows_2".to_string(),
            },
        ];

        let analysis = QueryAnalysis {
            var_to_label: HashMap::new(),
            relationship_instances: instances,
            required_datasets: HashSet::new(),
        };

        let mut ctx = PlanningContext::new(&analysis);

        // First call should return first instance
        let inst1 = ctx.next_relationship_instance("KNOWS").unwrap();
        assert_eq!(inst1.alias, "knows_1");

        // Second call should return second instance
        let inst2 = ctx.next_relationship_instance("KNOWS").unwrap();
        assert_eq!(inst2.alias, "knows_2");

        // Third call should error (no more instances)
        assert!(ctx.next_relationship_instance("KNOWS").is_err());
    }
}

0 commit comments

Comments
 (0)