Skip to content

Commit d263482

Browse files
committed
feat(namespace): allow executing Cypher against namespaces
1 parent 2ba1faa commit d263482

8 files changed

Lines changed: 366 additions & 13 deletions

File tree

python/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ arrow-schema = "56.2"
1616
arrow-ipc = "56.2"
1717
futures = "0.3"
1818
lance-graph = { path = "../rust/lance-graph" }
19+
lance-namespace = "1.0.1"
1920
serde = { version = "1", features = ["derive"] }
2021
serde_json = "1"
22+
async-trait = "0.1"
2123
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39", "py-clone"] }
24+
snafu = "0.8"
2225
tokio = { version = "1.37", features = ["rt-multi-thread", "macros"] }

python/python/tests/test_graph.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright The Lance Authors
33

4+
from pathlib import Path
5+
46
import pyarrow as pa
57
import pytest
68
from lance_graph import CypherQuery, GraphConfig
@@ -194,3 +196,32 @@ def test_distinct_clause(graph_env):
194196

195197
assert len(data["c.company_name"]) == 3
196198
assert set(data["c.company_name"]) == {"TechCorp", "DataInc", "CloudSoft"}
199+
200+
201+
def test_execute_with_directory_namespace(graph_env, tmp_path, monkeypatch):
202+
config, datasets, _ = graph_env
203+
204+
repo_root = Path(__file__).resolve().parents[3]
205+
monkeypatch.syspath_prepend(str(repo_root / "lance" / "python" / "python"))
206+
monkeypatch.syspath_prepend(str(repo_root / "lance-namespace" / "python"))
207+
208+
try:
209+
from lance import write_dataset
210+
from lance_namespace import connect
211+
except ImportError:
212+
pytest.skip("Lance namespace dependencies not available")
213+
214+
for name, table in datasets.items():
215+
write_dataset(table, tmp_path / f"{name}.lance")
216+
217+
catalog = connect("dir", {"root": str(tmp_path)})
218+
219+
query = (
220+
CypherQuery("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")
221+
.with_config(config)
222+
)
223+
224+
result = query.execute(catalog)
225+
data = result.to_pydict()
226+
227+
assert set(data["p.name"]) == {"Bob", "David"}

python/src/graph.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use pyo3::{
3434
};
3535
use serde_json::Value as JsonValue;
3636

37+
use crate::namespace::PyNamespaceAdapter;
3738
use crate::RT;
3839

3940
/// Execution strategy for Cypher queries
@@ -490,8 +491,9 @@ impl CypherQuery {
490491
///
491492
/// Parameters
492493
/// ----------
493-
/// datasets : dict
494-
/// Dictionary mapping table names to Lance datasets
494+
/// datasets : dict or LanceNamespace
495+
/// Either a dictionary mapping table names to Lance datasets, or a Lance
496+
/// namespace instance that can resolve table locations dynamically.
495497
/// strategy : ExecutionStrategy, optional
496498
/// Execution strategy to use (defaults to DataFusion)
497499
///
@@ -517,24 +519,37 @@ impl CypherQuery {
517519
fn execute(
518520
&self,
519521
py: Python,
520-
datasets: &Bound<'_, PyDict>,
522+
datasets: &Bound<'_, PyAny>,
521523
strategy: Option<ExecutionStrategy>,
522524
) -> PyResult<PyObject> {
523-
// Convert datasets to Arrow batches while holding the GIL
524-
let arrow_datasets = python_datasets_to_batches(datasets)?;
525-
526-
// Convert Python strategy to Rust strategy
527525
let rust_strategy = strategy.map(|s| s.into());
528526

529-
// Clone the inner query for use in the async block
530527
let inner_query = self.inner.clone();
531528

532-
// Use RT.block_on with Some(py) like the scanner to_pyarrow method
533-
let result_batch = RT
534-
.block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))?
535-
.map_err(graph_error_to_pyerr)?;
529+
if let Ok(dict) = datasets.downcast::<PyDict>() {
530+
// Convert datasets to Arrow batches while holding the GIL
531+
let arrow_datasets = python_datasets_to_batches(&dict)?;
532+
533+
// Use RT.block_on with Some(py) like the scanner to_pyarrow method
534+
let result_batch = RT
535+
.block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))?
536+
.map_err(graph_error_to_pyerr)?;
536537

537-
record_batch_to_python_table(py, &result_batch)
538+
record_batch_to_python_table(py, &result_batch)
539+
} else {
540+
// Treat input as a Lance namespace catalog
541+
let namespace_adapter = PyNamespaceAdapter::new(datasets.clone().unbind());
542+
let namespace = Arc::new(namespace_adapter) as Arc<dyn lance_namespace::LanceNamespace>;
543+
544+
let result_batch = RT
545+
.block_on(
546+
Some(py),
547+
inner_query.execute_with_namespace(namespace, rust_strategy),
548+
)?
549+
.map_err(graph_error_to_pyerr)?;
550+
551+
record_batch_to_python_table(py, &result_batch)
552+
}
538553
}
539554

540555
/// Explain query using the DataFusion planner with in-memory datasets

python/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use pyo3::prelude::*;
44

55
mod executor;
66
mod graph;
7+
mod namespace;
78

89
pub(crate) static RT: LazyLock<executor::BackgroundExecutor> =
910
LazyLock::new(executor::BackgroundExecutor::new);

python/src/namespace.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
use std::collections::HashMap;
5+
6+
use async_trait::async_trait;
7+
use lance_namespace::models::{DescribeTableRequest, DescribeTableResponse};
8+
use lance_namespace::{Error as NamespaceError, LanceNamespace, Result as NamespaceResult};
9+
use pyo3::prelude::*;
10+
use pyo3::prelude::PyAnyMethods;
11+
use pyo3::types::{PyAny, PyDict, PyList};
12+
use snafu::Location;
13+
14+
#[derive(Clone, Debug)]
15+
pub struct PyNamespaceAdapter {
16+
namespace: Py<PyAny>,
17+
}
18+
19+
impl PyNamespaceAdapter {
20+
pub fn new(namespace: Py<PyAny>) -> Self {
21+
Self { namespace }
22+
}
23+
24+
fn convert_response(
25+
py: Python,
26+
response: &Bound<'_, PyAny>,
27+
) -> NamespaceResult<DescribeTableResponse> {
28+
let location = extract_field::<String>(py, response, "location")?;
29+
let storage_options = extract_field::<HashMap<String, String>>(
30+
py,
31+
response,
32+
"storage_options",
33+
)?;
34+
35+
Ok(DescribeTableResponse {
36+
location,
37+
storage_options,
38+
..DescribeTableResponse::new()
39+
})
40+
}
41+
}
42+
43+
fn extract_field<T>(
44+
py: Python,
45+
obj: &Bound<'_, PyAny>,
46+
key: &str,
47+
) -> NamespaceResult<Option<T>>
48+
where
49+
T: for<'a> FromPyObject<'a>,
50+
{
51+
if let Ok(attr) = obj.getattr(key) {
52+
if attr.is_none() {
53+
return Ok(None);
54+
}
55+
return attr
56+
.extract::<T>()
57+
.map(Some)
58+
.map_err(py_err_to_namespace_error);
59+
}
60+
61+
if obj
62+
.hasattr("get")
63+
.map_err(py_err_to_namespace_error)?
64+
{
65+
let value = obj
66+
.call_method1("get", (key, py.None()))
67+
.map_err(py_err_to_namespace_error)?;
68+
if value.is_none() {
69+
return Ok(None);
70+
}
71+
return value
72+
.extract::<T>()
73+
.map(Some)
74+
.map_err(py_err_to_namespace_error);
75+
}
76+
77+
Ok(None)
78+
}
79+
80+
unsafe impl Send for PyNamespaceAdapter {}
81+
unsafe impl Sync for PyNamespaceAdapter {}
82+
83+
#[derive(Debug)]
84+
struct PyNamespaceError(String);
85+
86+
impl std::fmt::Display for PyNamespaceError {
87+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88+
write!(f, "{}", self.0)
89+
}
90+
}
91+
92+
impl std::error::Error for PyNamespaceError {}
93+
94+
fn py_err_to_namespace_error(err: PyErr) -> NamespaceError {
95+
NamespaceError::Namespace {
96+
source: Box::new(PyNamespaceError(err.to_string())),
97+
location: Location::new(file!(), line!(), column!()),
98+
}
99+
}
100+
101+
#[async_trait]
102+
impl LanceNamespace for PyNamespaceAdapter {
103+
fn namespace_id(&self) -> String {
104+
Python::with_gil(|py| {
105+
let obj = self.namespace.bind(py);
106+
if let Ok(id_obj) = obj.call_method0("namespace_id") {
107+
if let Ok(id) = id_obj.extract::<String>() {
108+
return id;
109+
}
110+
}
111+
112+
obj.repr()
113+
.map(|repr| repr.to_string())
114+
.unwrap_or_else(|_| "PyNamespaceAdapter".to_string())
115+
})
116+
}
117+
118+
async fn describe_table(
119+
&self,
120+
request: DescribeTableRequest,
121+
) -> NamespaceResult<DescribeTableResponse> {
122+
let namespace = self.namespace.clone();
123+
let id = request.id.clone();
124+
let version = request.version;
125+
126+
tokio::task::spawn_blocking(move || {
127+
Python::with_gil(|py| -> NamespaceResult<DescribeTableResponse> {
128+
let obj = namespace.bind(py);
129+
let kwargs = PyDict::new(py);
130+
131+
if let Some(id_components) = id {
132+
let py_list =
133+
PyList::new(py, &id_components).map_err(py_err_to_namespace_error)?;
134+
kwargs
135+
.set_item("id", py_list)
136+
.map_err(py_err_to_namespace_error)?;
137+
}
138+
139+
if let Some(version) = version {
140+
kwargs
141+
.set_item("version", version)
142+
.map_err(py_err_to_namespace_error)?;
143+
}
144+
145+
let result = obj
146+
.call_method("describe_table", (), Some(&kwargs))
147+
.map_err(py_err_to_namespace_error)?;
148+
149+
Self::convert_response(py, &result)
150+
})
151+
})
152+
.await
153+
.map_err(|err| NamespaceError::Namespace {
154+
source: Box::new(PyNamespaceError(format!(
155+
"Python describe_table task failed: {}",
156+
err
157+
))),
158+
location: Location::new(file!(), line!(), column!()),
159+
})?
160+
}
161+
}

rust/lance-graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ datafusion-functions-aggregate = "50.3"
3030
futures = "0.3"
3131
lance = "1.0.0"
3232
lance-linalg = "1.0.0"
33+
lance-namespace = "1.0.1"
3334
nom = "7.1"
3435
serde = { version = "1", features = ["derive"] }
3536
serde_json = "1"

0 commit comments

Comments
 (0)