Commit 285e87c

feat: add ParquetLookupProvider and SqliteLookupProvider behind feature gates (#3)
* feat: add ParquetLookupProvider and SqliteLookupProvider behind feature gates

  Ports the two storage backends proven in the df-vector-search benchmark POC into the library as optional, feature-gated providers.

  - `parquet-provider` feature: ParquetLookupProvider — concurrent row-group reads from any ObjectStore (S3, local FS) with pre-cached parquet footers and optional RowSelection for page-skip optimisation
  - `sqlite-provider` feature: SqliteLookupProvider — B-tree point lookups via a WAL-mode connection pool; builds from parquet on first run
  - `keys` module (always compiled): pack_key / unpack_key / DatasetLayout — shared key encoding utilities extracted from indexing.rs
  - Integration tests for both providers (9 tests total)

  Breaking change in Cargo.toml: tokio gains the "sync" feature unconditionally (needed by SqliteLookupProvider's Semaphore; tokio was already a hard dep).

* fix: address P1/P2 review comments

  P1 — sqlite_provider: add ConnGuard to return connection to pool on panic, preventing permanent pool shrinkage and cascading failures.

  P1 — parquet_provider: replace unchecked file_keys[file_idx] / metadata_cache[file_idx] with bounds-checked .get() calls that return DataFusionError instead of panicking on stale or mismatched keys.

  P2 — parquet_provider: replace .expect("column mismatch") in projection mapping with a proper ? returning DataFusionError::Execution.

  P2 — sqlite_provider: replace println! with tracing::info! throughout; library code should not write to stdout directly.

* fix: address second round of P1/P2 review comments

  P1 — keys: add debug_assert! on pack_key inputs to catch out-of-range file_idx/rg_idx (>= 65536) or local_offset (>= 2^32) in debug builds instead of silently producing a wrong key.

  P1 — sqlite_provider: add quote_ident() helper that doubles embedded double-quotes before interpolating table_name into SQL, preventing SQL injection via a crafted table name. Applied to all four SQL format strings (SELECT COUNT, SELECT *, CREATE TABLE, INSERT INTO).

  P2 — sqlite_provider: validate pool_size >= 1 at construction time and return DataFusionError instead of allowing an unusable provider. Replace pool.lock().unwrap().pop().unwrap() with explicit map_err / ok_or_else so mutex poison and empty-pool are surfaced as errors.

* fix: quote column names in DDL, guard empty col_bufs

* fix: address bot review comments — debug_assert overflow, projection reorder, UInt64 note

  - keys.rs: use u32::MAX as usize instead of (1 << 32) to avoid overflow on 32-bit targets
  - parquet_provider.rs: reorder filtered columns from parquet-schema order back to idxs order after ProjectionMask::roots
  - sqlite_provider.rs: add comment documenting UInt64 > i64::MAX wrap-around behaviour when storing to SQLite INTEGER

* fix(scan): return NotImplemented instead of silent empty MemTable

  Both providers scan() returned empty MemTable, causing full-table SQL queries to silently produce zero rows. Return NotImplemented so the limitation is explicit — these providers are fetch_by_keys only. Removes the now-unused MemTable import from both files.

* test: add regression tests for bugs found in PR review

  - keys.rs: unit tests for pack/unpack roundtrip, boundary values, and debug_assert panics for out-of-range file_idx and local_offset
  - parquet: test projection with non-monotonic column order (the P1 bug where columns were silently swapped without the reorder fix)
  - parquet: test stale file_idx returns Err not panic
  - parquet + sqlite: test scan() returns NotImplemented (not silent empty)
  - sqlite: test table name with spaces works via quote_ident

* fix(keys): promote debug_assert to assert in pack_key

  All three range checks (file_idx, rg_idx, local_offset) now fire in release builds. overflow silently produces a key pointing to the wrong row with no downstream error — a hard panic is the right behaviour. Also removes the now-incorrect #[cfg(debug_assertions)] guards from the corresponding should_panic tests.

* fix(sqlite): atomic build, Float32/64 read, SELECT projection

  - Wrap CREATE TABLE inside the INSERT transaction so a mid-build crash cannot leave an empty table that open_or_build silently accepts on the next startup.
  - Add missing Float32 and Float64 arms to sql_values_to_arrow; these types were written correctly by arrow_cell_to_sql but read back via the unsupported-type error path, breaking any schema with float cols.
  - Replace SELECT * with an explicit column list built from out_schema so only projected columns are fetched from SQLite. Also removes the now-dead projected_indices helper.

* feat(keys): add key_prefix param to DatasetLayout::from_files

  The hardcoded "parquet/" prefix created silent coupling between from_files and the ObjectStore root — a mismatch caused all lookups to 404 with no clear error. key_prefix is now explicit: callers pass "" for a flat layout, "parquet/" for a parquet/ subdirectory, or any S3 prefix needed to match their object store configuration.

* ci: trigger re-review
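The `quote_ident()` fix described above lives in `sqlite_provider.rs`, which is not shown on this page. The following is a minimal sketch of the technique the message names (doubling embedded double-quotes and wrapping the identifier in double quotes), not the committed code. The function body and the `count_sql` helper are assumptions made for illustration.

```rust
/// Hypothetical reconstruction of a quote_ident helper: wrap an SQL
/// identifier in double quotes and double any embedded double-quote, so the
/// name cannot terminate the quoted identifier early.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Illustrative only: interpolate a quoted table name into a COUNT query,
/// one of the four statement kinds the commit message lists.
fn count_sql(table_name: &str) -> String {
    format!("SELECT COUNT(*) FROM {}", quote_ident(table_name))
}
```

With this sketch, `count_sql("my table")` yields `SELECT COUNT(*) FROM "my table"`, and a name containing a double quote stays inside one quoted identifier instead of closing it.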
1 parent 4d685a8 commit 285e87c
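The commit message mentions a `ConnGuard` that returns a connection to the pool even when the borrowing code panics. The provider source is not part of this page, so the sketch below only illustrates the general RAII pattern with the standard library. `Pool`, `checkout`, `available`, and the `String` error type are all assumptions, and the real provider works with SQLite connections and a tokio `Semaphore` rather than this simplified structure.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical fixed-size pool: idle connections sit in a Vec behind a mutex.
struct Pool<T> {
    slots: Mutex<Vec<T>>,
}

impl<T> Pool<T> {
    fn new(conns: Vec<T>) -> Self {
        Self { slots: Mutex::new(conns) }
    }

    /// Number of idle connections currently in the pool.
    fn available(&self) -> usize {
        self.slots.lock().map(|s| s.len()).unwrap_or(0)
    }
}

/// RAII guard: holds one connection and pushes it back on drop. Drop also
/// runs during unwinding, so a panic in the caller does not shrink the pool.
struct ConnGuard<T> {
    conn: Option<T>,
    pool: Arc<Pool<T>>,
}

impl<T> ConnGuard<T> {
    /// Surface mutex poisoning and an empty pool as errors instead of
    /// calling unwrap(), mirroring the map_err / ok_or_else fix.
    fn checkout(pool: &Arc<Pool<T>>) -> Result<Self, String> {
        let conn = pool
            .slots
            .lock()
            .map_err(|e| format!("pool mutex poisoned: {e}"))?
            .pop()
            .ok_or_else(|| "connection pool is empty".to_string())?;
        Ok(Self { conn: Some(conn), pool: Arc::clone(pool) })
    }
}

impl<T> Drop for ConnGuard<T> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            // If the mutex is poisoned the connection is dropped rather
            // than returned; that is a simplification of this sketch.
            if let Ok(mut slots) = self.pool.slots.lock() {
                slots.push(conn);
            }
        }
    }
}
```

The design point is that the return-to-pool step lives in `Drop`, so no code path, including an unwinding panic, can skip it.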

8 files changed

Lines changed: 1943 additions & 104 deletions

File tree

Cargo.lock

Lines changed: 163 additions & 96 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 23 additions & 8 deletions
@@ -5,15 +5,30 @@ edition = "2024"
 description = "DataFusion extension for USearch HNSW vector similarity search with adaptive WHERE clause filtering"
 license = "MIT OR Apache-2.0"
 
+[features]
+parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"]
+sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"]
+
 [dependencies]
-tracing = "0.1"
-datafusion = "52.2.0"
-usearch = "2.24.0"
-arrow-array = "57.2.0"
+tracing = "0.1"
+datafusion = "52.2.0"
+usearch = "2.24.0"
+arrow-array = "57.2.0"
 arrow-schema = "57.2.0"
-async-trait = "0.1"
-futures = "0.3"
-tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
+async-trait = "0.1"
+futures = "0.3"
+# "sync" adds tokio::sync::Semaphore, used by SqliteLookupProvider's connection pool
+tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }
+
+# parquet-provider
+parquet = { version = "57.2.0", optional = true, features = ["async", "object_store"] }
+object_store = { version = "0.12", optional = true }
+bytes = { version = "1", optional = true }
+
+# sqlite-provider
+rusqlite = { version = "0.32", optional = true, features = ["bundled"] }
+serde_json = { version = "1", optional = true }
 
 [dev-dependencies]
-tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
+tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
+tempfile = "3"

src/keys.rs

Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
+// keys.rs — Key encoding utilities for packed row addresses.
+//
+// Bit layout:
+//   bits 63-48 (16 bits) file_idx — which parquet file (up to 65,536)
+//   bits 47-32 (16 bits) rg_idx — which row group (up to 65,536)
+//   bits 31-0 (32 bits) local_offset — row offset within row group (up to 4 B)
+
+/// Pack a physical row address into a single `u64` key for use with USearch
+/// and the lookup providers.
+///
+/// # Panics
+/// Panics if any component exceeds its allocated bit range:
+/// `file_idx` < 65 536, `rg_idx` < 65 536, `local_offset` ≤ 4 294 967 295.
+#[inline]
+pub fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 {
+    assert!(
+        file_idx < (1 << 16),
+        "file_idx {file_idx} overflows 16 bits"
+    );
+    assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
+    assert!(
+        local_offset <= u32::MAX as usize,
+        "local_offset {local_offset} overflows 32 bits"
+    );
+    ((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)
+}
+
+/// Unpack a `u64` key back to `(file_idx, rg_idx, local_offset)`.
+#[inline]
+pub fn unpack_key(key: u64) -> (usize, usize, usize) {
+    let file_idx = (key >> 48) as usize;
+    let rg_idx = ((key >> 32) & 0xFFFF) as usize;
+    let local_offset = (key & 0xFFFF_FFFF) as usize;
+    (file_idx, rg_idx, local_offset)
+}
+
+/// Physical layout of a sharded parquet dataset.
+///
+/// Computed once at startup from parquet file footers (reads only the last few
+/// KB of each file). Not persisted to disk.
+pub struct DatasetLayout {
+    /// Object-store path (or S3 key) for each file, indexed by `file_idx`.
+    pub file_keys: Vec<String>,
+    /// Cumulative row counts at the start of each file: `file_cum_rows[i]` is
+    /// the total number of rows in files 0..i. `file_cum_rows[n_files]` is the
+    /// total row count of the dataset.
+    pub file_cum_rows: Vec<u64>,
+    /// For each file, cumulative row count at the start of each row group:
+    /// `rg_cum_rows[file][rg]` = rows in row groups 0..rg within that file.
+    pub rg_cum_rows: Vec<Vec<u64>>,
+}
+
+impl DatasetLayout {
+    /// Convert a packed usearch key back to a global (dataset-wide) row index.
+    #[inline]
+    pub fn packed_key_to_global(&self, key: u64) -> u64 {
+        let (file_idx, rg_idx, local_offset) = unpack_key(key);
+        self.file_cum_rows[file_idx] + self.rg_cum_rows[file_idx][rg_idx] + local_offset as u64
+    }
+
+    /// Scan parquet footers to build the layout. No vector data is read.
+    ///
+    /// `key_prefix` is prepended to each bare filename when storing the
+    /// object-store path in `file_keys`. It must match the prefix used when
+    /// constructing the `ObjectStore` passed to `ParquetLookupProvider`, so
+    /// that `store.get("{key_prefix}{filename}")` resolves to the correct
+    /// object at query time.
+    ///
+    /// # Examples
+    /// ```text
+    /// // Files at /data/shard_00.parquet, store rooted at /data
+    /// DatasetLayout::from_files(&["/data/shard_00.parquet"], "")
+    ///
+    /// // Files under a parquet/ subdirectory, store rooted at /data
+    /// DatasetLayout::from_files(&["/data/parquet/shard_00.parquet"], "parquet/")
+    ///
+    /// // Local footer reads, but keys point at S3 prefix
+    /// DatasetLayout::from_files(&["/local/cache/shard_00.parquet"], "year=2024/")
+    /// ```
+    ///
+    /// Only compiled when the `parquet-provider` or `sqlite-provider` feature is enabled.
+    #[cfg(any(feature = "parquet-provider", feature = "sqlite-provider"))]
+    pub fn from_files(local_paths: &[&str], key_prefix: &str) -> datafusion::common::Result<Self> {
+        use datafusion::error::DataFusionError;
+        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+        use std::fs;
+        use std::path::Path;
+
+        let mut file_keys = Vec::with_capacity(local_paths.len());
+        let mut file_cum_rows = vec![0u64];
+        let mut rg_cum_rows: Vec<Vec<u64>> = Vec::with_capacity(local_paths.len());
+
+        let mut running_total = 0u64;
+        for &path in local_paths {
+            let file_name = Path::new(path)
+                .file_name()
+                .and_then(|n| n.to_str())
+                .ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?;
+            file_keys.push(format!("{key_prefix}{file_name}"));
+
+            let f = fs::File::open(path)
+                .map_err(|e| DataFusionError::Execution(format!("open {path}: {e}")))?;
+            let builder = ParquetRecordBatchReaderBuilder::try_new(f)
+                .map_err(|e| DataFusionError::Execution(format!("read footer {path}: {e}")))?;
+            let meta = builder.metadata();
+
+            let mut rg_cum = vec![0u64];
+            let mut file_rows = 0u64;
+            for rg in 0..meta.num_row_groups() {
+                let n = meta.row_group(rg).num_rows() as u64;
+                file_rows += n;
+                rg_cum.push(file_rows);
+            }
+            rg_cum_rows.push(rg_cum);
+            running_total += file_rows;
+            file_cum_rows.push(running_total);
+        }
+
+        Ok(Self {
+            file_keys,
+            file_cum_rows,
+            rg_cum_rows,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_pack_unpack_roundtrip() {
+        let cases = [
+            (0, 0, 0),
+            (1, 2, 3),
+            (65535, 65535, u32::MAX as usize),
+            (0, 0, u32::MAX as usize),
+        ];
+        for (fi, rg, lo) in cases {
+            let key = pack_key(fi, rg, lo);
+            assert_eq!(unpack_key(key), (fi, rg, lo));
+        }
+    }
+
+    #[test]
+    fn test_pack_key_boundary_values() {
+        // u32::MAX as local_offset is within range and must round-trip cleanly.
+        let key = pack_key(0, 0, u32::MAX as usize);
+        let (_, _, lo) = unpack_key(key);
+        assert_eq!(lo, u32::MAX as usize);
+    }
+
+    #[test]
+    #[should_panic(expected = "overflows 32 bits")]
+    fn test_pack_key_local_offset_overflow_panics() {
+        pack_key(0, 0, u32::MAX as usize + 1);
+    }
+
+    #[test]
+    #[should_panic(expected = "overflows 16 bits")]
+    fn test_pack_key_file_idx_overflow_panics() {
+        pack_key(1 << 16, 0, 0);
+    }
+}
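To make the bit layout and the cumulative-row arithmetic in `src/keys.rs` concrete, here is a small standalone worked example. `pack_key` and `unpack_key` repeat the logic from the diff above; `ToyLayout` and `toy_layout` are hypothetical stand-ins for `DatasetLayout` with invented row counts, so that nothing depends on parquet files.

```rust
/// Same layout as in the diff: 16 bits file_idx | 16 bits rg_idx | 32 bits offset.
fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 {
    assert!(file_idx < (1 << 16), "file_idx {file_idx} overflows 16 bits");
    assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
    assert!(
        local_offset <= u32::MAX as usize,
        "local_offset {local_offset} overflows 32 bits"
    );
    ((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)
}

fn unpack_key(key: u64) -> (usize, usize, usize) {
    let file_idx = (key >> 48) as usize;
    let rg_idx = ((key >> 32) & 0xFFFF) as usize;
    let local_offset = (key & 0xFFFF_FFFF) as usize;
    (file_idx, rg_idx, local_offset)
}

/// Hypothetical stand-in for DatasetLayout: only the two cumulative tables.
struct ToyLayout {
    file_cum_rows: Vec<u64>,
    rg_cum_rows: Vec<Vec<u64>>,
}

impl ToyLayout {
    /// Global row index = rows in earlier files + rows in earlier row groups
    /// of this file + offset within the row group.
    fn packed_key_to_global(&self, key: u64) -> u64 {
        let (file_idx, rg_idx, local_offset) = unpack_key(key);
        self.file_cum_rows[file_idx] + self.rg_cum_rows[file_idx][rg_idx] + local_offset as u64
    }
}

/// Invented dataset: file 0 has row groups of 100 and 50 rows,
/// file 1 has row groups of 70 and 30 rows (250 rows in total).
fn toy_layout() -> ToyLayout {
    ToyLayout {
        file_cum_rows: vec![0, 150, 250],
        rg_cum_rows: vec![vec![0, 100, 150], vec![0, 70, 100]],
    }
}
```

In this sketch, `pack_key(1, 2, 3)` is `0x0001_0002_0000_0003`, and `toy_layout().packed_key_to_global(pack_key(1, 1, 5))` is 150 + 70 + 5 = 225: 150 rows precede file 1, 70 rows precede row group 1 within it, and the row sits at offset 5.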

src/lib.rs

Lines changed: 12 additions & 0 deletions
@@ -56,6 +56,7 @@
 //! LIMIT 10
 //! ```
 
+pub mod keys;
 pub mod lookup;
 pub mod node;
 pub mod planner;
@@ -64,6 +65,12 @@ pub mod rule;
 pub mod udf;
 pub mod udtf;
 
+#[cfg(feature = "parquet-provider")]
+pub mod parquet_provider;
+#[cfg(feature = "sqlite-provider")]
+pub mod sqlite_provider;
+
+pub use keys::{DatasetLayout, pack_key, unpack_key};
 pub use lookup::{HashKeyProvider, PointLookupProvider};
 pub use node::{DistanceType, USearchNode};
 pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
@@ -72,6 +79,11 @@ pub use rule::USearchRule;
 pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf};
 pub use udtf::USearchUDTF;
 
+#[cfg(feature = "parquet-provider")]
+pub use parquet_provider::ParquetLookupProvider;
+#[cfg(feature = "sqlite-provider")]
+pub use sqlite_provider::SqliteLookupProvider;
+
 use std::sync::Arc;
 
 use datafusion::common::Result;
