Skip to content

Commit b18e75e

Browse files
committed
feat: add ParquetLookupProvider and SqliteLookupProvider behind feature gates
Ports the two storage backends proven in the df-vector-search benchmark POC into the library as optional, feature-gated providers.

- `parquet-provider` feature: ParquetLookupProvider — concurrent row-group reads from any ObjectStore (S3, local FS) with pre-cached parquet footers and optional RowSelection for page-skip optimisation
- `sqlite-provider` feature: SqliteLookupProvider — B-tree point lookups via a WAL-mode connection pool; builds from parquet on first run
- `keys` module (always compiled): pack_key / unpack_key / DatasetLayout — shared key encoding utilities extracted from indexing.rs
- Integration tests for both providers (9 tests total)

Breaking change in Cargo.toml: tokio gains the "sync" feature unconditionally (needed by SqliteLookupProvider's Semaphore; tokio was already a hard dep).
1 parent 4d685a8 commit b18e75e

8 files changed

Lines changed: 1578 additions & 104 deletions

File tree

Cargo.lock

Lines changed: 163 additions & 96 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,30 @@ edition = "2024"
55
description = "DataFusion extension for USearch HNSW vector similarity search with adaptive WHERE clause filtering"
66
license = "MIT OR Apache-2.0"
77

8+
[features]
9+
parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"]
10+
sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"]
11+
812
[dependencies]
9-
tracing = "0.1"
10-
datafusion = "52.2.0"
11-
usearch = "2.24.0"
12-
arrow-array = "57.2.0"
13+
tracing = "0.1"
14+
datafusion = "52.2.0"
15+
usearch = "2.24.0"
16+
arrow-array = "57.2.0"
1317
arrow-schema = "57.2.0"
14-
async-trait = "0.1"
15-
futures = "0.3"
16-
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
18+
async-trait = "0.1"
19+
futures = "0.3"
20+
# "sync" adds tokio::sync::Semaphore, used by SqliteLookupProvider's connection pool
21+
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }
22+
23+
# parquet-provider
24+
parquet = { version = "57.2.0", optional = true, features = ["async", "object_store"] }
25+
object_store = { version = "0.12", optional = true }
26+
bytes = { version = "1", optional = true }
27+
28+
# sqlite-provider
29+
rusqlite = { version = "0.32", optional = true, features = ["bundled"] }
30+
serde_json = { version = "1", optional = true }
1731

1832
[dev-dependencies]
19-
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
33+
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
34+
tempfile = "3"

src/keys.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// keys.rs — Key encoding utilities for packed row addresses.
2+
//
3+
// Bit layout:
4+
// bits 63-48 (16 bits) file_idx — which parquet file (up to 65,536)
5+
// bits 47-32 (16 bits) rg_idx — which row group (up to 65,536)
6+
// bits 31-0 (32 bits) local_offset — row offset within row group (up to 2^32 ≈ 4.29 billion rows)
7+
8+
/// Pack a physical row address into a single `u64` key for use with USearch
/// and the lookup providers.
///
/// The key stores `file_idx` in bits 63-48, `rg_idx` in bits 47-32 and
/// `local_offset` in bits 31-0.
///
/// # Panics
///
/// Panics if `file_idx` or `rg_idx` exceeds `0xFFFF`, or if `local_offset`
/// exceeds `0xFFFF_FFFF`. Without these checks an oversized component would
/// silently overwrite a neighbouring field (or be shifted out entirely) and
/// produce a key that decodes to a different row.
#[inline]
pub fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 {
    assert!(
        file_idx <= 0xFFFF,
        "file_idx {} does not fit in 16 bits",
        file_idx
    );
    assert!(
        rg_idx <= 0xFFFF,
        "rg_idx {} does not fit in 16 bits",
        rg_idx
    );
    assert!(
        local_offset as u64 <= 0xFFFF_FFFF,
        "local_offset {} does not fit in 32 bits",
        local_offset
    );
    ((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)
}
14+
15+
/// Split a packed `u64` key into its `(file_idx, rg_idx, local_offset)` parts.
///
/// The top 16 bits become `file_idx`, the next 16 bits `rg_idx`, and the low
/// 32 bits `local_offset`.
#[inline]
pub fn unpack_key(key: u64) -> (usize, usize, usize) {
    // Masks selecting a 16-bit and a 32-bit field respectively.
    const MASK_16: u64 = 0xFFFF;
    const MASK_32: u64 = 0xFFFF_FFFF;

    (
        (key >> 48) as usize,
        ((key >> 32) & MASK_16) as usize,
        (key & MASK_32) as usize,
    )
}
23+
24+
/// Physical layout of a sharded parquet dataset.
///
/// Computed once at startup from parquet file footers (reads only the last few
/// KB of each file). Not persisted to disk.
///
/// All three vectors are indexed by `file_idx`, the same index that is stored
/// in the top 16 bits of a packed key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatasetLayout {
    /// Object-store path (or S3 key) for each file, indexed by `file_idx`.
    pub file_keys: Vec<String>,
    /// Cumulative row counts at the start of each file: `file_cum_rows[i]` is
    /// the total number of rows in files `0..i`, so `file_cum_rows[0]` is `0`
    /// and `file_cum_rows[n_files]` is the total row count of the dataset.
    pub file_cum_rows: Vec<u64>,
    /// For each file, cumulative row count at the start of each row group:
    /// `rg_cum_rows[file][rg]` = rows in row groups `0..rg` within that file.
    pub rg_cum_rows: Vec<Vec<u64>>,
}
39+
40+
impl DatasetLayout {
41+
/// Convert a packed usearch key back to a global (dataset-wide) row index.
42+
#[inline]
43+
pub fn packed_key_to_global(&self, key: u64) -> u64 {
44+
let (file_idx, rg_idx, local_offset) = unpack_key(key);
45+
self.file_cum_rows[file_idx] + self.rg_cum_rows[file_idx][rg_idx] + local_offset as u64
46+
}
47+
48+
/// Scan parquet footers to build the layout. No vector data is read.
49+
///
50+
/// Only compiled when the `parquet-provider` or `sqlite-provider` feature is enabled.
51+
#[cfg(any(feature = "parquet-provider", feature = "sqlite-provider"))]
52+
pub fn from_files(local_paths: &[&str]) -> datafusion::common::Result<Self> {
53+
use datafusion::error::DataFusionError;
54+
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
55+
use std::fs;
56+
use std::path::Path;
57+
58+
let mut file_keys = Vec::with_capacity(local_paths.len());
59+
let mut file_cum_rows = vec![0u64];
60+
let mut rg_cum_rows: Vec<Vec<u64>> = Vec::with_capacity(local_paths.len());
61+
62+
let mut running_total = 0u64;
63+
for &path in local_paths {
64+
let file_name = Path::new(path)
65+
.file_name()
66+
.and_then(|n| n.to_str())
67+
.ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?;
68+
file_keys.push(format!("parquet/{file_name}"));
69+
70+
let f = fs::File::open(path)
71+
.map_err(|e| DataFusionError::Execution(format!("open {path}: {e}")))?;
72+
let builder = ParquetRecordBatchReaderBuilder::try_new(f)
73+
.map_err(|e| DataFusionError::Execution(format!("read footer {path}: {e}")))?;
74+
let meta = builder.metadata();
75+
76+
let mut rg_cum = vec![0u64];
77+
let mut file_rows = 0u64;
78+
for rg in 0..meta.num_row_groups() {
79+
let n = meta.row_group(rg).num_rows() as u64;
80+
file_rows += n;
81+
rg_cum.push(file_rows);
82+
}
83+
rg_cum_rows.push(rg_cum);
84+
running_total += file_rows;
85+
file_cum_rows.push(running_total);
86+
}
87+
88+
Ok(Self {
89+
file_keys,
90+
file_cum_rows,
91+
rg_cum_rows,
92+
})
93+
}
94+
}

src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
//! LIMIT 10
5757
//! ```
5858
59+
pub mod keys;
5960
pub mod lookup;
6061
pub mod node;
6162
pub mod planner;
@@ -64,6 +65,12 @@ pub mod rule;
6465
pub mod udf;
6566
pub mod udtf;
6667

68+
#[cfg(feature = "parquet-provider")]
69+
pub mod parquet_provider;
70+
#[cfg(feature = "sqlite-provider")]
71+
pub mod sqlite_provider;
72+
73+
pub use keys::{DatasetLayout, pack_key, unpack_key};
6774
pub use lookup::{HashKeyProvider, PointLookupProvider};
6875
pub use node::{DistanceType, USearchNode};
6976
pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
@@ -72,6 +79,11 @@ pub use rule::USearchRule;
7279
pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf};
7380
pub use udtf::USearchUDTF;
7481

82+
#[cfg(feature = "parquet-provider")]
83+
pub use parquet_provider::ParquetLookupProvider;
84+
#[cfg(feature = "sqlite-provider")]
85+
pub use sqlite_provider::SqliteLookupProvider;
86+
7587
use std::sync::Arc;
7688

7789
use datafusion::common::Result;

0 commit comments

Comments
 (0)