Skip to content

Commit f21e8a4

Browse files
authored
Bind stable vector handles during planning (#19)
* Bind stable vector handles during planning.
* refactor(planner): remove redundant flat fields from SearchParams. Access schema, key_col, scalar_kind, and brute_force_threshold through params.registered.* instead of duplicating them as flat fields on SearchParams.
1 parent f9b4610 commit f21e8a4

3 files changed

Lines changed: 60 additions & 61 deletions

File tree

src/planner.rs

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,8 @@ impl ExtensionPlanner for USearchExecPlanner {
140140
None => return Ok(None),
141141
};
142142

143-
// Async: ensure the index is loaded (downloads on cache miss).
144-
self.registry.ensure_loaded(&node.table_name).await?;
145-
146-
let registered = match self.registry.resolve(&node.table_name) {
147-
Some(r) => r,
148-
None => {
149-
return Err(DataFusionError::Execution(format!(
150-
"USearchExecPlanner: table '{}' not available after ensure_loaded",
151-
node.table_name
152-
)));
153-
}
154-
};
143+
// Async: bind a query-stable registry entry during planning.
144+
let registered = self.registry.prepare(&node.table_name).await?;
155145

156146
let exec_props = session_state.execution_props();
157147

@@ -225,15 +215,11 @@ impl ExtensionPlanner for USearchExecPlanner {
225215

226216
Ok(Some(Arc::new(USearchExec::new(SearchParams {
227217
table_name: node.table_name.clone(),
228-
registry: self.registry.clone(),
218+
registered,
229219
query_vec: node.query_vec_f64(),
230220
k: node.k,
231221
distance_type: node.distance_type.clone(),
232222
has_filters: !node.filters.is_empty(),
233-
schema: registered.schema.clone(),
234-
key_col: registered.key_col.clone(),
235-
scalar_kind: registered.scalar_kind,
236-
brute_force_threshold: registered.config.brute_force_selectivity_threshold,
237223
provider_scan,
238224
}))))
239225
}
@@ -242,25 +228,42 @@ impl ExtensionPlanner for USearchExecPlanner {
242228
// ── Search parameters ─────────────────────────────────────────────────────────
243229

244230
/// All parameters needed to run a USearch query, cloned cheaply into execute().
245-
#[derive(Debug, Clone)]
231+
#[derive(Clone)]
246232
struct SearchParams {
247233
table_name: String,
248-
registry: Arc<dyn VectorIndexResolver>,
234+
registered: Arc<crate::registry::RegisteredTable>,
249235
query_vec: Vec<f64>,
250236
k: usize,
251237
distance_type: DistanceType,
252238
/// Whether the query has WHERE-clause filters. Used to choose between the
253239
/// unfiltered HNSW path and the adaptive filtered path.
254240
has_filters: bool,
255-
schema: SchemaRef,
256-
key_col: String,
257-
scalar_kind: ScalarKind,
258-
brute_force_threshold: f64,
259241
/// Pre-planned provider scan for the filtered path (_key + filter cols only).
260242
/// Used for selectivity estimation. None for the unfiltered path.
261243
provider_scan: Option<Arc<dyn ExecutionPlan>>,
262244
}
263245

246+
impl fmt::Debug for SearchParams {
247+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248+
f.debug_struct("SearchParams")
249+
.field("table_name", &self.table_name)
250+
.field("k", &self.k)
251+
.field("has_filters", &self.has_filters)
252+
.field("schema", &self.registered.schema)
253+
.field("key_col", &self.registered.key_col)
254+
.field("scalar_kind", &self.registered.scalar_kind)
255+
.field(
256+
"brute_force_threshold",
257+
&self.registered.config.brute_force_selectivity_threshold,
258+
)
259+
.field(
260+
"provider_scan",
261+
&self.provider_scan.as_ref().map(|_| "Some(..)"),
262+
)
263+
.finish_non_exhaustive()
264+
}
265+
}
266+
264267
// ── Physical execution node ───────────────────────────────────────────────────
265268

266269
/// Leaf execution plan that defers all I/O to execute() time.
@@ -273,7 +276,7 @@ pub struct USearchExec {
273276
impl USearchExec {
274277
fn new(params: SearchParams) -> Self {
275278
let properties = PlanProperties::new(
276-
EquivalenceProperties::new(params.schema.clone()),
279+
EquivalenceProperties::new(params.registered.schema.clone()),
277280
Partitioning::UnknownPartitioning(1),
278281
EmissionType::Incremental,
279282
Boundedness::Bounded,
@@ -340,7 +343,7 @@ impl ExecutionPlan for USearchExec {
340343
});
341344

342345
Ok(Box::pin(RecordBatchStreamAdapter::new(
343-
self.params.schema.clone(),
346+
self.params.registered.schema.clone(),
344347
stream,
345348
)))
346349
}
@@ -361,16 +364,7 @@ async fn usearch_execute(
361364
params: SearchParams,
362365
task_ctx: Arc<TaskContext>,
363366
) -> Result<Vec<RecordBatch>> {
364-
// Re-fetch at execute time and reload on cache miss so eviction between
365-
// plan and execute is handled correctly for async-backed resolvers.
366-
params.registry.ensure_loaded(&params.table_name).await?;
367-
368-
let registered = params.registry.resolve(&params.table_name).ok_or_else(|| {
369-
DataFusionError::Execution(format!(
370-
"USearchExec: table '{}' not available after ensure_loaded at execute time",
371-
params.table_name
372-
))
373-
})?;
367+
let registered = params.registered.clone();
374368

375369
if !params.has_filters {
376370
// ── Unfiltered path ───────────────────────────────────────────────
@@ -385,7 +379,7 @@ async fn usearch_execute(
385379
&registered.index,
386380
&params.query_vec,
387381
params.k,
388-
params.scalar_kind,
382+
registered.scalar_kind,
389383
)?
390384
};
391385

@@ -404,7 +398,7 @@ async fn usearch_execute(
404398
let data_batches = async {
405399
registered
406400
.lookup_provider
407-
.fetch_by_keys(&matches.keys, &params.key_col, None)
401+
.fetch_by_keys(&matches.keys, &registered.key_col, None)
408402
.await
409403
}
410404
.instrument(tracing::info_span!(
@@ -415,7 +409,7 @@ async fn usearch_execute(
415409

416410
let key_col_idx = provider_key_col_idx(&registered)?;
417411
let _span = tracing::info_span!("usearch_attach_distances").entered();
418-
attach_distances(data_batches, key_col_idx, &key_to_dist, &params.schema)
412+
attach_distances(data_batches, key_col_idx, &key_to_dist, &registered.schema)
419413
} else {
420414
// ── Adaptive filtered path ────────────────────────────────────────
421415
let scan = params.provider_scan.clone().ok_or_else(|| {
@@ -489,7 +483,7 @@ async fn adaptive_filtered_execute(
489483

490484
let total = registered.index.size();
491485
let selectivity = valid_keys.len() as f64 / total.max(1) as f64;
492-
let threshold = params.brute_force_threshold;
486+
let threshold = registered.config.brute_force_selectivity_threshold;
493487

494488
let path = if selectivity <= threshold {
495489
"index-get"
@@ -535,7 +529,7 @@ async fn adaptive_filtered_execute(
535529
let data_batches = async {
536530
registered
537531
.lookup_provider
538-
.fetch_by_keys(&fetch_keys, &params.key_col, None)
532+
.fetch_by_keys(&fetch_keys, &registered.key_col, None)
539533
.await
540534
}
541535
.instrument(tracing::info_span!(
@@ -550,7 +544,7 @@ async fn adaptive_filtered_execute(
550544
data_batches,
551545
lookup_key_col_idx,
552546
&key_to_dist,
553-
&params.schema,
547+
&registered.schema,
554548
)?
555549
};
556550

@@ -591,7 +585,7 @@ async fn adaptive_filtered_execute(
591585
let data_batches = async {
592586
registered
593587
.lookup_provider
594-
.fetch_by_keys(&matches.keys, &params.key_col, None)
588+
.fetch_by_keys(&matches.keys, &registered.key_col, None)
595589
.await
596590
}
597591
.instrument(tracing::info_span!(
@@ -606,7 +600,7 @@ async fn adaptive_filtered_execute(
606600
data_batches,
607601
lookup_key_col_idx,
608602
&key_to_dist,
609-
&params.schema,
603+
&registered.schema,
610604
)?
611605
};
612606

src/registry.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,30 +230,34 @@ pub struct VectorIndexMeta {
230230

231231
/// Trait for resolving vector index entries by name.
232232
///
233-
/// Split into two operations to accommodate DataFusion's mixed sync/async pipeline:
233+
/// DataFusion's optimizer is synchronous but physical planning is async, so the
234+
/// resolver exposes three operations across two levels (sync and async):
234235
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
235-
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
236-
/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor)
236+
/// - `resolve()` — sync, cache-only: used by synchronous callers such as the UDTF
237+
/// - `prepare()` — async: returns a query-stable loaded entry for planning/execution
237238
///
238-
/// The built-in [`USearchRegistry`] implements all three as direct hashmap lookups
239-
/// (always cached). Production systems can implement catalog-backed loading in
240-
/// `ensure_loaded`.
239+
/// The important contract is that `prepare()` returns the exact loaded
240+
/// [`RegisteredTable`] the query should execute against. This avoids a second
241+
/// lookup during execution and removes the planner/execute handoff gap where a
242+
/// newer generation could replace the registry entry mid-query.
241243
#[async_trait::async_trait]
242244
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
243245
/// Sync, cheap: check if a vector index exists and return its metadata.
244246
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
245247
/// Must NOT do I/O — only check local cache or lightweight state.
246248
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;
247249

248-
/// Sync: return a fully loaded entry from cache.
249-
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
250+
/// Sync, cache-only: return a fully loaded entry from local cache.
251+
/// Returns `None` on cache miss.
250252
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;
251253

252-
/// Async: ensure the index is loaded and available in cache.
253-
/// On cache miss or stale entry, downloads files and loads the index.
254-
/// On cache hit with current generation, returns immediately.
255-
/// Called from the async physical planner before building the execution plan.
256-
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;
254+
/// Async: return a query-stable loaded entry.
255+
///
256+
/// On cache miss or stale entry, implementations may download files and
257+
/// load the current generation. On success, the returned [`Arc`] must stay
258+
/// valid for the lifetime of the query even if a newer generation is loaded
259+
/// concurrently afterwards.
260+
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>>;
257261
}
258262

259263
// ── USearchRegistry ───────────────────────────────────────────────────────────
@@ -438,9 +442,10 @@ impl VectorIndexResolver for USearchRegistry {
438442
self.get(name)
439443
}
440444

441-
async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
442-
// USearchRegistry is always fully cached — nothing to load.
443-
Ok(())
445+
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>> {
446+
self.get(name).ok_or_else(|| {
447+
DataFusionError::Execution(format!("USearchRegistry: table '{name}' is not loaded"))
448+
})
444449
}
445450
}
446451

src/udtf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ use crate::registry::VectorIndexResolver;
4444
///
4545
/// This entry point is synchronous. For async-backed [`VectorIndexResolver`]
4646
/// implementations, it only works when the target index is already loaded in
47-
/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and
48-
/// cannot trigger async index loads.
47+
/// the local cache. `vector_usearch()` does not call `prepare()` and cannot
48+
/// trigger async index loads.
4949
pub struct USearchUDTF {
5050
registry: Arc<dyn VectorIndexResolver>,
5151
}

0 commit comments

Comments
 (0)