From 7079e80be0ab4a2287f7315621cb892fc41f3810 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 17 Mar 2026 23:31:06 +0530 Subject: [PATCH 1/4] refactor(registry): split provider into scan_provider + lookup_provider Separate RegisteredTable.provider into scan_provider (TableProvider for WHERE evaluation) and lookup_provider (PointLookupProvider for key-based fetch), preparing for Parquet-native adaptive filtering. --- src/lookup.rs | 15 +++++++++++++-- src/parquet_provider.rs | 4 ++++ src/planner.rs | 16 ++++++++-------- src/registry.rs | 32 ++++++++++++++++++++++---------- src/sqlite_provider.rs | 4 ++++ tests/execution.rs | 1 + tests/optimizer_rule.rs | 2 ++ 7 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/lookup.rs b/src/lookup.rs index 83994c2..7184fa4 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -25,7 +25,7 @@ use datafusion::physical_plan::ExecutionPlan; // ── Trait ───────────────────────────────────────────────────────────────────── -/// A [`TableProvider`] that guarantees efficient row retrieval by primary key. +/// Trait for efficient row retrieval by primary key. /// /// Implementors provide O(k) or O(k log N) row lookups — no full-table scan. /// The `USearchRegistry` requires this trait instead of a bare `TableProvider` @@ -33,6 +33,8 @@ use datafusion::physical_plan::ExecutionPlan; /// /// # Contract /// +/// - `schema()` MUST return the Arrow schema of the rows returned by +/// `fetch_by_keys` (when `projection` is `None`). /// - `fetch_by_keys` MUST return only rows whose key column value is in `keys`. /// - Keys not found in the table are silently omitted — not an error. /// - Returned batches must use a schema consistent with `self.schema()`. When @@ -46,6 +48,8 @@ use datafusion::physical_plan::ExecutionPlan; /// ```rust,ignore /// #[async_trait] /// impl PointLookupProvider for MyEngineTable { +/// fn schema(&self) -> SchemaRef { self.schema.clone() } +/// /// async fn fetch_by_keys( /// &self, /// keys: &[u64], @@ -60,7 +64,10 @@ use datafusion::physical_plan::ExecutionPlan; /// } /// ``` #[async_trait] -pub trait PointLookupProvider: TableProvider + Send + Sync { +pub trait PointLookupProvider: Send + Sync { + /// Arrow schema of the rows this provider returns (without `_distance`). + fn schema(&self) -> SchemaRef; + async fn fetch_by_keys( &self, keys: &[u64], @@ -145,6 +152,10 @@ impl fmt::Debug for HashKeyProvider { #[async_trait] impl PointLookupProvider for HashKeyProvider { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + async fn fetch_by_keys( &self, keys: &[u64], diff --git a/src/parquet_provider.rs b/src/parquet_provider.rs index be4c770..d6d1198 100644 --- a/src/parquet_provider.rs +++ b/src/parquet_provider.rs @@ -148,6 +148,10 @@ async fn load_metadata_cache( #[async_trait] impl PointLookupProvider for ParquetLookupProvider { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + async fn fetch_by_keys( &self, keys: &[u64], diff --git a/src/planner.rs b/src/planner.rs index 2e89688..dc063e0 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -145,7 +145,7 @@ impl ExtensionPlanner for USearchExecPlanner { let provider_scan = if !node.filters.is_empty() { Some( registered - .provider + .scan_provider .scan(session_state, None, &[], None) .await?, ) @@ -313,7 +313,7 @@ async fn usearch_execute( .collect(); let data_batches = registered - .provider + .lookup_provider .fetch_by_keys(&matches.keys, ¶ms.key_col, None) .await?; @@ -352,7 +352,7 @@ async fn adaptive_filtered_execute( scan_plan: Arc, task_ctx: Arc, ) -> Result> { - let provider_schema = registered.provider.schema(); + let provider_schema = registered.scan_provider.schema(); let key_col_idx = provider_key_col_idx(registered)?; let vec_col_idx = provider_schema.index_of(¶ms.vector_col).ok(); let has_vec_col = vec_col_idx.is_some(); @@ -445,7 +445,7 @@ async fn adaptive_filtered_execute( let top_keys: Vec = top_k.iter().map(|(k, _)| *k).collect(); let data_batches = registered - .provider + .lookup_provider .fetch_by_keys(&top_keys, ¶ms.key_col, None) .await?; @@ -486,7 +486,7 @@ async fn adaptive_filtered_execute( .collect(); let data_batches = registered - .provider + .lookup_provider .fetch_by_keys(&matches.keys, ¶ms.key_col, None) .await?; @@ -729,15 +729,15 @@ fn heap_select_top_k(pairs: &mut [(u64, f32)], k: usize) -> Vec<(u64, f32)> { result } -/// Index of the key column in the provider schema. +/// Index of the key column in the lookup provider schema. fn provider_key_col_idx(registered: &crate::registry::RegisteredTable) -> Result { registered - .provider + .lookup_provider .schema() .index_of(®istered.key_col) .map_err(|_| { DataFusionError::Execution(format!( - "USearchExecPlanner: key column '{}' not found in provider schema", + "USearchExecPlanner: key column '{}' not found in lookup provider schema", registered.key_col )) }) diff --git a/src/registry.rs b/src/registry.rs index 520a445..141aeca 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -8,6 +8,8 @@ use datafusion::common::Result; use datafusion::error::DataFusionError; use usearch::{Index, IndexOptions, MetricKind, ScalarKind}; +use datafusion::catalog::TableProvider; + use crate::lookup::PointLookupProvider; // ── USearchIndexConfig ──────────────────────────────────────────────────────── @@ -177,7 +179,10 @@ impl Default for USearchTableConfig { pub struct RegisteredTable { pub index: Arc, - pub provider: Arc, + /// Scan provider for WHERE evaluation and low-selectivity Parquet-native path. + pub scan_provider: Arc, + /// Lookup provider for efficient key-based row fetch (e.g. SQLite). + pub lookup_provider: Arc, pub key_col: String, pub metric: MetricKind, /// Native scalar type of the vector column. Determines which typed search @@ -213,11 +218,13 @@ impl USearchRegistry { /// [`USearchTableConfig::default()`] (ef_search=64, threshold=5%). /// /// - `index` — must already be loaded / populated. - /// - `provider` — must implement [`PointLookupProvider`]. + /// - `scan_provider` — [`TableProvider`] used for WHERE evaluation and + /// low-selectivity Parquet-native scanning. + /// - `lookup_provider` — [`PointLookupProvider`] for O(k) key-based fetch. /// [`HashKeyProvider`] is the bundled in-memory implementation. - /// For production, implement the trait on your storage engine's table type. - /// - `key_col` — column in `provider.schema()` that stores the USearch key - /// (`u64`). Supported Arrow types: `UInt64`, `Int64`, `UInt32`, `Int32`. + /// - `key_col` — column in `lookup_provider.schema()` that stores the + /// USearch key (`u64`). Supported Arrow types: `UInt64`, `Int64`, + /// `UInt32`, `Int32`. /// - `metric` — must match how the index was built. The optimizer rule /// validates this and refuses to rewrite on mismatch. /// - `scalar_kind` — native element type of the vector column (`F32` or @@ -225,11 +232,13 @@ impl USearchRegistry { /// /// [`add_with_config`]: USearchRegistry::add_with_config /// [`HashKeyProvider`]: crate::lookup::HashKeyProvider + #[allow(clippy::too_many_arguments)] pub fn add( &self, name: &str, index: Arc, - provider: Arc, + scan_provider: Arc, + lookup_provider: Arc, key_col: &str, metric: MetricKind, scalar_kind: ScalarKind, @@ -237,7 +246,8 @@ impl USearchRegistry { self.add_with_config( name, index, - provider, + scan_provider, + lookup_provider, key_col, metric, scalar_kind, @@ -254,7 +264,8 @@ impl USearchRegistry { &self, name: &str, index: Arc, - provider: Arc, + scan_provider: Arc, + lookup_provider: Arc, key_col: &str, metric: MetricKind, scalar_kind: ScalarKind, @@ -263,7 +274,7 @@ impl USearchRegistry { // Set ef_search once, here, before any query touches the index. index.change_expansion_search(config.expansion_search); - let data_schema = provider.schema(); + let data_schema = lookup_provider.schema(); let _ = data_schema.index_of(key_col).map_err(|_| { DataFusionError::Execution(format!( @@ -286,7 +297,8 @@ impl USearchRegistry { name.to_string(), Arc::new(RegisteredTable { index, - provider, + scan_provider, + lookup_provider, key_col: key_col.to_string(), metric, scalar_kind, diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 61d1e47..2273ced 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -177,6 +177,10 @@ impl SqliteLookupProvider { #[async_trait] impl PointLookupProvider for SqliteLookupProvider { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + async fn fetch_by_keys( &self, keys: &[u64], diff --git a/tests/execution.rs b/tests/execution.rs index e47a11f..6317dd5 100644 --- a/tests/execution.rs +++ b/tests/execution.rs @@ -106,6 +106,7 @@ async fn make_exec_ctx(reg_key: &str) -> SessionContext { reg_key, make_populated_index(), provider.clone(), + provider.clone(), "id", MetricKind::L2sq, ScalarKind::F32, diff --git a/tests/optimizer_rule.rs b/tests/optimizer_rule.rs index 00433db..ec98fe8 100644 --- a/tests/optimizer_rule.rs +++ b/tests/optimizer_rule.rs @@ -62,6 +62,7 @@ async fn make_ctx(metric: MetricKind) -> SessionContext { "items::vector", make_index(metric), provider.clone(), + provider.clone(), "id", metric, ScalarKind::F32, @@ -347,6 +348,7 @@ async fn make_ctx_qualified(metric: MetricKind) -> SessionContext { "datafusion::public::items::vector", make_index(metric), provider.clone(), + provider.clone(), "id", metric, ScalarKind::F32, From e1dc20a06e42e2079126540a122b3de8e215817a Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 17 Mar 2026 23:36:42 +0530 Subject: [PATCH 2/4] fix(planner): use separate key column indices for scan and lookup schemas The scan_provider and lookup_provider may have different schemas, so the key column can be at different indices. Use scan_key_col_idx when reading scan batches and lookup_key_col_idx for attach_distances on fetched rows. --- src/planner.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/planner.rs b/src/planner.rs index dc063e0..a2b94ed 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -353,7 +353,15 @@ async fn adaptive_filtered_execute( task_ctx: Arc, ) -> Result> { let provider_schema = registered.scan_provider.schema(); - let key_col_idx = provider_key_col_idx(registered)?; + // Key column index in scan_provider schema — used when reading scan batches. + let scan_key_col_idx = provider_schema.index_of(®istered.key_col).map_err(|_| { + DataFusionError::Execution(format!( + "USearchExecPlanner: key column '{}' not found in scan provider schema", + registered.key_col + )) + })?; + // Key column index in lookup_provider schema — used by attach_distances. + let lookup_key_col_idx = provider_key_col_idx(registered)?; let vec_col_idx = provider_schema.index_of(¶ms.vector_col).ok(); let has_vec_col = vec_col_idx.is_some(); @@ -371,7 +379,7 @@ async fn adaptive_filtered_execute( while let Some(batch_result) = stream.next().await { let batch = batch_result?; let mask = evaluate_filters(¶ms.physical_filters, &batch)?; - let keys = extract_keys_as_u64(batch.column(key_col_idx).as_ref())?; + let keys = extract_keys_as_u64(batch.column(scan_key_col_idx).as_ref())?; for row_idx in 0..batch.num_rows() { if !mask.is_null(row_idx) @@ -450,7 +458,7 @@ async fn adaptive_filtered_execute( .await?; let result_batches = - attach_distances(data_batches, key_col_idx, &key_to_dist, ¶ms.schema)?; + attach_distances(data_batches, lookup_key_col_idx, &key_to_dist, ¶ms.schema)?; tracing::Span::current().record( "usearch.result_count", @@ -491,7 +499,7 @@ async fn adaptive_filtered_execute( .await?; let result_batches = - attach_distances(data_batches, key_col_idx, &key_to_dist, ¶ms.schema)?; + attach_distances(data_batches, lookup_key_col_idx, &key_to_dist, ¶ms.schema)?; tracing::Span::current().record( "usearch.result_count", From d474463f65489e61422f18eea4a39213cb98f448 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 17 Mar 2026 23:41:01 +0530 Subject: [PATCH 3/4] style(planner): fix rustfmt formatting --- src/planner.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/planner.rs b/src/planner.rs index a2b94ed..9bc2cc5 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -457,8 +457,12 @@ async fn adaptive_filtered_execute( .fetch_by_keys(&top_keys, ¶ms.key_col, None) .await?; - let result_batches = - attach_distances(data_batches, lookup_key_col_idx, &key_to_dist, ¶ms.schema)?; + let result_batches = attach_distances( + data_batches, + lookup_key_col_idx, + &key_to_dist, + ¶ms.schema, + )?; tracing::Span::current().record( "usearch.result_count", @@ -498,8 +502,12 @@ async fn adaptive_filtered_execute( .fetch_by_keys(&matches.keys, ¶ms.key_col, None) .await?; - let result_batches = - attach_distances(data_batches, lookup_key_col_idx, &key_to_dist, ¶ms.schema)?; + let result_batches = attach_distances( + data_batches, + lookup_key_col_idx, + &key_to_dist, + ¶ms.schema, + )?; tracing::Span::current().record( "usearch.result_count", From 8d1214327216ac531432a286e7f5faae50bf77ac Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 17 Mar 2026 23:56:08 +0530 Subject: [PATCH 4/4] fix(registry): validate key column in scan_provider schema at registration Add scan_provider.schema().index_of(key_col) guard in add_with_config to catch misconfigured registrations early instead of at query execution time. --- src/registry.rs | 8 +++++++- tests/execution.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index 141aeca..47c9fb7 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -278,7 +278,13 @@ impl USearchRegistry { let _ = data_schema.index_of(key_col).map_err(|_| { DataFusionError::Execution(format!( - "USearchRegistry: key column '{key_col}' not found in table '{name}' schema" + "USearchRegistry: key column '{key_col}' not found in lookup provider schema for table '{name}'" + )) + })?; + + let _ = scan_provider.schema().index_of(key_col).map_err(|_| { + DataFusionError::Execution(format!( + "USearchRegistry: key column '{key_col}' not found in scan provider schema for table '{name}'" )) })?; diff --git a/tests/execution.rs b/tests/execution.rs index 6317dd5..2b11e63 100644 --- a/tests/execution.rs +++ b/tests/execution.rs @@ -257,6 +257,51 @@ async fn exec_qualified_where_order_by_alias() { assert_eq!(ids[0], 1, "closest alpha row must be row 1\nids: {ids:?}"); } +// ═══════════════════════════════════════════════════════════════════════════════ +// Registration validation +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Registration must fail when scan_provider schema is missing the key column. +#[tokio::test] +async fn reg_scan_provider_missing_key_col_errors() { + // scan_provider schema: only "label" and "vector" — no "id". + let scan_schema = Arc::new(Schema::new(vec![ + Field::new("label", DataType::Utf8, false), + Field::new( + "vector", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4), + false, + ), + ])); + let scan_provider = + Arc::new(HashKeyProvider::try_new(scan_schema, vec![], "label").expect("HashKeyProvider")); + + // lookup_provider has "id". + let lookup_schema = exec_schema(); + let lookup_provider = + Arc::new(HashKeyProvider::try_new(lookup_schema, vec![], "id").expect("HashKeyProvider")); + + let reg = USearchRegistry::new(); + let result = reg.add( + "test::vector", + make_populated_index(), + scan_provider, + lookup_provider, + "id", + MetricKind::L2sq, + ScalarKind::F32, + ); + assert!( + result.is_err(), + "registration must fail when scan_provider lacks key column" + ); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("scan provider"), + "error must mention scan provider: {msg}" + ); +} + /// Qualified table, WHERE clause, ORDER BY UDF directly. #[tokio::test] async fn exec_qualified_where_order_by_udf() {