Skip to content

Commit 1e7767f

Browse files
authored
feat(namespace): add directory-backed execution (#96)
1 parent 2ba1faa commit 1e7767f

13 files changed

Lines changed: 520 additions & 36 deletions

File tree

python/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ markers = [
7777
"slow",
7878
"torch: tests which rely on pytorch being installed",
7979
"recurring: marks tests as recurring tests",
80+
"requires_lance: tests that need the Lance Python bindings available",
8081
]
8182
filterwarnings = [
8283
'error::FutureWarning',

python/python/lance_graph/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,15 @@ def _load_dev_build() -> ModuleType:
7373
CypherQuery = _bindings.graph.CypherQuery
7474
VectorSearch = _bindings.graph.VectorSearch
7575
DistanceMetric = _bindings.graph.DistanceMetric
76+
DirNamespace = _bindings.graph.DirNamespace
7677

7778
__all__ = [
7879
"GraphConfig",
7980
"GraphConfigBuilder",
8081
"CypherQuery",
8182
"VectorSearch",
8283
"DistanceMetric",
84+
"DirNamespace",
8385
]
8486

8587
__version__ = _bindings.__version__

python/python/tests/test_graph.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pyarrow as pa
55
import pytest
6-
from lance_graph import CypherQuery, GraphConfig
6+
from lance_graph import CypherQuery, DirNamespace, GraphConfig
77

88

99
@pytest.fixture
@@ -194,3 +194,24 @@ def test_distinct_clause(graph_env):
194194

195195
assert len(data["c.company_name"]) == 3
196196
assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"}
197+
198+
199+
@pytest.mark.requires_lance
200+
def test_execute_with_directory_namespace(graph_env, tmp_path):
201+
config, datasets, _ = graph_env
202+
203+
from lance import write_dataset
204+
205+
for name, table in datasets.items():
206+
write_dataset(table, tmp_path / f"{name}.lance")
207+
208+
namespace = DirNamespace(str(tmp_path))
209+
210+
query = CypherQuery("MATCH (p:Person) WHERE p.age > 30 RETURN p.name").with_config(
211+
config
212+
)
213+
214+
result = query.execute_with_namespace(namespace)
215+
data = result.to_pydict()
216+
217+
assert set(data["p.name"]) == {"Bob", "David"}

python/src/graph.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ use arrow::ffi_stream::ArrowArrayStreamReader;
2222
use arrow_array::{RecordBatch, RecordBatchReader};
2323
use arrow_schema::Schema;
2424
use lance_graph::{
25-
ExecutionStrategy as RustExecutionStrategy, CypherQuery as RustCypherQuery,
26-
GraphConfig as RustGraphConfig, GraphError as RustGraphError,
27-
VectorSearch as RustVectorSearch, ast::DistanceMetric as RustDistanceMetric,
25+
ast::DistanceMetric as RustDistanceMetric, CypherQuery as RustCypherQuery,
26+
ExecutionStrategy as RustExecutionStrategy, GraphConfig as RustGraphConfig,
27+
GraphError as RustGraphError, VectorSearch as RustVectorSearch,
2828
};
2929
use pyo3::{
3030
exceptions::{PyNotImplementedError, PyRuntimeError, PyValueError},
@@ -34,6 +34,7 @@ use pyo3::{
3434
};
3535
use serde_json::Value as JsonValue;
3636

37+
use crate::namespace::PyDirNamespace;
3738
use crate::RT;
3839

3940
/// Execution strategy for Cypher queries
@@ -537,6 +538,45 @@ impl CypherQuery {
537538
record_batch_to_python_table(py, &result_batch)
538539
}
539540

541+
/// Execute query using a namespace resolver.
542+
///
543+
/// Parameters
544+
/// ----------
545+
/// namespace : DirNamespace
546+
/// Directory-backed namespace that resolves table names to Lance datasets.
547+
/// strategy : ExecutionStrategy, optional
548+
/// Execution strategy to use (defaults to DataFusion)
549+
///
550+
/// Returns
551+
/// -------
552+
/// pyarrow.Table
553+
/// Query results as Arrow table
554+
///
555+
/// Raises
556+
/// ------
557+
/// RuntimeError
558+
/// If query execution fails
559+
#[pyo3(signature = (namespace, strategy=None))]
560+
fn execute_with_namespace(
561+
&self,
562+
py: Python,
563+
namespace: &Bound<'_, PyDirNamespace>,
564+
strategy: Option<ExecutionStrategy>,
565+
) -> PyResult<PyObject> {
566+
let rust_strategy = strategy.map(|s| s.into());
567+
let inner_query = self.inner.clone();
568+
let namespace_arc = namespace.borrow().inner.clone();
569+
570+
let result_batch = RT
571+
.block_on(
572+
Some(py),
573+
inner_query.execute_with_namespace_arc(namespace_arc, rust_strategy),
574+
)?
575+
.map_err(graph_error_to_pyerr)?;
576+
577+
record_batch_to_python_table(py, &result_batch)
578+
}
579+
540580
/// Explain query using the DataFusion planner with in-memory datasets
541581
///
542582
/// Parameters
@@ -641,7 +681,10 @@ impl CypherQuery {
641681

642682
// Execute via runtime
643683
let result = RT
644-
.block_on(Some(py), inner_query.execute_with_vector_rerank(arrow_datasets, vs))?
684+
.block_on(
685+
Some(py),
686+
inner_query.execute_with_vector_rerank(arrow_datasets, vs),
687+
)?
645688
.map_err(graph_error_to_pyerr)?;
646689

647690
record_batch_to_python_table(py, &result)
@@ -840,6 +883,7 @@ pub fn register_graph_module(py: Python, parent_module: &Bound<'_, PyModule>) ->
840883
graph_module.add_class::<GraphConfigBuilder>()?;
841884
graph_module.add_class::<CypherQuery>()?;
842885
graph_module.add_class::<VectorSearch>()?;
886+
graph_module.add_class::<PyDirNamespace>()?;
843887

844888
parent_module.add_submodule(&graph_module)?;
845889
Ok(())

python/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use pyo3::prelude::*;
44

55
mod executor;
66
mod graph;
7+
mod namespace;
78

89
pub(crate) static RT: LazyLock<executor::BackgroundExecutor> =
910
LazyLock::new(executor::BackgroundExecutor::new);

python/src/namespace.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use std::sync::Arc;
2+
3+
use lance_graph::namespace::DirNamespace;
4+
use pyo3::prelude::*;
5+
6+
#[pyclass(name = "DirNamespace", module = "lance.graph")]
7+
pub struct PyDirNamespace {
8+
pub(crate) inner: Arc<DirNamespace>,
9+
}
10+
11+
#[pymethods]
12+
impl PyDirNamespace {
13+
#[new]
14+
fn new(base_uri: String) -> Self {
15+
Self {
16+
inner: Arc::new(DirNamespace::new(base_uri)),
17+
}
18+
}
19+
20+
#[getter]
21+
fn base_uri(&self) -> String {
22+
self.inner.base_uri().to_string()
23+
}
24+
}

rust/lance-graph/Cargo.lock

Lines changed: 32 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)