Skip to content

Commit a9c876a

Browse files
committed
Bind stable vector handles during planning
1 parent f9b4610 commit a9c876a

3 files changed

Lines changed: 56 additions & 47 deletions

File tree

src/planner.rs

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,8 @@ impl ExtensionPlanner for USearchExecPlanner {
140140
None => return Ok(None),
141141
};
142142

143-
// Async: ensure the index is loaded (downloads on cache miss).
144-
self.registry.ensure_loaded(&node.table_name).await?;
145-
146-
let registered = match self.registry.resolve(&node.table_name) {
147-
Some(r) => r,
148-
None => {
149-
return Err(DataFusionError::Execution(format!(
150-
"USearchExecPlanner: table '{}' not available after ensure_loaded",
151-
node.table_name
152-
)));
153-
}
154-
};
143+
// Async: bind a query-stable registry entry during planning.
144+
let registered = self.registry.prepare(&node.table_name).await?;
155145

156146
let exec_props = session_state.execution_props();
157147

@@ -223,17 +213,22 @@ impl ExtensionPlanner for USearchExecPlanner {
223213
None
224214
};
225215

216+
let schema = registered.schema.clone();
217+
let key_col = registered.key_col.clone();
218+
let scalar_kind = registered.scalar_kind;
219+
let brute_force_threshold = registered.config.brute_force_selectivity_threshold;
220+
226221
Ok(Some(Arc::new(USearchExec::new(SearchParams {
227222
table_name: node.table_name.clone(),
228-
registry: self.registry.clone(),
223+
registered,
229224
query_vec: node.query_vec_f64(),
230225
k: node.k,
231226
distance_type: node.distance_type.clone(),
232227
has_filters: !node.filters.is_empty(),
233-
schema: registered.schema.clone(),
234-
key_col: registered.key_col.clone(),
235-
scalar_kind: registered.scalar_kind,
236-
brute_force_threshold: registered.config.brute_force_selectivity_threshold,
228+
schema,
229+
key_col,
230+
scalar_kind,
231+
brute_force_threshold,
237232
provider_scan,
238233
}))))
239234
}
@@ -242,10 +237,10 @@ impl ExtensionPlanner for USearchExecPlanner {
242237
// ── Search parameters ─────────────────────────────────────────────────────────
243238

244239
/// All parameters needed to run a USearch query, cloned cheaply into execute().
245-
#[derive(Debug, Clone)]
240+
#[derive(Clone)]
246241
struct SearchParams {
247242
table_name: String,
248-
registry: Arc<dyn VectorIndexResolver>,
243+
registered: Arc<crate::registry::RegisteredTable>,
249244
query_vec: Vec<f64>,
250245
k: usize,
251246
distance_type: DistanceType,
@@ -261,6 +256,24 @@ struct SearchParams {
261256
provider_scan: Option<Arc<dyn ExecutionPlan>>,
262257
}
263258

259+
impl fmt::Debug for SearchParams {
260+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261+
f.debug_struct("SearchParams")
262+
.field("table_name", &self.table_name)
263+
.field("k", &self.k)
264+
.field("has_filters", &self.has_filters)
265+
.field("schema", &self.schema)
266+
.field("key_col", &self.key_col)
267+
.field("scalar_kind", &self.scalar_kind)
268+
.field("brute_force_threshold", &self.brute_force_threshold)
269+
.field(
270+
"provider_scan",
271+
&self.provider_scan.as_ref().map(|_| "Some(..)"),
272+
)
273+
.finish_non_exhaustive()
274+
}
275+
}
276+
264277
// ── Physical execution node ───────────────────────────────────────────────────
265278

266279
/// Leaf execution plan that defers all I/O to execute() time.
@@ -361,16 +374,7 @@ async fn usearch_execute(
361374
params: SearchParams,
362375
task_ctx: Arc<TaskContext>,
363376
) -> Result<Vec<RecordBatch>> {
364-
// Re-fetch at execute time and reload on cache miss so eviction between
365-
// plan and execute is handled correctly for async-backed resolvers.
366-
params.registry.ensure_loaded(&params.table_name).await?;
367-
368-
let registered = params.registry.resolve(&params.table_name).ok_or_else(|| {
369-
DataFusionError::Execution(format!(
370-
"USearchExec: table '{}' not available after ensure_loaded at execute time",
371-
params.table_name
372-
))
373-
})?;
377+
let registered = params.registered.clone();
374378

375379
if !params.has_filters {
376380
// ── Unfiltered path ───────────────────────────────────────────────

src/registry.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,30 +230,34 @@ pub struct VectorIndexMeta {
230230

231231
/// Trait for resolving vector index entries by name.
232232
///
233-
/// Split into two operations to accommodate DataFusion's mixed sync/async pipeline:
233+
/// DataFusion's optimizer is synchronous but physical planning is async, so the
234+
/// resolver exposes two levels:
234235
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
235-
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
236-
/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor)
236+
/// - `resolve()` — sync, cache-only: used by synchronous callers such as the UDTF
237+
/// - `prepare()` — async: returns a query-stable loaded entry for planning/execution
237238
///
238-
/// The built-in [`USearchRegistry`] implements all three as direct hashmap lookups
239-
/// (always cached). Production systems can implement catalog-backed loading in
240-
/// `ensure_loaded`.
239+
/// The important contract is that `prepare()` returns the exact loaded
240+
/// [`RegisteredTable`] the query should execute against. This avoids a second
241+
/// lookup during execution and removes the planner/execute handoff gap where a
242+
/// newer generation could replace the registry entry mid-query.
241243
#[async_trait::async_trait]
242244
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
243245
/// Sync, cheap: check if a vector index exists and return its metadata.
244246
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
245247
/// Must NOT do I/O — only check local cache or lightweight state.
246248
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;
247249

248-
/// Sync: return a fully loaded entry from cache.
249-
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
250+
/// Sync, cache-only: return a fully loaded entry from local cache.
251+
/// Returns `None` on cache miss.
250252
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;
251253

252-
/// Async: ensure the index is loaded and available in cache.
253-
/// On cache miss or stale entry, downloads files and loads the index.
254-
/// On cache hit with current generation, returns immediately.
255-
/// Called from the async physical planner before building the execution plan.
256-
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;
254+
/// Async: return a query-stable loaded entry.
255+
///
256+
/// On cache miss or stale entry, implementations may download files and
257+
/// load the current generation. On success, the returned [`Arc`] must stay
258+
/// valid for the lifetime of the query even if a newer generation is loaded
259+
/// concurrently afterwards.
260+
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>>;
257261
}
258262

259263
// ── USearchRegistry ───────────────────────────────────────────────────────────
@@ -438,9 +442,10 @@ impl VectorIndexResolver for USearchRegistry {
438442
self.get(name)
439443
}
440444

441-
async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
442-
// USearchRegistry is always fully cached — nothing to load.
443-
Ok(())
445+
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>> {
446+
self.get(name).ok_or_else(|| {
447+
DataFusionError::Execution(format!("USearchRegistry: table '{name}' is not loaded"))
448+
})
444449
}
445450
}
446451

src/udtf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ use crate::registry::VectorIndexResolver;
4444
///
4545
/// This entry point is synchronous. For async-backed [`VectorIndexResolver`]
4646
/// implementations, it only works when the target index is already loaded in
47-
/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and
48-
/// cannot trigger async index loads.
47+
/// the local cache. `vector_usearch()` does not call `prepare()` and cannot
48+
/// trigger async index loads.
4949
pub struct USearchUDTF {
5050
registry: Arc<dyn VectorIndexResolver>,
5151
}

0 commit comments

Comments
 (0)