Skip to content

Commit 7079e80

Browse files
committed
refactor(registry): split provider into scan_provider + lookup_provider
Separate RegisteredTable.provider into scan_provider (TableProvider for WHERE evaluation) and lookup_provider (PointLookupProvider for key-based fetch), preparing for Parquet-native adaptive filtering.
1 parent 3c3b8cf commit 7079e80

7 files changed

Lines changed: 54 additions & 20 deletions

File tree

src/lookup.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ use datafusion::physical_plan::ExecutionPlan;
2525

2626
// ── Trait ─────────────────────────────────────────────────────────────────────
2727

28-
/// A [`TableProvider`] that guarantees efficient row retrieval by primary key.
28+
/// Trait for efficient row retrieval by primary key.
2929
///
3030
/// Implementors provide O(k) or O(k log N) row lookups — no full-table scan.
3131
/// The `USearchRegistry` requires this trait for its lookup provider (the scan side is a plain `TableProvider`)
3232
/// to enforce the performance contract at registration time.
3333
///
3434
/// # Contract
3535
///
36+
/// - `schema()` MUST return the Arrow schema of the rows returned by
37+
/// `fetch_by_keys` (when `projection` is `None`).
3638
/// - `fetch_by_keys` MUST return only rows whose key column value is in `keys`.
3739
/// - Keys not found in the table are silently omitted — not an error.
3840
/// - Returned batches must use a schema consistent with `self.schema()`. When
@@ -46,6 +48,8 @@ use datafusion::physical_plan::ExecutionPlan;
4648
/// ```rust,ignore
4749
/// #[async_trait]
4850
/// impl PointLookupProvider for MyEngineTable {
51+
/// fn schema(&self) -> SchemaRef { self.schema.clone() }
52+
///
4953
/// async fn fetch_by_keys(
5054
/// &self,
5155
/// keys: &[u64],
@@ -60,7 +64,10 @@ use datafusion::physical_plan::ExecutionPlan;
6064
/// }
6165
/// ```
6266
#[async_trait]
63-
pub trait PointLookupProvider: TableProvider + Send + Sync {
67+
pub trait PointLookupProvider: Send + Sync {
68+
/// Arrow schema of the rows this provider returns (without `_distance`).
69+
fn schema(&self) -> SchemaRef;
70+
6471
async fn fetch_by_keys(
6572
&self,
6673
keys: &[u64],
@@ -145,6 +152,10 @@ impl fmt::Debug for HashKeyProvider {
145152

146153
#[async_trait]
147154
impl PointLookupProvider for HashKeyProvider {
155+
fn schema(&self) -> SchemaRef {
156+
self.schema.clone()
157+
}
158+
148159
async fn fetch_by_keys(
149160
&self,
150161
keys: &[u64],

src/parquet_provider.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ async fn load_metadata_cache(
148148

149149
#[async_trait]
150150
impl PointLookupProvider for ParquetLookupProvider {
151+
fn schema(&self) -> SchemaRef {
152+
self.schema.clone()
153+
}
154+
151155
async fn fetch_by_keys(
152156
&self,
153157
keys: &[u64],

src/planner.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl ExtensionPlanner for USearchExecPlanner {
145145
let provider_scan = if !node.filters.is_empty() {
146146
Some(
147147
registered
148-
.provider
148+
.scan_provider
149149
.scan(session_state, None, &[], None)
150150
.await?,
151151
)
@@ -313,7 +313,7 @@ async fn usearch_execute(
313313
.collect();
314314

315315
let data_batches = registered
316-
.provider
316+
.lookup_provider
317317
.fetch_by_keys(&matches.keys, &params.key_col, None)
318318
.await?;
319319

@@ -352,7 +352,7 @@ async fn adaptive_filtered_execute(
352352
scan_plan: Arc<dyn ExecutionPlan>,
353353
task_ctx: Arc<TaskContext>,
354354
) -> Result<Vec<RecordBatch>> {
355-
let provider_schema = registered.provider.schema();
355+
let provider_schema = registered.scan_provider.schema();
356356
let key_col_idx = provider_key_col_idx(registered)?;
357357
let vec_col_idx = provider_schema.index_of(&params.vector_col).ok();
358358
let has_vec_col = vec_col_idx.is_some();
@@ -445,7 +445,7 @@ async fn adaptive_filtered_execute(
445445
let top_keys: Vec<u64> = top_k.iter().map(|(k, _)| *k).collect();
446446

447447
let data_batches = registered
448-
.provider
448+
.lookup_provider
449449
.fetch_by_keys(&top_keys, &params.key_col, None)
450450
.await?;
451451

@@ -486,7 +486,7 @@ async fn adaptive_filtered_execute(
486486
.collect();
487487

488488
let data_batches = registered
489-
.provider
489+
.lookup_provider
490490
.fetch_by_keys(&matches.keys, &params.key_col, None)
491491
.await?;
492492

@@ -729,15 +729,15 @@ fn heap_select_top_k(pairs: &mut [(u64, f32)], k: usize) -> Vec<(u64, f32)> {
729729
result
730730
}
731731

732-
/// Index of the key column in the provider schema.
732+
/// Index of the key column in the lookup provider schema.
733733
fn provider_key_col_idx(registered: &crate::registry::RegisteredTable) -> Result<usize> {
734734
registered
735-
.provider
735+
.lookup_provider
736736
.schema()
737737
.index_of(&registered.key_col)
738738
.map_err(|_| {
739739
DataFusionError::Execution(format!(
740-
"USearchExecPlanner: key column '{}' not found in provider schema",
740+
"USearchExecPlanner: key column '{}' not found in lookup provider schema",
741741
registered.key_col
742742
))
743743
})

src/registry.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use datafusion::common::Result;
88
use datafusion::error::DataFusionError;
99
use usearch::{Index, IndexOptions, MetricKind, ScalarKind};
1010

11+
use datafusion::catalog::TableProvider;
12+
1113
use crate::lookup::PointLookupProvider;
1214

1315
// ── USearchIndexConfig ────────────────────────────────────────────────────────
@@ -177,7 +179,10 @@ impl Default for USearchTableConfig {
177179

178180
pub struct RegisteredTable {
179181
pub index: Arc<Index>,
180-
pub provider: Arc<dyn PointLookupProvider>,
182+
/// Scan provider for WHERE evaluation and low-selectivity Parquet-native path.
183+
pub scan_provider: Arc<dyn TableProvider>,
184+
/// Lookup provider for efficient key-based row fetch (e.g. SQLite).
185+
pub lookup_provider: Arc<dyn PointLookupProvider>,
181186
pub key_col: String,
182187
pub metric: MetricKind,
183188
/// Native scalar type of the vector column. Determines which typed search
@@ -213,31 +218,36 @@ impl USearchRegistry {
213218
/// [`USearchTableConfig::default()`] (ef_search=64, threshold=5%).
214219
///
215220
/// - `index` — must already be loaded / populated.
216-
/// - `provider` — must implement [`PointLookupProvider`].
221+
/// - `scan_provider` — [`TableProvider`] used for WHERE evaluation and
222+
/// low-selectivity Parquet-native scanning.
223+
/// - `lookup_provider` — [`PointLookupProvider`] for O(k) key-based fetch.
217224
/// [`HashKeyProvider`] is the bundled in-memory implementation.
218-
/// For production, implement the trait on your storage engine's table type.
219-
/// - `key_col` — column in `provider.schema()` that stores the USearch key
220-
/// (`u64`). Supported Arrow types: `UInt64`, `Int64`, `UInt32`, `Int32`.
225+
/// - `key_col` — column in `lookup_provider.schema()` that stores the
226+
/// USearch key (`u64`). Supported Arrow types: `UInt64`, `Int64`,
227+
/// `UInt32`, `Int32`.
221228
/// - `metric` — must match how the index was built. The optimizer rule
222229
/// validates this and refuses to rewrite on mismatch.
223230
/// - `scalar_kind` — native element type of the vector column (`F32` or
224231
/// `F64`). Controls which typed search method the planner dispatches to.
225232
///
226233
/// [`add_with_config`]: USearchRegistry::add_with_config
227234
/// [`HashKeyProvider`]: crate::lookup::HashKeyProvider
235+
#[allow(clippy::too_many_arguments)]
228236
pub fn add(
229237
&self,
230238
name: &str,
231239
index: Arc<Index>,
232-
provider: Arc<dyn PointLookupProvider>,
240+
scan_provider: Arc<dyn TableProvider>,
241+
lookup_provider: Arc<dyn PointLookupProvider>,
233242
key_col: &str,
234243
metric: MetricKind,
235244
scalar_kind: ScalarKind,
236245
) -> Result<()> {
237246
self.add_with_config(
238247
name,
239248
index,
240-
provider,
249+
scan_provider,
250+
lookup_provider,
241251
key_col,
242252
metric,
243253
scalar_kind,
@@ -254,7 +264,8 @@ impl USearchRegistry {
254264
&self,
255265
name: &str,
256266
index: Arc<Index>,
257-
provider: Arc<dyn PointLookupProvider>,
267+
scan_provider: Arc<dyn TableProvider>,
268+
lookup_provider: Arc<dyn PointLookupProvider>,
258269
key_col: &str,
259270
metric: MetricKind,
260271
scalar_kind: ScalarKind,
@@ -263,7 +274,7 @@ impl USearchRegistry {
263274
// Set ef_search once, here, before any query touches the index.
264275
index.change_expansion_search(config.expansion_search);
265276

266-
let data_schema = provider.schema();
277+
let data_schema = lookup_provider.schema();
267278

268279
let _ = data_schema.index_of(key_col).map_err(|_| {
269280
DataFusionError::Execution(format!(
@@ -286,7 +297,8 @@ impl USearchRegistry {
286297
name.to_string(),
287298
Arc::new(RegisteredTable {
288299
index,
289-
provider,
300+
scan_provider,
301+
lookup_provider,
290302
key_col: key_col.to_string(),
291303
metric,
292304
scalar_kind,

src/sqlite_provider.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ impl SqliteLookupProvider {
177177

178178
#[async_trait]
179179
impl PointLookupProvider for SqliteLookupProvider {
180+
fn schema(&self) -> SchemaRef {
181+
self.schema.clone()
182+
}
183+
180184
async fn fetch_by_keys(
181185
&self,
182186
keys: &[u64],

tests/execution.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ async fn make_exec_ctx(reg_key: &str) -> SessionContext {
106106
reg_key,
107107
make_populated_index(),
108108
provider.clone(),
109+
provider.clone(),
109110
"id",
110111
MetricKind::L2sq,
111112
ScalarKind::F32,

tests/optimizer_rule.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async fn make_ctx(metric: MetricKind) -> SessionContext {
6262
"items::vector",
6363
make_index(metric),
6464
provider.clone(),
65+
provider.clone(),
6566
"id",
6667
metric,
6768
ScalarKind::F32,
@@ -347,6 +348,7 @@ async fn make_ctx_qualified(metric: MetricKind) -> SessionContext {
347348
"datafusion::public::items::vector",
348349
make_index(metric),
349350
provider.clone(),
351+
provider.clone(),
350352
"id",
351353
metric,
352354
ScalarKind::F32,

0 commit comments

Comments
 (0)