feat: Parquet-native adaptive filter for vector search#7
Conversation
Pre-scan uses projection (scalar + _key only, no vector column) with filter pushdown for selectivity estimation. Low-selectivity path (<= 5%) bypasses USearch and SQLite entirely — streams full Parquet scan, evaluates filters inline, computes distances, and maintains a top-k heap (ScoredRow) to return results directly. High-selectivity path unchanged: HNSW filtered_search → SQLite fetch. - Remove old brute-force path and heap_select_top_k helper - Add parquet_native_execute() for the low-selectivity path - Add ScoredRow struct with Ord impl for max-heap eviction
Report provider_scan and full_scan as children so DataFusion's physical optimizer can traverse and optimize the scan plans. with_new_children correctly replaces whichever scans are present.
| skip_all, | ||
| fields(usearch.table = %params.table_name) | ||
| )] | ||
| async fn parquet_native_execute( |
There was a problem hiding this comment.
P1 — No test coverage for this path.
parquet_native_execute is never exercised by the existing test suite. The test fixtures use a 4-row index with default brute_force_selectivity_threshold = 0.05. Every WHERE-clause test (e.g. label != 'alpha') passes 2 of 4 rows → 50% selectivity, which always takes the filtered_search branch. Any bug here — including the schema-mismatch that would occur if lookup_provider includes the vector column — would be silent.
A minimal test needs to force the parquet-native branch. The cleanest way is add_with_config with brute_force_selectivity_threshold: 1.0 and a lookup provider whose schema excludes the vector column (matching the intended Parquet+SQLite deployment). Without it the new path is effectively untested code that ships in main.
There was a problem hiding this comment.
Review
Issues
src/planner.rs, line 522 — parquet_native_execute has no test coverage
The new low-selectivity path is never reached by any existing test. All WHERE-clause tests use a 4-row index with the default 5% threshold; any predicate matching even one row produces ≥ 25% selectivity and takes the filtered_search branch. The parquet_native_execute function is dead code from the test suite's perspective.
This matters beyond coverage: the path has a latent schema-mismatch bug. When the same provider is used for both scan_provider and lookup_provider (e.g. HashKeyProvider with the full schema including the vector column), params.schema has N+1 fields (scalar + vector + _distance), but output_col_indices strips the vector column, so all_cols has only N columns. RecordBatch::try_new would fail at runtime. A test would catch this immediately.
Action Required
Add at least one end-to-end test that forces the parquet-native branch:
- Use
USearchRegistry::add_with_configwithbrute_force_selectivity_threshold: 1.0to always take the low-sel path. - Use a
lookup_providerwhose schema excludes the vector column (matching the intended Parquet+SQLite deployment) soparams.schemaaligns with whatparquet_native_executeproduces. - Assert that the returned rows and distances are correct.
Force the low-selectivity path via brute_force_selectivity_threshold=1.0 with a lookup provider that excludes the vector column (matching the real Parquet+SQLite deployment). Four new tests: - WHERE exclusion with distance ordering - Equality filter with distance ordering - LIMIT < matching rows (heap eviction) - WHERE with no matches (empty result)
| let mut all_cols = concat_cols; | ||
| all_cols.push(Arc::new(Float32Array::from(distances))); | ||
|
|
||
| let result_batch = RecordBatch::try_new(params.schema.clone(), all_cols) |
There was a problem hiding this comment.
P1 — Silent schema mismatch
all_cols is derived by stripping the vector column from the full scan schema and appending _distance. params.schema (= registered.schema) is built from the lookup_provider schema + _distance. These two agree only if the scan provider and lookup provider have non-vector columns in the same order and with the same types.
If the orderings differ, RecordBatch::try_new will either:
- panic/error if field types don't match positionally, or
- silently assign the wrong data to each column if the types happen to match (e.g., two
UInt64columns swapped).
There's no validation anywhere that enforces this contract. Either add an explicit column-name-to-index mapping at the start of this function (look up each field of params.schema by name in full_schema), or validate at registration time that the scan provider's non-vector columns are a superset of the lookup provider schema with matching order.
// Safe approach: build output_col_indices by looking up each
// lookup_provider field name in the full_scan schema.
let lookup_schema = registered.lookup_provider.schema();
let output_col_indices: Vec<usize> = lookup_schema
.fields()
.iter()
.map(|f| full_schema.index_of(f.name()).map_err(|_| {
DataFusionError::Execution(format!(
"USearchExec: column '{}' from lookup schema not found in full scan schema",
f.name()
))
}))
.collect::<Result<_>>()?;| // Max-heap: largest distance at top → gets evicted first. | ||
| self.distance | ||
| .partial_cmp(&other.distance) | ||
| .unwrap_or(std::cmp::Ordering::Less) |
There was a problem hiding this comment.
P2 — NaN distance sinks to bottom of max-heap instead of being evicted
unwrap_or(Ordering::Less) means a NaN distance compares as less than everything. In a max-heap the root is the greatest element, so NaN rows sink to the bottom and are never evicted — they stay in the heap indefinitely and leak into results.
The existing guard (Err(_) => continue) handles null vectors, but compute_distance_for_row can still return Ok(NaN) if the stored vector contains NaN values (both the L2sq and NegativeDot formulas propagate NaN from inputs). Map NaN to Ordering::Greater so that NaN rows are treated as infinitely far and evicted first:
self.distance
.partial_cmp(&other.distance)
.unwrap_or(std::cmp::Ordering::Greater)Or guard earlier, before the push:
if dist.is_nan() {
continue;
}There was a problem hiding this comment.
Review
Issues
P1 — Silent schema mismatch (src/planner.rs line 623)
parquet_native_execute builds output columns by stripping the vector column from the full scan schema, then calls RecordBatch::try_new(params.schema, ...) where params.schema is derived from the lookup provider. These agree only if the scan provider and lookup provider share identical non-vector column ordering and types — a contract that is implicit, undocumented at the API boundary, and unvalidated at registration time.
If the orderings differ, RecordBatch::try_new either errors with a type mismatch or — worse — succeeds silently with data assigned to the wrong columns. The existing tests don't catch this because they happen to use matching schemas. The fix is to build output_col_indices by looking up each lookup-provider field name in full_schema by name rather than by positional exclusion.
P2 — NaN distances accumulate in top-k heap (src/planner.rs line 654)
ScoredRow::Ord maps NaN comparison to Ordering::Less, so NaN-distance rows sink to the bottom of the max-heap and are never evicted. They silently appear in results. The null-vector guard (Err(_) => continue) doesn't cover the case where stored vectors contain NaN values, which L2sq and dot-product distance formulas propagate. Use Ordering::Greater so NaN rows are treated as maximally distant and evicted first, or add an explicit if dist.is_nan() { continue } guard before the push.
Action Required
- Fix
output_col_indicesderivation to use name-based lookup againstfull_schemarather than positional exclusion. - Fix NaN handling in
ScoredRow::Ord(or guard before heap push).
Update usage examples for the scan_provider + lookup_provider API. Document the three-path adaptive filtering (unfiltered HNSW, high-sel HNSW filtered, low-sel Parquet-native). Remove stale DataFusion 51 API notes and benchmark section.
| // This matches the lookup_provider schema (same column order). | ||
| let output_col_indices: Vec<usize> = (0..full_schema.fields().len()) | ||
| .filter(|&i| i != vec_col_idx) | ||
| .collect(); |
There was a problem hiding this comment.
P1 — schema mismatch at runtime for real deployments
output_col_indices is built by positionally dropping the vector column from scan_provider.schema(). The resulting columns are then assembled into a RecordBatch using params.schema, which is derived from lookup_provider.schema() (see registry.rs:277–297).
This only works when scan_provider.schema() minus the vector column equals lookup_provider.schema() — same fields, same order. That invariant is never validated. For a typical Parquet + SQLite deployment where the Parquet file has additional columns not cached in SQLite (e.g. body, category, year), this produces a column-count mismatch and RecordBatch::try_new fails at query time.
Fix options:
- Validate at registration (
add_with_config) thatscan_provider.schema()minusvector_col==lookup_provider.schema(), and error early with a clear message. - Project the scan output by name, using
lookup_provider.schema()field names to pick columns from the batch (order-independent and tolerant of extra scan columns).
- Use name-based column lookup from lookup_provider schema into full scan schema instead of positional filtering. Prevents silent column mismatches if the two schemas have different orderings. - Skip NaN distances before pushing to the top-k heap. Prevents NaN rows from sinking to the bottom and never being evicted.
There was a problem hiding this comment.
Review
Issues
P1 — parquet_native_execute schema assumption is unvalidated (src/planner.rs:536-540, 623)
registered.schema (the expected output schema) is built from lookup_provider.schema() + _distance at registration time (registry.rs:277-297). parquet_native_execute builds its output by positionally stripping the vector column from scan_provider.schema(). These two match only when scan_provider.schema() minus the vector column equals lookup_provider.schema() — same fields, same order.
That invariant is never checked. For a typical Parquet + SQLite deployment where the Parquet file has columns the SQLite table does not cache (e.g. body, category, year), the column count diverges and RecordBatch::try_new(params.schema.clone(), all_cols) errors at query execution time. The test exercises only the narrow case where schemas match exactly.
Fix options: (a) validate at add_with_config that scan_provider.schema() minus vector_col equals lookup_provider.schema() field-for-field, failing fast at registration with a clear message; or (b) project the scan output by name using lookup_provider.schema() field names rather than positional indices.
P2 — Two stale doc comments (not in diff, behavior changed by this PR)
- src/planner.rs header comment (lines 14-20): still describes the old low-selectivity path ("use pre-computed (key, distance) pairs, heap-select top-k, then fetch_by_keys"). The new path streams a second full scan.
- src/registry.rs USearchTableConfig::brute_force_selectivity_threshold doc: still says "exact brute-force search over only the rows that pass the WHERE filter." Should describe the new behavior (bypass HNSW and stream the full scan provider).
Action Required
Resolve the P1 schema contract before merging. The inline comment on planner.rs:540 has the details.
Update planner.rs header to describe the three execution paths (unfiltered, high-sel HNSW filtered, low-sel Parquet-native). Update registry.rs brute_force_selectivity_threshold doc to describe the scan_provider-based path instead of the old brute-force approach.
| } | ||
| } | ||
| impl Ord for ScoredRow { | ||
| fn cmp(&self, other: &Self) -> std::cmp::Ordering { | ||
| // Max-heap: largest distance at top → gets evicted first. |
There was a problem hiding this comment.
The comment "NaN sorts to the top of the heap so it gets evicted first" is incorrect. unwrap_or(Ordering::Less) treats NaN as less than everything, so NaN rows would sink to the bottom of the max-heap and stay. The behaviour is harmless in practice because NaN distances are filtered out before the heap.push call, but the comment is misleading and the code would be wrong if that guard were ever removed.
| let mut stream = scan_plan.execute(0, task_ctx.clone())?; | ||
|
|
||
| // Evaluate filters and collect valid rows (stream batch-by-batch, O(1) memory). | ||
| let mut valid_keys: HashSet<u64> = HashSet::new(); |
There was a problem hiding this comment.
P2 (non-blocking): valid_keys is always collected as a HashSet<u64> during the pre-scan. For the low-selectivity path, only valid_keys.len() and .is_empty() are used — the key set itself is discarded before parquet_native_execute (which re-evaluates filters independently). The set is only needed as a predicate closure in the high-selectivity HNSW path.
At ≤5% selectivity the set holds at most ~5% of n entries, so memory impact is bounded, but a usize counter (plus an is_empty sentinel) would suffice for the low-sel path and halves allocations on that branch. Consider splitting the pre-scan into a count-only phase that short-circuits to parquet_native_execute without materializing the full set, upgrading to a set only when the high-sel path is chosen.
| // At low selectivity (<=5%), the number of passing rows is small. | ||
| let mut heap: BinaryHeap<ScoredRow> = BinaryHeap::with_capacity(params.k + 1); | ||
|
|
||
| let mut stream = full_scan.execute(0, task_ctx)?; |
There was a problem hiding this comment.
P3 (non-blocking): full_scan.execute(0, task_ctx) reads only partition 0, same as the pre-scan on line 413. For a multi-partition TableProvider (e.g. a Parquet directory with multiple files), rows in partitions > 0 are silently skipped on both passes. This is a pre-existing limitation in the codebase — not introduced by this PR — but the new Parquet-native path adds a second instance of it. Worth tracking as a follow-up once multi-partition support is addressed.
| row: Vec<Arc<dyn Array>>, | ||
| } | ||
|
|
||
| impl PartialEq for ScoredRow { |
There was a problem hiding this comment.
P3 (non-blocking): PartialEq compares only distance, so two ScoredRow instances with equal distance but different row data compare as ==. This is harmless for BinaryHeap (Rust's heap only uses Ord for ordering decisions), but it means == on ScoredRow doesn't express structural equality, which could be surprising for future maintainers or if ScoredRow is ever used in a set/map. Consider adding a comment clarifying that PartialEq/Eq here are intentionally distance-only stubs required by Ord.
Summary
Architecture
Test plan
cargo fmt --checkcargo clippy -- -D warningscargo test— all 20 tests pass