Skip to content

Commit 82b98a2

Browse files
jja725claude
andauthored
Feat: unity catalog/delta lake integration (#145)
## Summary - Add **Unity Catalog** (OSS) integration for browsing catalog metadata and auto-registering tables into `SqlEngine` - Introduce **Presto-inspired extensible connector architecture** with clean separation of catalog metadata (`CatalogProvider`) and data format reading (`TableReader`) - Support **Delta Lake** and **Parquet** table formats out of the box - Support **cloud storage** (S3, Azure, GCS) via `storage_options` ## Architecture Inspired by [Presto's connector SPI](https://github.com/prestodb/presto/tree/master/presto-spi/src/main/java/com/facebook/presto/spi/connector): | Layer | Component | Purpose | |-------|-----------|---------| | SPI | `CatalogProvider` trait | Browse catalog metadata (like Presto's `ConnectorMetadata`) | | SPI | `TableReader` trait | Read data in specific formats (like Presto's `ConnectorPageSourceProvider`) | | Facade | `Connector` struct | Bundles catalog + readers + storage options (like Presto's `Connector`) | **Extensibility:** - New catalog (e.g., AWS Glue) → implement `CatalogProvider`, reuses existing Delta/Parquet readers - New format (e.g., Iceberg) → implement `TableReader`, works with any catalog ## Python API \`\`\`python from lance_graph import UnityCatalog # Connect to Unity Catalog uc = UnityCatalog("http://localhost:8080/api/2.1/unity-catalog") # Browse catalogs = uc.list_catalogs() tables = uc.list_tables("unity", "default") table = uc.get_table("unity", "default", "marksheet") print(table.columns()) # Auto-register Delta + Parquet tables and query via SQL engine = uc.create_sql_engine("unity", "default") result = engine.execute("SELECT * FROM marksheet WHERE mark > 80") # Cloud storage support (S3, Azure, GCS) uc = UnityCatalog( "http://localhost:8080/api/2.1/unity-catalog", storage_options={ "azure_storage_account_name": "myaccount", "azure_storage_account_key": "...", } ) \`\`\` ## New files **SPI layer** (\`lance-graph-catalog\`): - \`catalog_provider.rs\` — \`CatalogProvider\` trait + data types - \`table_reader.rs\` — \`TableReader\` trait with \`storage_options\` for cloud access - \`connector.rs\` — \`Connector\` facade bundling catalog + readers + storage options - \`type_mapping.rs\` — UC type → Arrow type mapping - \`unity_catalog.rs\` — OSS UC REST client **Implementation layer** (\`lance-graph\`): - \`table_readers.rs\` — \`ParquetTableReader\` + \`DeltaTableReader\` (via deltalake 0.29) - \`sql_catalog.rs\` — \`build_context_from_connector()\` bridge to SqlEngine **Python bindings** (\`lance-graph-python\`): - \`catalog.rs\` — UnityCatalog, CatalogInfo, SchemaInfo, TableInfo PyO3 classes ## Test plan - [x] 12 unit tests for UC type → Arrow type mapping - [x] 15 wiremock integration tests for UC REST client - [x] 9 Python unit tests for UnityCatalog class - [x] 6 Python integration tests (require live UC server, skipped in CI) - [x] All existing tests pass unchanged (119 Python + 566 Rust) - [x] README docs updated with UC examples and cloud storage usage --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent af1e4bd commit 82b98a2

25 files changed

Lines changed: 4448 additions & 84 deletions

Cargo.lock

Lines changed: 885 additions & 60 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,65 @@ result = query.execute({"Person": people})
6767
print(result.to_pydict()) # {'name': ['Bob', 'David'], 'age': [34, 42]}
6868
```
6969

70+
## Python example: Direct SQL query
71+
72+
For data analytics workflows where you prefer standard SQL, use `SqlQuery` or `SqlEngine`. No `GraphConfig` is needed:
73+
74+
```python
75+
import pyarrow as pa
76+
from lance_graph import SqlQuery, SqlEngine
77+
78+
person = pa.table({
79+
"id": [1, 2, 3],
80+
"name": ["Alice", "Bob", "Carol"],
81+
"age": [28, 34, 29],
82+
})
83+
84+
# One-off query
85+
result = SqlQuery(
86+
"SELECT name, age FROM person WHERE age > 30"
87+
).execute({"person": person})
88+
print(result.to_pydict()) # {'name': ['Bob'], 'age': [34]}
89+
90+
# Multi-query with cached context
91+
engine = SqlEngine({"person": person})
92+
r1 = engine.execute("SELECT COUNT(*) AS cnt FROM person")
93+
r2 = engine.execute("SELECT name FROM person ORDER BY age DESC LIMIT 2")
94+
```
95+
96+
## Python example: Unity Catalog integration
97+
98+
Connect to [Unity Catalog](https://github.com/unitycatalog/unitycatalog) (OSS) to discover and query Delta Lake or Parquet tables directly:
99+
100+
```python
101+
from lance_graph import UnityCatalog
102+
103+
# Connect to Unity Catalog
104+
uc = UnityCatalog("http://localhost:8080/api/2.1/unity-catalog")
105+
106+
# Browse catalog metadata
107+
catalogs = uc.list_catalogs()
108+
schemas = uc.list_schemas("unity")
109+
tables = uc.list_tables("unity", "default")
110+
table = uc.get_table("unity", "default", "marksheet")
111+
print(table.columns()) # [{"name": "id", "type_name": "INT", ...}, ...]
112+
113+
# Auto-register tables (Delta + Parquet) and query via SQL
114+
engine = uc.create_sql_engine("unity", "default")
115+
result = engine.execute("SELECT * FROM marksheet WHERE mark > 80")
116+
print(result.to_pandas())
117+
118+
# For cloud storage (S3, Azure, GCS), pass storage options:
119+
uc = UnityCatalog(
120+
"http://localhost:8080/api/2.1/unity-catalog",
121+
storage_options={
122+
"aws_access_key_id": "...",
123+
"aws_secret_access_key": "...",
124+
"aws_region": "us-east-1",
125+
}
126+
)
127+
```
128+
70129
## Knowledge Graph CLI & API
71130

72131
The `knowledge_graph` package layers a simple Lance-backed knowledge graph

crates/lance-graph-catalog/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@ arrow-schema = "56.2"
1515
async-trait = "0.1"
1616
datafusion = { version = "50.3", default-features = false }
1717
lance-namespace = "1.0.1"
18+
reqwest = { version = "0.12", features = ["json"], optional = true }
19+
serde = { version = "1", features = ["derive"] }
20+
serde_json = "1"
1821
snafu = "0.8"
1922

23+
[features]
24+
default = ["unity-catalog"]
25+
unity-catalog = ["dep:reqwest"]
26+
2027
[dev-dependencies]
2128
tokio = { version = "1.37", features = ["macros", "rt-multi-thread"] }
29+
wiremock = "0.6"
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Catalog provider trait and data types for external catalog integration.
5+
//!
6+
//! Inspired by Presto's `ConnectorMetadata` SPI, this module defines the
7+
//! abstract interface for browsing external catalogs (Unity Catalog, Hive
8+
//! Metastore, AWS Glue, etc.).
9+
10+
use std::collections::HashMap;
11+
12+
use arrow_schema::SchemaRef;
13+
use async_trait::async_trait;
14+
15+
/// Metadata about a catalog (top-level namespace).
16+
#[derive(Debug, Clone)]
17+
pub struct CatalogInfo {
18+
pub name: String,
19+
pub comment: Option<String>,
20+
pub properties: HashMap<String, String>,
21+
pub created_at: Option<i64>,
22+
pub updated_at: Option<i64>,
23+
}
24+
25+
/// Metadata about a schema (second-level namespace within a catalog).
26+
#[derive(Debug, Clone)]
27+
pub struct SchemaInfo {
28+
pub name: String,
29+
pub catalog_name: String,
30+
pub comment: Option<String>,
31+
pub properties: HashMap<String, String>,
32+
pub created_at: Option<i64>,
33+
pub updated_at: Option<i64>,
34+
}
35+
36+
/// Metadata about a column in a table.
37+
#[derive(Debug, Clone)]
38+
pub struct ColumnInfo {
39+
pub name: String,
40+
/// Human-readable type string (e.g., "INT", "VARCHAR(255)").
41+
pub type_text: String,
42+
/// Canonical type name from the catalog (e.g., "INT", "STRING").
43+
pub type_name: String,
44+
/// Column position (0-based).
45+
pub position: i32,
46+
pub nullable: bool,
47+
pub comment: Option<String>,
48+
}
49+
50+
/// Data format of the underlying storage.
51+
#[derive(Debug, Clone, PartialEq, Eq)]
52+
pub enum DataSourceFormat {
53+
Delta,
54+
Parquet,
55+
Csv,
56+
Json,
57+
Avro,
58+
Orc,
59+
Text,
60+
Other(String),
61+
}
62+
63+
/// Type of table (managed vs external).
64+
#[derive(Debug, Clone, PartialEq, Eq)]
65+
pub enum TableType {
66+
Managed,
67+
External,
68+
}
69+
70+
/// Full table metadata including columns and storage information.
71+
#[derive(Debug, Clone)]
72+
pub struct TableInfo {
73+
pub name: String,
74+
pub catalog_name: String,
75+
pub schema_name: String,
76+
pub table_type: TableType,
77+
pub data_source_format: DataSourceFormat,
78+
pub columns: Vec<ColumnInfo>,
79+
pub storage_location: Option<String>,
80+
pub comment: Option<String>,
81+
pub properties: HashMap<String, String>,
82+
pub created_at: Option<i64>,
83+
pub updated_at: Option<i64>,
84+
}
85+
86+
/// Errors that can occur during catalog operations.
87+
#[derive(Debug)]
88+
pub enum CatalogError {
89+
/// Network or HTTP error.
90+
ConnectionError(String),
91+
/// Resource not found (catalog, schema, or table).
92+
NotFound(String),
93+
/// Authentication or authorization failure.
94+
AuthError(String),
95+
/// Invalid or unparsable response from the catalog server.
96+
InvalidResponse(String),
97+
/// Failed to map a catalog type to an Arrow type.
98+
TypeMappingError(String),
99+
/// Other errors.
100+
Other(String),
101+
}
102+
103+
impl std::fmt::Display for CatalogError {
104+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105+
match self {
106+
Self::ConnectionError(msg) => write!(f, "Catalog connection error: {}", msg),
107+
Self::NotFound(msg) => write!(f, "Not found: {}", msg),
108+
Self::AuthError(msg) => write!(f, "Auth error: {}", msg),
109+
Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
110+
Self::TypeMappingError(msg) => write!(f, "Type mapping error: {}", msg),
111+
Self::Other(msg) => write!(f, "Catalog error: {}", msg),
112+
}
113+
}
114+
}
115+
116+
impl std::error::Error for CatalogError {}
117+
118+
pub type CatalogResult<T> = std::result::Result<T, CatalogError>;
119+
120+
/// Abstract trait for browsing an external catalog.
121+
///
122+
/// Analogous to Presto's `ConnectorMetadata`. Implementations provide access
123+
/// to catalog metadata (catalogs, schemas, tables, columns) without being
124+
/// coupled to any specific data format or storage backend.
125+
///
126+
/// # Extensibility
127+
///
128+
/// Implement this trait to add support for new catalog backends:
129+
/// - Unity Catalog (provided)
130+
/// - Hive Metastore (future)
131+
/// - AWS Glue (future)
132+
/// - Iceberg REST Catalog (future)
133+
#[async_trait]
134+
pub trait CatalogProvider: Send + Sync {
135+
/// Human-readable name of this catalog provider (e.g., "unity-catalog").
136+
fn name(&self) -> &str;
137+
138+
/// List all catalogs available in this provider.
139+
async fn list_catalogs(&self) -> CatalogResult<Vec<CatalogInfo>>;
140+
141+
/// Get information about a specific catalog.
142+
async fn get_catalog(&self, name: &str) -> CatalogResult<CatalogInfo>;
143+
144+
/// List all schemas within a catalog.
145+
async fn list_schemas(&self, catalog_name: &str) -> CatalogResult<Vec<SchemaInfo>>;
146+
147+
/// Get information about a specific schema.
148+
async fn get_schema(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<SchemaInfo>;
149+
150+
/// List all tables within a schema.
151+
async fn list_tables(
152+
&self,
153+
catalog_name: &str,
154+
schema_name: &str,
155+
) -> CatalogResult<Vec<TableInfo>>;
156+
157+
/// Get detailed information about a specific table, including columns.
158+
async fn get_table(
159+
&self,
160+
catalog_name: &str,
161+
schema_name: &str,
162+
table_name: &str,
163+
) -> CatalogResult<TableInfo>;
164+
165+
/// Convert a table's column definitions to an Arrow schema.
166+
///
167+
/// The default implementation uses the standard type mapping from
168+
/// [`crate::type_mapping::columns_to_arrow_schema`].
169+
fn table_to_arrow_schema(&self, table: &TableInfo) -> CatalogResult<SchemaRef> {
170+
crate::type_mapping::columns_to_arrow_schema(&table.columns)
171+
}
172+
}

0 commit comments

Comments
 (0)