Skip to content

feat: Parquet-native adaptive filter for vector search#7

Merged
anoop-narang merged 6 commits into
mainfrom
feat/parquet-native-adaptive-filter
Mar 18, 2026
Merged

feat: Parquet-native adaptive filter for vector search#7
anoop-narang merged 6 commits into
mainfrom
feat/parquet-native-adaptive-filter

Conversation

@anoop-narang
Copy link
Copy Markdown
Collaborator

Summary

  • Pre-scan now uses projection (scalar + _key only, excludes vector column) with filter pushdown for selectivity estimation
  • New low-selectivity path (≤5%) bypasses USearch and SQLite entirely — streams full Parquet scan, evaluates filters inline, computes distances, and maintains a top-k heap to return results directly
  • USearchExec now exposes scan plans as children for DataFusion physical optimizer visibility

Architecture

Query with WHERE clause
  ├─ Pre-scan: Parquet (scalar + _key cols, filter pushdown)
  │   → collect valid_keys, compute selectivity
  │
  ├─ Low selectivity (≤5%)
  │   → Full Parquet scan (all cols + vector, filter pushdown)
  │   → evaluate WHERE, compute distances, top-k heap
  │   → return directly — NO USearch, NO SQLite
  │
  └─ High selectivity (>5%)
      → HNSW filtered_search(valid_keys) → SQLite fetch

Test plan

  • cargo fmt --check
  • cargo clippy -- -D warnings
  • cargo test — all 20 tests pass
  • Integration test with runtimedb after rev bump

Pre-scan uses projection (scalar + _key only, no vector column) with
filter pushdown for selectivity estimation. Low-selectivity path
(<= 5%) bypasses USearch and SQLite entirely — streams full Parquet
scan, evaluates filters inline, computes distances, and maintains a
top-k heap (ScoredRow) to return results directly.

High-selectivity path unchanged: HNSW filtered_search → SQLite fetch.

- Remove old brute-force path and heap_select_top_k helper
- Add parquet_native_execute() for the low-selectivity path
- Add ScoredRow struct with Ord impl for max-heap eviction
Report provider_scan and full_scan as children so DataFusion's
physical optimizer can traverse and optimize the scan plans.
with_new_children correctly replaces whichever scans are present.
Comment thread src/planner.rs
skip_all,
fields(usearch.table = %params.table_name)
)]
async fn parquet_native_execute(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — No test coverage for this path.

parquet_native_execute is never exercised by the existing test suite. The test fixtures use a 4-row index with default brute_force_selectivity_threshold = 0.05. Every WHERE-clause test (e.g. label != 'alpha') passes 2 of 4 rows → 50% selectivity, which always takes the filtered_search branch. Any bug here — including the schema-mismatch that would occur if lookup_provider includes the vector column — would be silent.

A minimal test needs to force the parquet-native branch. The cleanest way is add_with_config with brute_force_selectivity_threshold: 1.0 and a lookup provider whose schema excludes the vector column (matching the intended Parquet+SQLite deployment). Without it the new path is effectively untested code that ships in main.

Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

src/planner.rs, line 522 — parquet_native_execute has no test coverage

The new low-selectivity path is never reached by any existing test. All WHERE-clause tests use a 4-row index with the default 5% threshold; any predicate matching even one row produces ≥ 25% selectivity and takes the filtered_search branch. The parquet_native_execute function is dead code from the test suite's perspective.

This matters beyond coverage: the path has a latent schema-mismatch bug. When the same provider is used for both scan_provider and lookup_provider (e.g. HashKeyProvider with the full schema including the vector column), params.schema has N+1 fields (scalar + vector + _distance), but output_col_indices strips the vector column, so all_cols has only N columns. RecordBatch::try_new would fail at runtime. A test would catch this immediately.

Action Required

Add at least one end-to-end test that forces the parquet-native branch:

  • Use USearchRegistry::add_with_config with brute_force_selectivity_threshold: 1.0 to always take the low-sel path.
  • Use a lookup_provider whose schema excludes the vector column (matching the intended Parquet+SQLite deployment) so params.schema aligns with what parquet_native_execute produces.
  • Assert that the returned rows and distances are correct.

Force the low-selectivity path via brute_force_selectivity_threshold=1.0
with a lookup provider that excludes the vector column (matching the
real Parquet+SQLite deployment). Four new tests:

- WHERE exclusion with distance ordering
- Equality filter with distance ordering
- LIMIT < matching rows (heap eviction)
- WHERE with no matches (empty result)
Comment thread src/planner.rs
let mut all_cols = concat_cols;
all_cols.push(Arc::new(Float32Array::from(distances)));

let result_batch = RecordBatch::try_new(params.schema.clone(), all_cols)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — Silent schema mismatch

all_cols is derived by stripping the vector column from the full scan schema and appending _distance. params.schema (= registered.schema) is built from the lookup_provider schema + _distance. These two agree only if the scan provider and lookup provider have non-vector columns in the same order and with the same types.

If the orderings differ, RecordBatch::try_new will either:

  • panic/error if field types don't match positionally, or
  • silently assign the wrong data to each column if the types happen to match (e.g., two UInt64 columns swapped).

There's no validation anywhere that enforces this contract. Either add an explicit column-name-to-index mapping at the start of this function (look up each field of params.schema by name in full_schema), or validate at registration time that the scan provider's non-vector columns are a superset of the lookup provider schema with matching order.

// Safe approach: build output_col_indices by looking up each
// lookup_provider field name in the full_scan schema.
let lookup_schema = registered.lookup_provider.schema();
let output_col_indices: Vec<usize> = lookup_schema
    .fields()
    .iter()
    .map(|f| full_schema.index_of(f.name()).map_err(|_| {
        DataFusionError::Execution(format!(
            "USearchExec: column '{}' from lookup schema not found in full scan schema",
            f.name()
        ))
    }))
    .collect::<Result<_>>()?;

Comment thread src/planner.rs
// Max-heap: largest distance at top → gets evicted first.
self.distance
.partial_cmp(&other.distance)
.unwrap_or(std::cmp::Ordering::Less)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — NaN distance sinks to bottom of max-heap instead of being evicted

unwrap_or(Ordering::Less) means a NaN distance compares as less than everything. In a max-heap the root is the greatest element, so NaN rows sink to the bottom and are never evicted — they stay in the heap indefinitely and leak into results.

The existing guard (Err(_) => continue) handles null vectors, but compute_distance_for_row can still return Ok(NaN) if the stored vector contains NaN values (both the L2sq and NegativeDot formulas propagate NaN from inputs). Map NaN to Ordering::Greater so that NaN rows are treated as infinitely far and evicted first:

        self.distance
            .partial_cmp(&other.distance)
            .unwrap_or(std::cmp::Ordering::Greater)

Or guard earlier, before the push:

if dist.is_nan() {
    continue;
}

Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — Silent schema mismatch (src/planner.rs line 623)

parquet_native_execute builds output columns by stripping the vector column from the full scan schema, then calls RecordBatch::try_new(params.schema, ...) where params.schema is derived from the lookup provider. These agree only if the scan provider and lookup provider share identical non-vector column ordering and types — a contract that is implicit, undocumented at the API boundary, and unvalidated at registration time.

If the orderings differ, RecordBatch::try_new either errors with a type mismatch or — worse — succeeds silently with data assigned to the wrong columns. The existing tests don't catch this because they happen to use matching schemas. The fix is to build output_col_indices by looking up each lookup-provider field name in full_schema by name rather than by positional exclusion.

P2 — NaN distances accumulate in top-k heap (src/planner.rs line 654)

ScoredRow::Ord maps NaN comparison to Ordering::Less, so NaN-distance rows sink to the bottom of the max-heap and are never evicted. They silently appear in results. The null-vector guard (Err(_) => continue) doesn't cover the case where stored vectors contain NaN values, which L2sq and dot-product distance formulas propagate. Use Ordering::Greater so NaN rows are treated as maximally distant and evicted first, or add an explicit if dist.is_nan() { continue } guard before the push.

Action Required

  1. Fix output_col_indices derivation to use name-based lookup against full_schema rather than positional exclusion.
  2. Fix NaN handling in ScoredRow::Ord (or guard before heap push).

Update usage examples for the scan_provider + lookup_provider API.
Document the three-path adaptive filtering (unfiltered HNSW, high-sel
HNSW filtered, low-sel Parquet-native). Remove stale DataFusion 51 API
notes and benchmark section.
Comment thread src/planner.rs Outdated
// This matches the lookup_provider schema (same column order).
let output_col_indices: Vec<usize> = (0..full_schema.fields().len())
.filter(|&i| i != vec_col_idx)
.collect();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — schema mismatch at runtime for real deployments

output_col_indices is built by positionally dropping the vector column from scan_provider.schema(). The resulting columns are then assembled into a RecordBatch using params.schema, which is derived from lookup_provider.schema() (see registry.rs:277–297).

This only works when scan_provider.schema() minus the vector column equals lookup_provider.schema() — same fields, same order. That invariant is never validated. For a typical Parquet + SQLite deployment where the Parquet file has additional columns not cached in SQLite (e.g. body, category, year), this produces a column-count mismatch and RecordBatch::try_new fails at query time.

Fix options:

  1. Validate at registration (add_with_config) that scan_provider.schema() minus vector_col == lookup_provider.schema(), and error early with a clear message.
  2. Project the scan output by name, using lookup_provider.schema() field names to pick columns from the batch (order-independent and tolerant of extra scan columns).

- Use name-based column lookup from lookup_provider schema into full
  scan schema instead of positional filtering. Prevents silent column
  mismatches if the two schemas have different orderings.
- Skip NaN distances before pushing to the top-k heap. Prevents NaN
  rows from sinking to the bottom and never being evicted.
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review

Issues

P1 — parquet_native_execute schema assumption is unvalidated (src/planner.rs:536-540, 623)

registered.schema (the expected output schema) is built from lookup_provider.schema() + _distance at registration time (registry.rs:277-297). parquet_native_execute builds its output by positionally stripping the vector column from scan_provider.schema(). These two match only when scan_provider.schema() minus the vector column equals lookup_provider.schema() — same fields, same order.

That invariant is never checked. For a typical Parquet + SQLite deployment where the Parquet file has columns the SQLite table does not cache (e.g. body, category, year), the column count diverges and RecordBatch::try_new(params.schema.clone(), all_cols) errors at query execution time. The test exercises only the narrow case where schemas match exactly.

Fix options: (a) validate at add_with_config that scan_provider.schema() minus vector_col equals lookup_provider.schema() field-for-field, failing fast at registration with a clear message; or (b) project the scan output by name using lookup_provider.schema() field names rather than positional indices.

P2 — Two stale doc comments (not in diff, behavior changed by this PR)

  • src/planner.rs header comment (lines 14-20): still describes the old low-selectivity path ("use pre-computed (key, distance) pairs, heap-select top-k, then fetch_by_keys"). The new path streams a second full scan.
  • src/registry.rs USearchTableConfig::brute_force_selectivity_threshold doc: still says "exact brute-force search over only the rows that pass the WHERE filter." Should describe the new behavior (bypass HNSW and stream the full scan provider).

Action Required

Resolve the P1 schema contract before merging. The inline comment on planner.rs:540 has the details.

Update planner.rs header to describe the three execution paths
(unfiltered, high-sel HNSW filtered, low-sel Parquet-native).
Update registry.rs brute_force_selectivity_threshold doc to describe
the scan_provider-based path instead of the old brute-force approach.
Comment thread src/planner.rs
Comment on lines +660 to +664
}
}
impl Ord for ScoredRow {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Max-heap: largest distance at top → gets evicted first.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment "NaN sorts to the top of the heap so it gets evicted first" is incorrect. unwrap_or(Ordering::Less) treats NaN as less than everything, so NaN rows would sink to the bottom of the max-heap and stay. The behaviour is harmless in practice because NaN distances are filtered out before the heap.push call, but the comment is misleading and the code would be wrong if that guard were ever removed.

Comment thread src/planner.rs
let mut stream = scan_plan.execute(0, task_ctx.clone())?;

// Evaluate filters and collect valid rows (stream batch-by-batch, O(1) memory).
let mut valid_keys: HashSet<u64> = HashSet::new();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (non-blocking): valid_keys is always collected as a HashSet<u64> during the pre-scan. For the low-selectivity path, only valid_keys.len() and .is_empty() are used — the key set itself is discarded before parquet_native_execute (which re-evaluates filters independently). The set is only needed as a predicate closure in the high-selectivity HNSW path.

At ≤5% selectivity the set holds at most ~5% of n entries, so memory impact is bounded, but a usize counter (plus an is_empty sentinel) would suffice for the low-sel path and halves allocations on that branch. Consider splitting the pre-scan into a count-only phase that short-circuits to parquet_native_execute without materializing the full set, upgrading to a set only when the high-sel path is chosen.

Comment thread src/planner.rs
// At low selectivity (<=5%), the number of passing rows is small.
let mut heap: BinaryHeap<ScoredRow> = BinaryHeap::with_capacity(params.k + 1);

let mut stream = full_scan.execute(0, task_ctx)?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 (non-blocking): full_scan.execute(0, task_ctx) reads only partition 0, same as the pre-scan on line 413. For a multi-partition TableProvider (e.g. a Parquet directory with multiple files), rows in partitions > 0 are silently skipped on both passes. This is a pre-existing limitation in the codebase — not introduced by this PR — but the new Parquet-native path adds a second instance of it. Worth tracking as a follow-up once multi-partition support is addressed.

Comment thread src/planner.rs
row: Vec<Arc<dyn Array>>,
}

impl PartialEq for ScoredRow {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 (non-blocking): PartialEq compares only distance, so two ScoredRow instances with equal distance but different row data compare as ==. This is harmless for BinaryHeap (Rust's heap only uses Ord for ordering decisions), but it means == on ScoredRow doesn't express structural equality, which could be surprising for future maintainers or if ScoredRow is ever used in a set/map. Consider adding a comment clarifying that PartialEq/Eq here are intentionally distance-only stubs required by Ord.

@anoop-narang anoop-narang merged commit 80059cc into main Mar 18, 2026
5 checks passed
@anoop-narang anoop-narang deleted the feat/parquet-native-adaptive-filter branch March 18, 2026 05:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant