Skip to content

Commit f9b4610

Browse files
authored
feat: split VectorIndexResolver into peek/resolve/ensure_loaded (#18)
* feat: add VectorIndexResolver trait for pluggable index resolution Replace direct USearchRegistry dependency with a VectorIndexResolver trait throughout the optimizer rule, physical planner, executor, and UDTF. USearchRegistry implements the trait (backward compatible). This enables production systems to implement a catalog-backed resolver that treats the in-memory index as a cache rather than the source of truth, following the same publication model as sorted/BM25 indexes. Key changes: - Add VectorIndexResolver trait with resolve(name) -> Option<Arc<RegisteredTable>> - USearchRegistry implements VectorIndexResolver - All consumers now accept Arc<dyn VectorIndexResolver> - Add into_resolver() convenience method on USearchRegistry * feat: split VectorIndexResolver into peek/resolve/ensure_loaded Split the resolver trait to match DataFusion's sync/async pipeline: - peek(key) — sync, cheap: returns VectorIndexMeta (metric, schema) for the optimizer rule to validate ANN rewrites without loading - resolve(key) — sync: returns loaded RegisteredTable from cache - ensure_loaded(key) — async: loads index on cache miss, called by the physical planner before building execution plans The optimizer rule uses peek (sync, no I/O). The planner calls ensure_loaded (async) then resolve. The executor uses resolve (sync, cache only). This enables catalog-backed resolution where ensure_loaded downloads and loads index files on demand, while the optimizer stays sync and fast. * Clarify vector_usearch cache-only behavior * Reload vector indexes at execute time
1 parent ca92c15 commit f9b4610

5 files changed

Lines changed: 112 additions & 25 deletions

File tree

src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ pub use keys::{DatasetLayout, pack_key, unpack_key};
7474
pub use lookup::{HashKeyProvider, PointLookupProvider};
7575
pub use node::{DistanceType, USearchNode};
7676
pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
77-
pub use registry::{RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig};
77+
pub use registry::{
78+
RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig, VectorIndexMeta,
79+
VectorIndexResolver,
80+
};
7881
pub use rule::USearchRule;
7982
pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf};
8083
pub use udtf::USearchUDTF;
@@ -97,16 +100,19 @@ use datafusion::prelude::SessionContext;
97100
/// - `cosine_distance(col, query)` — cosine distance
98101
/// - `negative_dot_product(col, query)` — negated inner product
99102
/// - `vector_usearch(table, query, k)` — explicit ANN table function
103+
/// (cache-only for async-backed resolvers; does not trigger async loads)
100104
/// - [`USearchRule`] — optimizer rewrite rule
101105
///
102106
/// The [`USearchQueryPlanner`] must be installed at `SessionState` build time
103107
/// (before this call) via `SessionStateBuilder::with_query_planner`.
104-
pub fn register_all(ctx: &SessionContext, registry: Arc<USearchRegistry>) -> Result<()> {
108+
pub fn register_all(ctx: &SessionContext, registry: Arc<dyn VectorIndexResolver>) -> Result<()> {
105109
ctx.register_udf(ScalarUDF::new_from_impl(l2_distance_udf()));
106110
ctx.register_udf(ScalarUDF::new_from_impl(cosine_distance_udf()));
107111
ctx.register_udf(ScalarUDF::new_from_impl(negative_dot_product_udf()));
108112
ctx.register_udtf(
109113
"vector_usearch",
114+
// `vector_usearch()` is synchronous and therefore cache-only for
115+
// async-backed resolvers.
110116
Arc::new(USearchUDTF::new(registry.clone())),
111117
);
112118
ctx.add_optimizer_rule(Arc::new(USearchRule::new(registry)));

src/planner.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use datafusion::logical_expr::Expr;
6262

6363
use crate::lookup::extract_keys_as_u64;
6464
use crate::node::{DistanceType, USearchNode};
65-
use crate::registry::USearchRegistry;
65+
use crate::registry::VectorIndexResolver;
6666

6767
/// Strip table qualifiers from column references so expressions can be
6868
/// resolved against an unqualified Arrow schema. Mirrors the pattern in
@@ -92,7 +92,7 @@ impl fmt::Debug for USearchQueryPlanner {
9292
}
9393

9494
impl USearchQueryPlanner {
95-
pub fn new(registry: Arc<USearchRegistry>) -> Self {
95+
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
9696
let inner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
9797
USearchExecPlanner::new(registry),
9898
)]);
@@ -116,11 +116,11 @@ impl QueryPlanner for USearchQueryPlanner {
116116
// ── Extension planner ─────────────────────────────────────────────────────────
117117

118118
pub struct USearchExecPlanner {
119-
registry: Arc<USearchRegistry>,
119+
registry: Arc<dyn VectorIndexResolver>,
120120
}
121121

122122
impl USearchExecPlanner {
123-
pub fn new(registry: Arc<USearchRegistry>) -> Self {
123+
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
124124
Self { registry }
125125
}
126126
}
@@ -140,12 +140,14 @@ impl ExtensionPlanner for USearchExecPlanner {
140140
None => return Ok(None),
141141
};
142142

143-
// Cheap validation: RwLock read + HashMap lookup — no I/O.
144-
let registered = match self.registry.get(&node.table_name) {
143+
// Async: ensure the index is loaded (downloads on cache miss).
144+
self.registry.ensure_loaded(&node.table_name).await?;
145+
146+
let registered = match self.registry.resolve(&node.table_name) {
145147
Some(r) => r,
146148
None => {
147149
return Err(DataFusionError::Execution(format!(
148-
"USearchExecPlanner: table '{}' not in registry",
150+
"USearchExecPlanner: table '{}' not available after ensure_loaded",
149151
node.table_name
150152
)));
151153
}
@@ -243,7 +245,7 @@ impl ExtensionPlanner for USearchExecPlanner {
243245
#[derive(Debug, Clone)]
244246
struct SearchParams {
245247
table_name: String,
246-
registry: Arc<USearchRegistry>,
248+
registry: Arc<dyn VectorIndexResolver>,
247249
query_vec: Vec<f64>,
248250
k: usize,
249251
distance_type: DistanceType,
@@ -359,10 +361,13 @@ async fn usearch_execute(
359361
params: SearchParams,
360362
task_ctx: Arc<TaskContext>,
361363
) -> Result<Vec<RecordBatch>> {
362-
// Re-fetch at execute time so cache eviction between plan and execute is handled correctly.
363-
let registered = params.registry.get(&params.table_name).ok_or_else(|| {
364+
// Re-fetch at execute time and reload on cache miss so eviction between
365+
// plan and execute is handled correctly for async-backed resolvers.
366+
params.registry.ensure_loaded(&params.table_name).await?;
367+
368+
let registered = params.registry.resolve(&params.table_name).ok_or_else(|| {
364369
DataFusionError::Execution(format!(
365-
"USearchExec: table '{}' not in registry at execute time",
370+
"USearchExec: table '{}' not available after ensure_loaded at execute time",
366371
params.table_name
367372
))
368373
})?;

src/registry.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,48 @@ pub struct RegisteredTable {
214214
pub config: USearchTableConfig,
215215
}
216216

217+
// ── VectorIndexResolver ───────────────────────────────────────────────────────
218+
219+
/// Lightweight metadata returned by the sync `peek` check.
220+
/// Contains enough info for the optimizer to validate the ANN rewrite
221+
/// and build the logical plan node, without needing the loaded index objects.
222+
#[derive(Debug, Clone)]
223+
pub struct VectorIndexMeta {
224+
pub metric: MetricKind,
225+
pub scalar_kind: ScalarKind,
226+
pub schema: SchemaRef,
227+
pub key_col: String,
228+
pub config: USearchTableConfig,
229+
}
230+
231+
/// Trait for resolving vector index entries by name.
232+
///
233+
/// Split into three operations to accommodate DataFusion's mixed sync/async pipeline:
234+
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
235+
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
236+
/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor)
237+
///
238+
/// The built-in [`USearchRegistry`] implements `peek` and `resolve` as direct hashmap
238+
/// lookups (always cached) and `ensure_loaded` as a no-op. Production systems can
239+
/// implement catalog-backed loading in `ensure_loaded`.
241+
#[async_trait::async_trait]
242+
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
243+
/// Sync, cheap: check if a vector index exists and return its metadata.
244+
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
245+
/// Must NOT do I/O — only check local cache or lightweight state.
246+
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;
247+
248+
/// Sync: return a fully loaded entry from cache.
249+
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
250+
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;
251+
252+
/// Async: ensure the index is loaded and available in cache.
253+
/// On cache miss or stale entry, downloads files and loads the index.
254+
/// On cache hit with current generation, returns immediately.
255+
/// Called from the async physical planner before building the execution plan.
256+
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;
257+
}
258+
217259
// ── USearchRegistry ───────────────────────────────────────────────────────────
218260

219261
pub struct USearchRegistry {
@@ -373,6 +415,33 @@ impl USearchRegistry {
373415
pub fn into_arc(self) -> Arc<Self> {
374416
Arc::new(self)
375417
}
418+
419+
/// Convert into a trait object for use with the optimizer rule and planner.
420+
pub fn into_resolver(self) -> Arc<dyn VectorIndexResolver> {
421+
Arc::new(self)
422+
}
423+
}
424+
425+
#[async_trait::async_trait]
426+
impl VectorIndexResolver for USearchRegistry {
427+
fn peek(&self, name: &str) -> Option<VectorIndexMeta> {
428+
self.get(name).map(|r| VectorIndexMeta {
429+
metric: r.metric,
430+
scalar_kind: r.scalar_kind,
431+
schema: r.schema.clone(),
432+
key_col: r.key_col.clone(),
433+
config: r.config.clone(),
434+
})
435+
}
436+
437+
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>> {
438+
self.get(name)
439+
}
440+
441+
async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
442+
// USearchRegistry is always fully cached — nothing to load.
443+
Ok(())
444+
}
376445
}
377446

378447
impl Default for USearchRegistry {

src/rule.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ use datafusion::optimizer::OptimizerConfig;
3535
use usearch::MetricKind;
3636

3737
use crate::node::{DistanceType, USearchNode};
38-
use crate::registry::USearchRegistry;
38+
use crate::registry::VectorIndexResolver;
3939

4040
pub struct USearchRule {
41-
registry: Arc<USearchRegistry>,
41+
registry: Arc<dyn VectorIndexResolver>,
4242
}
4343

4444
impl USearchRule {
45-
pub fn new(registry: Arc<USearchRegistry>) -> Self {
45+
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
4646
Self { registry }
4747
}
4848

@@ -104,8 +104,8 @@ impl USearchRule {
104104
// Registry key: "catalog::schema::table::col" (or fewer parts for bare refs).
105105
let reg_key = format!("{}::{}", table_ref_full, vec_col);
106106

107-
// Table must be registered for vector search.
108-
let registered = self.registry.get(&reg_key)?;
107+
// Sync check: does a vector index exist for this key?
108+
let meta = self.registry.peek(&reg_key)?;
109109

110110
let dist_type = match udf_name.as_str() {
111111
"l2_distance" => DistanceType::L2,
@@ -116,14 +116,14 @@ impl USearchRule {
116116

117117
// Guard: the SQL distance UDF must match the metric the index was built
118118
// with. Mismatch → return None so DataFusion falls back to exact scan.
119-
if !dist_type_matches_metric(&dist_type, registered.metric) {
119+
if !dist_type_matches_metric(&dist_type, meta.metric) {
120120
return None;
121121
}
122122

123123
// Build USearchNode schema: base fields qualified with the original table
124124
// reference (Full/Partial/Bare) so qualifiers match the original plan's schema.
125125
let table_ref = scan_table_ref.clone();
126-
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = registered
126+
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = meta
127127
.schema
128128
.fields()
129129
.iter()

src/udtf.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,21 @@ use datafusion::physical_plan::{
3333
};
3434
use datafusion::scalar::ScalarValue;
3535

36-
use crate::registry::USearchRegistry;
36+
use crate::registry::VectorIndexResolver;
3737

3838
// ── UDTF ─────────────────────────────────────────────────────────────────────
3939

4040
/// Table function: vector_usearch(table_name, query_vec, k [, ef_search])
4141
///
4242
/// Returns `(key: UInt64, _distance: Float32)`. Join with your data table on
4343
/// the key column to retrieve full rows.
44+
///
45+
/// This entry point is synchronous. For async-backed [`VectorIndexResolver`]
46+
/// implementations, it only works when the target index is already loaded in
47+
/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and
48+
/// cannot trigger async index loads.
4449
pub struct USearchUDTF {
45-
registry: Arc<USearchRegistry>,
50+
registry: Arc<dyn VectorIndexResolver>,
4651
}
4752

4853
impl fmt::Debug for USearchProvider {
@@ -58,7 +63,7 @@ impl fmt::Debug for USearchUDTF {
5863
}
5964

6065
impl USearchUDTF {
61-
pub fn new(registry: Arc<USearchRegistry>) -> Self {
66+
pub fn new(registry: Arc<dyn VectorIndexResolver>) -> Self {
6267
Self { registry }
6368
}
6469
}
@@ -85,9 +90,11 @@ impl TableFunctionImpl for USearchUDTF {
8590
None
8691
};
8792

88-
let registered = self.registry.get(&table_name).ok_or_else(|| {
93+
let registered = self.registry.resolve(&table_name).ok_or_else(|| {
8994
DataFusionError::Execution(format!(
90-
"vector_usearch: table '{table_name}' not registered"
95+
"vector_usearch: table '{table_name}' is not loaded locally. \
96+
This synchronous path only checks the local cache and cannot trigger async \
97+
loads. Use the optimizer/planner vector query path or pre-load the index first."
9198
))
9299
})?;
93100

0 commit comments

Comments
 (0)