-
Notifications
You must be signed in to change notification settings - Fork 0
refactor(registry): split provider into scan_provider + lookup_provider #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7079e80
e1dc20a
d474463
8d12143
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ use datafusion::common::Result; | |
| use datafusion::error::DataFusionError; | ||
| use usearch::{Index, IndexOptions, MetricKind, ScalarKind}; | ||
|
|
||
| use datafusion::catalog::TableProvider; | ||
|
|
||
| use crate::lookup::PointLookupProvider; | ||
|
|
||
| // ── USearchIndexConfig ──────────────────────────────────────────────────────── | ||
|
|
@@ -177,7 +179,10 @@ impl Default for USearchTableConfig { | |
|
|
||
| pub struct RegisteredTable { | ||
| pub index: Arc<Index>, | ||
| pub provider: Arc<dyn PointLookupProvider>, | ||
| /// Scan provider for WHERE evaluation and low-selectivity Parquet-native path. | ||
| pub scan_provider: Arc<dyn TableProvider>, | ||
| /// Lookup provider for efficient key-based row fetch (e.g. SQLite). | ||
| pub lookup_provider: Arc<dyn PointLookupProvider>, | ||
| pub key_col: String, | ||
| pub metric: MetricKind, | ||
| /// Native scalar type of the vector column. Determines which typed search | ||
|
|
@@ -213,31 +218,36 @@ impl USearchRegistry { | |
| /// [`USearchTableConfig::default()`] (ef_search=64, threshold=5%). | ||
| /// | ||
| /// - `index` — must already be loaded / populated. | ||
| /// - `provider` — must implement [`PointLookupProvider`]. | ||
| /// - `scan_provider` — [`TableProvider`] used for WHERE evaluation and | ||
| /// low-selectivity Parquet-native scanning. | ||
| /// - `lookup_provider` — [`PointLookupProvider`] for O(k) key-based fetch. | ||
| /// [`HashKeyProvider`] is the bundled in-memory implementation. | ||
| /// For production, implement the trait on your storage engine's table type. | ||
| /// - `key_col` — column in `provider.schema()` that stores the USearch key | ||
| /// (`u64`). Supported Arrow types: `UInt64`, `Int64`, `UInt32`, `Int32`. | ||
| /// - `key_col` — column in `lookup_provider.schema()` that stores the | ||
| /// USearch key (`u64`). Supported Arrow types: `UInt64`, `Int64`, | ||
| /// `UInt32`, `Int32`. | ||
| /// - `metric` — must match how the index was built. The optimizer rule | ||
| /// validates this and refuses to rewrite on mismatch. | ||
| /// - `scalar_kind` — native element type of the vector column (`F32` or | ||
| /// `F64`). Controls which typed search method the planner dispatches to. | ||
| /// | ||
| /// [`add_with_config`]: USearchRegistry::add_with_config | ||
| /// [`HashKeyProvider`]: crate::lookup::HashKeyProvider | ||
| #[allow(clippy::too_many_arguments)] | ||
| pub fn add( | ||
| &self, | ||
| name: &str, | ||
| index: Arc<Index>, | ||
| provider: Arc<dyn PointLookupProvider>, | ||
| scan_provider: Arc<dyn TableProvider>, | ||
| lookup_provider: Arc<dyn PointLookupProvider>, | ||
| key_col: &str, | ||
| metric: MetricKind, | ||
| scalar_kind: ScalarKind, | ||
| ) -> Result<()> { | ||
| self.add_with_config( | ||
| name, | ||
| index, | ||
| provider, | ||
| scan_provider, | ||
| lookup_provider, | ||
| key_col, | ||
| metric, | ||
| scalar_kind, | ||
|
|
@@ -254,7 +264,8 @@ impl USearchRegistry { | |
| &self, | ||
| name: &str, | ||
| index: Arc<Index>, | ||
| provider: Arc<dyn PointLookupProvider>, | ||
| scan_provider: Arc<dyn TableProvider>, | ||
| lookup_provider: Arc<dyn PointLookupProvider>, | ||
| key_col: &str, | ||
| metric: MetricKind, | ||
| scalar_kind: ScalarKind, | ||
|
|
@@ -263,11 +274,17 @@ impl USearchRegistry { | |
| // Set ef_search once, here, before any query touches the index. | ||
| index.change_expansion_search(config.expansion_search); | ||
|
|
||
| let data_schema = provider.schema(); | ||
| let data_schema = lookup_provider.schema(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. P1 — Missing validation of `key_col` in the `scan_provider` schema
The whole motivation of this PR is to allow different providers for scan vs lookup; the tests mask the gap because they pass the same provider for both arguments. Fix: add an equivalent guard for `scan_provider`:
scan_provider
.schema()
.index_of(key_col)
.map_err(|_| {
DataFusionError::Execution(format!(
"USearchRegistry: key column '{key_col}' not found in scan provider schema for table '{name}'"
))
})?; |
||
|
|
||
| let _ = data_schema.index_of(key_col).map_err(|_| { | ||
| DataFusionError::Execution(format!( | ||
| "USearchRegistry: key column '{key_col}' not found in table '{name}' schema" | ||
| "USearchRegistry: key column '{key_col}' not found in lookup provider schema for table '{name}'" | ||
| )) | ||
| })?; | ||
|
|
||
| let _ = scan_provider.schema().index_of(key_col).map_err(|_| { | ||
| DataFusionError::Execution(format!( | ||
| "USearchRegistry: key column '{key_col}' not found in scan provider schema for table '{name}'" | ||
| )) | ||
| })?; | ||
|
|
||
|
|
@@ -286,7 +303,8 @@ impl USearchRegistry { | |
| name.to_string(), | ||
| Arc::new(RegisteredTable { | ||
| index, | ||
| provider, | ||
| scan_provider, | ||
| lookup_provider, | ||
| key_col: key_col.to_string(), | ||
| metric, | ||
| scalar_kind, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,6 +106,7 @@ async fn make_exec_ctx(reg_key: &str) -> SessionContext { | |
| reg_key, | ||
| make_populated_index(), | ||
| provider.clone(), | ||
| provider.clone(), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both |
||
| "id", | ||
| MetricKind::L2sq, | ||
| ScalarKind::F32, | ||
|
|
@@ -256,6 +257,51 @@ async fn exec_qualified_where_order_by_alias() { | |
| assert_eq!(ids[0], 1, "closest alpha row must be row 1\nids: {ids:?}"); | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // Registration validation | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /// Registration must fail when scan_provider schema is missing the key column. | ||
| #[tokio::test] | ||
| async fn reg_scan_provider_missing_key_col_errors() { | ||
| // scan_provider schema: only "label" and "vector" — no "id". | ||
| let scan_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("label", DataType::Utf8, false), | ||
| Field::new( | ||
| "vector", | ||
| DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4), | ||
| false, | ||
| ), | ||
| ])); | ||
| let scan_provider = | ||
| Arc::new(HashKeyProvider::try_new(scan_schema, vec![], "label").expect("HashKeyProvider")); | ||
|
|
||
| // lookup_provider has "id". | ||
| let lookup_schema = exec_schema(); | ||
| let lookup_provider = | ||
| Arc::new(HashKeyProvider::try_new(lookup_schema, vec![], "id").expect("HashKeyProvider")); | ||
|
|
||
| let reg = USearchRegistry::new(); | ||
| let result = reg.add( | ||
| "test::vector", | ||
| make_populated_index(), | ||
| scan_provider, | ||
| lookup_provider, | ||
| "id", | ||
| MetricKind::L2sq, | ||
| ScalarKind::F32, | ||
| ); | ||
| assert!( | ||
| result.is_err(), | ||
| "registration must fail when scan_provider lacks key column" | ||
| ); | ||
| let msg = result.unwrap_err().to_string(); | ||
| assert!( | ||
| msg.contains("scan provider"), | ||
| "error must mention scan provider: {msg}" | ||
| ); | ||
| } | ||
|
|
||
| /// Qualified table, WHERE clause, ORDER BY UDF directly. | ||
| #[tokio::test] | ||
| async fn exec_qualified_where_order_by_udf() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P1 — `key_col_idx` from `lookup_provider` is applied to `scan_provider` batches
`provider_schema` is correctly taken from `scan_provider` here, but the very next line calls `lookup_provider.schema().index_of(key_col)`. That index is then used in the scan-phase loop to read `batch.column(key_col_idx)`, where `batch` comes from `scan_plan` (i.e. `scan_provider`). When the two providers have different schemas — the exact split this PR is designed to enable — the column positions can differ, silently reading the wrong column or panicking.
The tests don't catch this because both test registrations pass `provider.clone()` for both arguments, making the schemas identical.
Fix: derive two separate indices, one from each provider's schema. Use `scan_key_col_idx` in the scan loop and `lookup_key_col_idx` at the two `attach_distances(…)` call sites.