Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ pub use keys::{DatasetLayout, pack_key, unpack_key};
pub use lookup::{HashKeyProvider, PointLookupProvider};
pub use node::{DistanceType, USearchNode};
pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
pub use registry::{RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig};
pub use registry::{
RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig, VectorIndexMeta,
VectorIndexResolver,
};
pub use rule::USearchRule;
pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf};
pub use udtf::USearchUDTF;
Expand All @@ -97,16 +100,19 @@ use datafusion::prelude::SessionContext;
/// - `cosine_distance(col, query)` — cosine distance
/// - `negative_dot_product(col, query)` — negated inner product
/// - `vector_usearch(table, query, k)` — explicit ANN table function
/// (cache-only for async-backed resolvers; does not trigger async loads)
/// - [`USearchRule`] — optimizer rewrite rule
///
/// The [`USearchQueryPlanner`] must be installed at `SessionState` build time
/// (before this call) via `SessionStateBuilder::with_query_planner`.
pub fn register_all(ctx: &SessionContext, registry: Arc<USearchRegistry>) -> Result<()> {
pub fn register_all(ctx: &SessionContext, registry: Arc<dyn VectorIndexResolver>) -> Result<()> {
ctx.register_udf(ScalarUDF::new_from_impl(l2_distance_udf()));
ctx.register_udf(ScalarUDF::new_from_impl(cosine_distance_udf()));
ctx.register_udf(ScalarUDF::new_from_impl(negative_dot_product_udf()));
ctx.register_udtf(
"vector_usearch",
// `vector_usearch()` is synchronous and therefore cache-only for
// async-backed resolvers.
Arc::new(USearchUDTF::new(registry.clone())),
);
ctx.add_optimizer_rule(Arc::new(USearchRule::new(registry)));
Expand Down
27 changes: 16 additions & 11 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use datafusion::logical_expr::Expr;

use crate::lookup::extract_keys_as_u64;
use crate::node::{DistanceType, USearchNode};
use crate::registry::USearchRegistry;
use crate::registry::VectorIndexResolver;

/// Strip table qualifiers from column references so expressions can be
/// resolved against an unqualified Arrow schema. Mirrors the pattern in
Expand Down Expand Up @@ -92,7 +92,7 @@ impl fmt::Debug for USearchQueryPlanner {
}

impl USearchQueryPlanner {
pub fn new(registry: Arc<USearchRegistry>) -> Self {
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
let inner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
USearchExecPlanner::new(registry),
)]);
Expand All @@ -116,11 +116,11 @@ impl QueryPlanner for USearchQueryPlanner {
// ── Extension planner ─────────────────────────────────────────────────────────

pub struct USearchExecPlanner {
registry: Arc<USearchRegistry>,
registry: Arc<dyn VectorIndexResolver>,
}

impl USearchExecPlanner {
pub fn new(registry: Arc<USearchRegistry>) -> Self {
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
Self { registry }
}
}
Expand All @@ -140,12 +140,14 @@ impl ExtensionPlanner for USearchExecPlanner {
None => return Ok(None),
};

// Cheap validation: RwLock read + HashMap lookup — no I/O.
let registered = match self.registry.get(&node.table_name) {
// Async: ensure the index is loaded (downloads on cache miss).
self.registry.ensure_loaded(&node.table_name).await?;

let registered = match self.registry.resolve(&node.table_name) {
Some(r) => r,
None => {
return Err(DataFusionError::Execution(format!(
"USearchExecPlanner: table '{}' not in registry",
"USearchExecPlanner: table '{}' not available after ensure_loaded",
node.table_name
)));
}
Expand Down Expand Up @@ -243,7 +245,7 @@ impl ExtensionPlanner for USearchExecPlanner {
#[derive(Debug, Clone)]
struct SearchParams {
table_name: String,
registry: Arc<USearchRegistry>,
registry: Arc<dyn VectorIndexResolver>,
query_vec: Vec<f64>,
k: usize,
distance_type: DistanceType,
Expand Down Expand Up @@ -359,10 +361,13 @@ async fn usearch_execute(
params: SearchParams,
task_ctx: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
// Re-fetch at execute time so cache eviction between plan and execute is handled correctly.
let registered = params.registry.get(&params.table_name).ok_or_else(|| {
// Re-fetch at execute time and reload on cache miss so eviction between
// plan and execute is handled correctly for async-backed resolvers.
params.registry.ensure_loaded(&params.table_name).await?;

let registered = params.registry.resolve(&params.table_name).ok_or_else(|| {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve() is called without a preceding ensure_loaded(). For async-backed resolvers that support cache eviction, the index will be gone here and the query fails — eviction is not actually handled, the error message is just friendlier.

Fix: call ensure_loaded first:

params.registry.ensure_loaded(&params.table_name).await?;
let registered = params.registry.resolve(&params.table_name).ok_or_else(|| { ... })?;

Also update the comment on line 364 — "handled correctly" implies the index is reloaded on eviction, which it is not without this call.

DataFusionError::Execution(format!(
"USearchExec: table '{}' not in registry at execute time",
"USearchExec: table '{}' not available after ensure_loaded at execute time",
params.table_name
))
})?;
Expand Down
69 changes: 69 additions & 0 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,48 @@ pub struct RegisteredTable {
pub config: USearchTableConfig,
}

// ── VectorIndexResolver ───────────────────────────────────────────────────────

/// Lightweight metadata returned by the sync `peek` check.
/// Contains enough info for the optimizer to validate the ANN rewrite
/// and build the logical plan node, without needing the loaded index objects.
#[derive(Debug, Clone)]
pub struct VectorIndexMeta {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalar_kind, key_col, and config are not read by the only caller of peek() — the optimizer rule uses only metric and schema. Carrying three extra fields in VectorIndexMeta increases implementor burden: a catalog-backed resolver must provide these in the sync peek() path even though they're only needed after ensure_loaded. Consider trimming VectorIndexMeta to { metric, schema } and letting callers get the rest via resolve() once the index is loaded. If you want to keep them for forward-compat, a doc note explaining which fields are actually required by the optimizer would help implementors.

pub metric: MetricKind,
pub scalar_kind: ScalarKind,
pub schema: SchemaRef,
pub key_col: String,
pub config: USearchTableConfig,
}

/// Trait for resolving vector index entries by name.
///
/// Split into two operations to accommodate DataFusion's mixed sync/async pipeline:
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor)
///
/// The built-in [`USearchRegistry`] implements all three as direct hashmap lookups
/// (always cached). Production systems can implement catalog-backed loading in
/// `ensure_loaded`.
#[async_trait::async_trait]
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
/// Sync, cheap: check if a vector index exists and return its metadata.
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
/// Must NOT do I/O — only check local cache or lightweight state.
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;

/// Sync: return a fully loaded entry from cache.
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;

/// Async: ensure the index is loaded and available in cache.
/// On cache miss or stale entry, downloads files and loads the index.
/// On cache hit with current generation, returns immediately.
/// Called from the async physical planner before building the execution plan.
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait contract between ensure_loaded and resolve has a latent TOCTOU window: a concurrent eviction between the two calls produces an error rather than a retry. For USearchRegistry this is fine (never evicts), but production async-backed resolvers need to be aware. Consider documenting that implementations should guarantee the entry stays resident for at least one resolve() call after ensure_loaded returns successfully (e.g. via a short-lived pin/refcount), or explicitly note that callers must handle the None case with a retry.

}

// ── USearchRegistry ───────────────────────────────────────────────────────────

pub struct USearchRegistry {
Expand Down Expand Up @@ -373,6 +415,33 @@ impl USearchRegistry {
pub fn into_arc(self) -> Arc<Self> {
Arc::new(self)
}

/// Convert into a trait object for use with the optimizer rule and planner.
pub fn into_resolver(self) -> Arc<dyn VectorIndexResolver> {
Arc::new(self)
}
}

#[async_trait::async_trait]
impl VectorIndexResolver for USearchRegistry {
fn peek(&self, name: &str) -> Option<VectorIndexMeta> {
self.get(name).map(|r| VectorIndexMeta {
metric: r.metric,
scalar_kind: r.scalar_kind,
schema: r.schema.clone(),
key_col: r.key_col.clone(),
config: r.config.clone(),
})
}

fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>> {
self.get(name)
}

async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
// USearchRegistry is always fully cached — nothing to load.
Ok(())
}
}

impl Default for USearchRegistry {
Expand Down
14 changes: 7 additions & 7 deletions src/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ use datafusion::optimizer::OptimizerConfig;
use usearch::MetricKind;

use crate::node::{DistanceType, USearchNode};
use crate::registry::USearchRegistry;
use crate::registry::VectorIndexResolver;

pub struct USearchRule {
registry: Arc<USearchRegistry>,
registry: Arc<dyn VectorIndexResolver>,
}

impl USearchRule {
pub fn new(registry: Arc<USearchRegistry>) -> Self {
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
Self { registry }
}

Expand Down Expand Up @@ -104,8 +104,8 @@ impl USearchRule {
// Registry key: "catalog::schema::table::col" (or fewer parts for bare refs).
let reg_key = format!("{}::{}", table_ref_full, vec_col);

// Table must be registered for vector search.
let registered = self.registry.get(&reg_key)?;
// Sync check: does a vector index exist for this key?
let meta = self.registry.peek(&reg_key)?;

let dist_type = match udf_name.as_str() {
"l2_distance" => DistanceType::L2,
Expand All @@ -116,14 +116,14 @@ impl USearchRule {

// Guard: the SQL distance UDF must match the metric the index was built
// with. Mismatch → return None so DataFusion falls back to exact scan.
if !dist_type_matches_metric(&dist_type, registered.metric) {
if !dist_type_matches_metric(&dist_type, meta.metric) {
return None;
}

// Build USearchNode schema: base fields qualified with the original table
// reference (Full/Partial/Bare) so qualifiers match the original plan's schema.
let table_ref = scan_table_ref.clone();
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = registered
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = meta
.schema
.fields()
.iter()
Expand Down
17 changes: 12 additions & 5 deletions src/udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,21 @@ use datafusion::physical_plan::{
};
use datafusion::scalar::ScalarValue;

use crate::registry::USearchRegistry;
use crate::registry::VectorIndexResolver;

// ── UDTF ─────────────────────────────────────────────────────────────────────

/// Table function: vector_usearch(table_name, query_vec, k [, ef_search])
///
/// Returns `(key: UInt64, _distance: Float32)`. Join with your data table on
/// the key column to retrieve full rows.
///
/// This entry point is synchronous. For async-backed [`VectorIndexResolver`]
/// implementations, it only works when the target index is already loaded in
/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and
/// cannot trigger async index loads.
pub struct USearchUDTF {
registry: Arc<USearchRegistry>,
registry: Arc<dyn VectorIndexResolver>,
}

impl fmt::Debug for USearchProvider {
Expand All @@ -58,7 +63,7 @@ impl fmt::Debug for USearchUDTF {
}

impl USearchUDTF {
pub fn new(registry: Arc<USearchRegistry>) -> Self {
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
Self { registry }
}
}
Expand All @@ -85,9 +90,11 @@ impl TableFunctionImpl for USearchUDTF {
None
};

let registered = self.registry.get(&table_name).ok_or_else(|| {
let registered = self.registry.resolve(&table_name).ok_or_else(|| {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — UDTF bypasses ensure_loaded, breaking catalog-backed implementations.

TableFunctionImpl::call() is sync, so it cannot call async fn ensure_loaded(). For USearchRegistry this is fine (always cached), but for any production VectorIndexResolver where ensure_loaded actually does I/O, vector_usearch() will always return "not registered" for tables that haven't been pre-loaded — with no indication of why.

The PR introduces VectorIndexResolver as an extension point for production catalog-backed systems, but the UDTF silently doesn't support that path.

Two options:

Option A — Document the limitation so implementors know to pre-load before calling vector_usearch():

Suggested change
let registered = self.registry.resolve(&table_name).ok_or_else(|| {
// NOTE: The UDTF path is sync and cannot call ensure_loaded().
// Production VectorIndexResolver implementations must pre-load
// any table before it can be queried via vector_usearch().
let registered = self.registry.resolve(&table_name).ok_or_else(|| {

Option B — Add a sync peek_resolve() / try_resolve_blocking() method to the trait that implementations can override to do a synchronous cache lookup + blocking load, falling back to resolve() for the base case. This is more invasive but makes the contract explicit for implementors.

DataFusionError::Execution(format!(
"vector_usearch: table '{table_name}' not registered"
"vector_usearch: table '{table_name}' is not loaded locally. \
This synchronous path only checks the local cache and cannot trigger async \
loads. Use the optimizer/planner vector query path or pre-load the index first."
))
})?;

Expand Down
Loading