Skip to content

Commit b5823ac

Browse files
committed
feat: split VectorIndexResolver into peek/resolve/ensure_loaded
Split the resolver trait to match DataFusion's sync/async pipeline:
- peek(key) — sync, cheap: returns VectorIndexMeta (metric, schema) for the optimizer rule to validate ANN rewrites without loading the index
- resolve(key) — sync: returns the loaded RegisteredTable from cache
- ensure_loaded(key) — async: loads the index on cache miss; called by the physical planner before building execution plans

The optimizer rule uses peek (sync, no I/O). The planner calls ensure_loaded (async) and then resolve. The executor uses resolve (sync, cache only).

This enables catalog-backed resolution where ensure_loaded downloads and loads index files on demand, while the optimizer stays sync and fast.
1 parent 2f98713 commit b5823ac

4 files changed

Lines changed: 58 additions & 15 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub use lookup::{HashKeyProvider, PointLookupProvider};
7575
pub use node::{DistanceType, USearchNode};
7676
pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner};
7777
pub use registry::{
78-
RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig,
78+
RegisteredTable, USearchIndexConfig, USearchRegistry, USearchTableConfig, VectorIndexMeta,
7979
VectorIndexResolver,
8080
};
8181
pub use rule::USearchRule;

src/planner.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,14 @@ impl ExtensionPlanner for USearchExecPlanner {
140140
None => return Ok(None),
141141
};
142142

143-
// Cheap validation: RwLock read + HashMap lookup — no I/O.
143+
// Async: ensure the index is loaded (downloads on cache miss).
144+
self.registry.ensure_loaded(&node.table_name).await?;
145+
144146
let registered = match self.registry.resolve(&node.table_name) {
145147
Some(r) => r,
146148
None => {
147149
return Err(DataFusionError::Execution(format!(
148-
"USearchExecPlanner: table '{}' not in registry",
150+
"USearchExecPlanner: table '{}' not available after ensure_loaded",
149151
node.table_name
150152
)));
151153
}

src/registry.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,19 +216,44 @@ pub struct RegisteredTable {
216216

217217
// ── VectorIndexResolver ───────────────────────────────────────────────────────
218218

219+
/// Lightweight metadata returned by the sync `peek` check.
220+
/// Contains enough info for the optimizer to validate the ANN rewrite
221+
/// and build the logical plan node, without needing the loaded index objects.
222+
#[derive(Debug, Clone)]
223+
pub struct VectorIndexMeta {
224+
pub metric: MetricKind,
225+
pub scalar_kind: ScalarKind,
226+
pub schema: SchemaRef,
227+
pub key_col: String,
228+
pub config: USearchTableConfig,
229+
}
230+
219231
/// Trait for resolving vector index entries by name.
220232
///
221-
/// The optimizer rule, physical planner, and executor all call `resolve()` to
222-
/// obtain the loaded index, providers, and metadata needed for ANN execution.
233+
/// Split into three operations to accommodate DataFusion's mixed sync/async pipeline:
234+
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
235+
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
236+
/// - `ensure_loaded()` — async: loads the index on cache miss (for the physical planner, before `resolve()`)
223237
///
224-
/// The built-in [`USearchRegistry`] implements this as a direct hashmap lookup.
225-
/// Production systems can implement a catalog-backed resolver that treats the
226-
/// in-memory index as a cache and reloads from storage on miss or staleness.
238+
/// The built-in [`USearchRegistry`] implements `peek` and `resolve` as direct hashmap
239+
/// lookups and `ensure_loaded` as a no-op (entries are always cached). Production
240+
/// systems can implement catalog-backed loading in `ensure_loaded`.
241+
#[async_trait::async_trait]
227242
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
228-
/// Look up a registered table by its key (e.g., `"catalog::schema::table::col"`).
229-
///
230-
/// Returns `None` if the key is not found or the index is not available.
243+
/// Sync, cheap: check if a vector index exists and return its metadata.
244+
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
245+
/// Must NOT do I/O — only check local cache or lightweight state.
246+
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;
247+
248+
/// Sync: return a fully loaded entry from cache.
249+
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
231250
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;
251+
252+
/// Async: ensure the index is loaded and available in cache.
253+
/// On cache miss or stale entry, downloads files and loads the index.
254+
/// On cache hit with current generation, returns immediately.
255+
/// Called from the async physical planner before building the execution plan.
256+
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;
232257
}
233258

234259
// ── USearchRegistry ───────────────────────────────────────────────────────────
@@ -397,10 +422,26 @@ impl USearchRegistry {
397422
}
398423
}
399424

425+
#[async_trait::async_trait]
400426
impl VectorIndexResolver for USearchRegistry {
427+
fn peek(&self, name: &str) -> Option<VectorIndexMeta> {
428+
self.get(name).map(|r| VectorIndexMeta {
429+
metric: r.metric,
430+
scalar_kind: r.scalar_kind,
431+
schema: r.schema.clone(),
432+
key_col: r.key_col.clone(),
433+
config: r.config.clone(),
434+
})
435+
}
436+
401437
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>> {
402438
self.get(name)
403439
}
440+
441+
async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
442+
// USearchRegistry is always fully cached — nothing to load.
443+
Ok(())
444+
}
404445
}
405446

406447
impl Default for USearchRegistry {

src/rule.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ impl USearchRule {
104104
// Registry key: "catalog::schema::table::col" (or fewer parts for bare refs).
105105
let reg_key = format!("{}::{}", table_ref_full, vec_col);
106106

107-
// Table must be registered for vector search.
108-
let registered = self.registry.resolve(&reg_key)?;
107+
// Sync check: does a vector index exist for this key?
108+
let meta = self.registry.peek(&reg_key)?;
109109

110110
let dist_type = match udf_name.as_str() {
111111
"l2_distance" => DistanceType::L2,
@@ -116,14 +116,14 @@ impl USearchRule {
116116

117117
// Guard: the SQL distance UDF must match the metric the index was built
118118
// with. Mismatch → return None so DataFusion falls back to exact scan.
119-
if !dist_type_matches_metric(&dist_type, registered.metric) {
119+
if !dist_type_matches_metric(&dist_type, meta.metric) {
120120
return None;
121121
}
122122

123123
// Build USearchNode schema: base fields qualified with the original table
124124
// reference (Full/Partial/Bare) so qualifiers match the original plan's schema.
125125
let table_ref = scan_table_ref.clone();
126-
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = registered
126+
let qualified_fields: Vec<(Option<TableReference>, Arc<arrow_schema::Field>)> = meta
127127
.schema
128128
.fields()
129129
.iter()

0 commit comments

Comments
 (0)