Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 163 additions & 96 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 23 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,30 @@ edition = "2024"
description = "DataFusion extension for USearch HNSW vector similarity search with adaptive WHERE clause filtering"
license = "MIT OR Apache-2.0"

[features]
parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"]
sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"]

[dependencies]
tracing = "0.1"
datafusion = "52.2.0"
usearch = "2.24.0"
arrow-array = "57.2.0"
tracing = "0.1"
datafusion = "52.2.0"
usearch = "2.24.0"
arrow-array = "57.2.0"
arrow-schema = "57.2.0"
async-trait = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
async-trait = "0.1"
futures = "0.3"
# "sync" adds tokio::sync::Semaphore, used by SqliteLookupProvider's connection pool
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }

# parquet-provider
parquet = { version = "57.2.0", optional = true, features = ["async", "object_store"] }
object_store = { version = "0.12", optional = true }
bytes = { version = "1", optional = true }

# sqlite-provider
rusqlite = { version = "0.32", optional = true, features = ["bundled"] }
serde_json = { version = "1", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tempfile = "3"
164 changes: 164 additions & 0 deletions src/keys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// keys.rs — Key encoding utilities for packed row addresses.
//
// Bit layout:
// bits 63-48 (16 bits) file_idx — which parquet file (up to 65,536)
// bits 47-32 (16 bits) rg_idx — which row group (up to 65,536)
// bits 31-0 (32 bits) local_offset — row offset within row group (up to 4,294,967,296 rows)

/// Pack a physical row address into a single `u64` key for use with USearch
/// and the lookup providers.
///
/// # Panics
/// Panics if any component exceeds its allocated bit range:
/// `file_idx` < 65 536, `rg_idx` < 65 536, `local_offset` ≤ 4 294 967 295.
#[inline]
pub fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 {
assert!(
file_idx < (1 << 16),
"file_idx {file_idx} overflows 16 bits"
);
assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
assert!(
local_offset <= u32::MAX as usize,
"local_offset {local_offset} overflows 32 bits"
);
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Silent data corruption on out-of-range inputs.

file_idx is silently masked to 16 bits, rg_idx to 16 bits, and local_offset to 32 bits. If any of these exceed their encoded range (file_idx/rg_idx ≥ 65 536, local_offset ≥ 4 294 967 296), the packed key is wrong and the lookup returns the wrong row with no error or warning.

These aren't just theoretical: a dataset with a large single file can easily produce local_offset values in the millions — which is fine, but rg_idx grows per-file, so it's realistic for rg_idx to exceed 65 535 in a heavily-sharded dataset.

Suggest adding debug-mode assertions (or unconditional checks returning Err) before encoding:

Suggested change
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)
debug_assert!(file_idx < (1 << 16), "file_idx {file_idx} overflows 16 bits");
debug_assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits");
debug_assert!(local_offset < (1 << 32), "local_offset {local_offset} overflows 32 bits");
((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64)

}

/// Split a packed `u64` row address into `(file_idx, rg_idx, local_offset)`.
///
/// The fields are read from fixed bit positions: the top 16 bits give
/// `file_idx`, the next 16 bits give `rg_idx`, and the low 32 bits give
/// `local_offset`.
#[inline]
pub fn unpack_key(key: u64) -> (usize, usize, usize) {
    // After shifting, only the row-group field needs masking; the file field
    // is already isolated because nothing sits above bit 63.
    const RG_MASK: u64 = 0xFFFF;
    const OFFSET_MASK: u64 = 0xFFFF_FFFF;

    (
        (key >> 48) as usize,
        ((key >> 32) & RG_MASK) as usize,
        (key & OFFSET_MASK) as usize,
    )
}

/// Physical layout of a sharded parquet dataset.
///
/// Computed once at startup from parquet file footers (reads only the last few
/// KB of each file). Not persisted to disk.
///
/// When built by `from_files`, the cumulative vectors carry one extra trailing
/// entry: `file_cum_rows` has `n_files + 1` elements and each inner vector of
/// `rg_cum_rows` has `n_row_groups + 1` elements, with the first element
/// always `0`.
#[derive(Debug, Clone)]
pub struct DatasetLayout {
    /// Object-store path (or S3 key) for each file, indexed by `file_idx`.
    pub file_keys: Vec<String>,
    /// Cumulative row counts at the start of each file: `file_cum_rows[i]` is
    /// the total number of rows in files 0..i. `file_cum_rows[n_files]` is the
    /// total row count of the dataset.
    pub file_cum_rows: Vec<u64>,
    /// For each file, cumulative row count at the start of each row group:
    /// `rg_cum_rows[file][rg]` = rows in row groups 0..rg within that file.
    pub rg_cum_rows: Vec<Vec<u64>>,
}

impl DatasetLayout {
/// Convert a packed usearch key back to a global (dataset-wide) row index.
#[inline]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 suggestion: This function is public and indexes into file_cum_rows / rg_cum_rows without bounds checking. A stale or mismatched key (e.g. from an index built against a different dataset version) will panic rather than surface an actionable error. Consider returning Option<u64> and using get instead of direct indexing:

Suggested change
#[inline]
pub fn packed_key_to_global(&self, key: u64) -> Option<u64> {
let (file_idx, rg_idx, local_offset) = unpack_key(key);
let file_base = self.file_cum_rows.get(file_idx)?;
let rg_base = self.rg_cum_rows.get(file_idx)?.get(rg_idx)?;
Some(file_base + rg_base + local_offset as u64)

At minimum, the doc-comment should document the panic conditions the same way pack_key does.

/// The result is the sum of three parts: the rows preceding the file, the
/// rows preceding the row group within that file, and the row's offset
/// inside the row group.
///
/// # Panics
/// Panics if the `file_idx` decoded from `key` is out of bounds for
/// `file_cum_rows` or `rg_cum_rows`, or if the decoded `rg_idx` is out of
/// bounds for that file's entry in `rg_cum_rows` (plain indexing is used).
pub fn packed_key_to_global(&self, key: u64) -> u64 {
    let (file_idx, rg_idx, local_offset) = unpack_key(key);
    let rows_before_file = self.file_cum_rows[file_idx];
    let rows_before_row_group = self.rg_cum_rows[file_idx][rg_idx];
    rows_before_file + rows_before_row_group + local_offset as u64
}

/// Scan parquet footers to build the layout. No vector data is read.
///
/// `key_prefix` is prepended to each bare filename when storing the
/// object-store path in `file_keys`. It must match the prefix used when
/// constructing the `ObjectStore` passed to `ParquetLookupProvider`, so
/// that `store.get("{key_prefix}{filename}")` resolves to the correct
/// object at query time.
///
/// # Examples
/// ```text
/// // Files at /data/shard_00.parquet, store rooted at /data
/// DatasetLayout::from_files(&["/data/shard_00.parquet"], "")
///
/// // Files under a parquet/ subdirectory, store rooted at /data
/// DatasetLayout::from_files(&["/data/parquet/shard_00.parquet"], "parquet/")
///
/// // Local footer reads, but keys point at S3 prefix
/// DatasetLayout::from_files(&["/local/cache/shard_00.parquet"], "year=2024/")
/// ```
///
/// Only compiled when the `parquet-provider` or `sqlite-provider` feature is enabled.
#[cfg(any(feature = "parquet-provider", feature = "sqlite-provider"))]
pub fn from_files(local_paths: &[&str], key_prefix: &str) -> datafusion::common::Result<Self> {
use datafusion::error::DataFusionError;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs;
use std::path::Path;

let mut file_keys = Vec::with_capacity(local_paths.len());
let mut file_cum_rows = vec![0u64];
let mut rg_cum_rows: Vec<Vec<u64>> = Vec::with_capacity(local_paths.len());

let mut running_total = 0u64;
for &path in local_paths {
let file_name = Path::new(path)
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?;
file_keys.push(format!("{key_prefix}{file_name}"));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — hardcoded parquet/ prefix

This prefix is baked in at layout-build time, so the file_keys stored in the layout (and later passed to ParquetLookupProvider) must be relative to a store rooted one level above a parquet/ directory. That implicit coupling isn't obvious to callers.

Consider accepting a configurable prefix parameter, or using just the filename and letting the caller control the store root:

Suggested change
file_keys.push(file_name.to_string());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Hardcoded "parquet/" prefix creates undocumented coupling

from_files always stores keys as parquet/<filename>, which means the caller's ObjectStore must be rooted one level above a parquet/ directory (or mounted with a matching prefix). Nothing in the type signature or docs enforces this.

Either accept the prefix as a parameter, or document the convention and its implications for LocalFileSystem vs S3 usage.

let f = fs::File::open(path)
.map_err(|e| DataFusionError::Execution(format!("open {path}: {e}")))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(f)
.map_err(|e| DataFusionError::Execution(format!("read footer {path}: {e}")))?;
let meta = builder.metadata();

let mut rg_cum = vec![0u64];
let mut file_rows = 0u64;
for rg in 0..meta.num_row_groups() {
let n = meta.row_group(rg).num_rows() as u64;
file_rows += n;
rg_cum.push(file_rows);
}
rg_cum_rows.push(rg_cum);
running_total += file_rows;
file_cum_rows.push(running_total);
}

Ok(Self {
file_keys,
file_cum_rows,
rg_cum_rows,
})
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Largest `local_offset` that fits in the low 32 bits of a packed key.
    const MAX_OFFSET: usize = u32::MAX as usize;

    /// Packing then unpacking must return the original components, including
    /// at the maximum value of every field.
    #[test]
    fn test_pack_unpack_roundtrip() {
        let cases = [
            (0, 0, 0),
            (1, 2, 3),
            (65535, 65535, MAX_OFFSET),
            (0, 0, MAX_OFFSET),
        ];
        for (fi, rg, lo) in cases {
            let key = pack_key(fi, rg, lo);
            assert_eq!(unpack_key(key), (fi, rg, lo));
        }
    }

    #[test]
    fn test_pack_key_boundary_values() {
        // u32::MAX as local_offset is within range and must round-trip cleanly.
        let key = pack_key(0, 0, MAX_OFFSET);
        let (_, _, lo) = unpack_key(key);
        assert_eq!(lo, MAX_OFFSET);
    }

    // Only meaningful where `usize` is wider than 32 bits: on a 32-bit target
    // `u32::MAX as usize + 1` overflows `usize` itself, so an out-of-range
    // `local_offset` cannot even be expressed.
    #[test]
    #[cfg(target_pointer_width = "64")]
    #[should_panic(expected = "overflows 32 bits")]
    fn test_pack_key_local_offset_overflow_panics() {
        pack_key(0, 0, MAX_OFFSET + 1);
    }

    #[test]
    #[should_panic(expected = "overflows 16 bits")]
    fn test_pack_key_file_idx_overflow_panics() {
        pack_key(1 << 16, 0, 0);
    }

    // The expected message names the field so the test cannot pass by
    // tripping the `file_idx` assertion instead.
    #[test]
    #[should_panic(expected = "rg_idx 65536 overflows 16 bits")]
    fn test_pack_key_rg_idx_overflow_panics() {
        pack_key(0, 1 << 16, 0);
    }
}
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
//! LIMIT 10
//! ```

pub mod keys;
pub mod lookup;
pub mod node;
pub mod planner;
Expand All @@ -64,6 +65,12 @@ pub mod rule;
pub mod udf;
pub mod udtf;

#[cfg(feature = "parquet-provider")]
pub mod parquet_provider;
#[cfg(feature = "sqlite-provider")]
pub mod sqlite_provider;

pub use keys::{DatasetLayout, pack_key, unpack_key};
pub use lookup::{HashKeyProvider, PointLookupProvider};
pub use node::{DistanceType, USearchNode};
pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
Expand All @@ -72,6 +79,11 @@ pub use rule::USearchRule;
pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf};
pub use udtf::USearchUDTF;

#[cfg(feature = "parquet-provider")]
pub use parquet_provider::ParquetLookupProvider;
#[cfg(feature = "sqlite-provider")]
pub use sqlite_provider::SqliteLookupProvider;

use std::sync::Arc;

use datafusion::common::Result;
Expand Down
Loading
Loading