Skip to content

Commit 2c65279

Browse files
authored
fix(rule): keep index for k-NN that returns metadata, fall back when vector is projected (#25)
A k-NN query that projects the indexed vector column (SELECT *, or SELECT id, embedding) crashed with a post-optimizer schema-mismatch when an index was present: the passthrough branch built its output from the index node's columns (addressing key + non-vector columns), which can't include the vector, so the rewritten plan's schema differed from the original and DataFusion's invariant check aborted the query. The fix is output-aware. The rule now also anchors on a Projection sitting directly over a passthrough k-NN Sort and drives the rewrite from that outer projection's columns — the query's real output: - vector NOT in output (e.g. SELECT id ... ORDER BY l2_distance(emb, ...), the common "nearest ids" query) -> every output column is producible from the node, so the index is still used. - vector IN output (SELECT *, SELECT id, embedding) -> the rewrite can't produce it, so the rule declines and the query falls back to exact brute-force search (correct, like the existing DESC / metric-mismatch fallbacks) instead of crashing. This keeps the metadata-only k-NN path on the index (no regression) while fixing the crash. A code comment records the rejected alternative (have USearchExec reconstruct the vector via index.get) and why: it would make the index a second source of returned vectors that must byte-match the source, which breaks under F16 quantization. Regression tests model production (lookup schema excludes the vector column, which the existing tests' provider included, masking the bug). README documents the fallback. Fixes #508
1 parent 87e2b73 commit 2c65279

3 files changed

Lines changed: 319 additions & 31 deletions

File tree

README.md

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ A DataFusion extension that integrates [USearch](https://github.com/unum-cloud/u
44

55
Queries matching the `ORDER BY distance_fn(col, query) LIMIT k` pattern are **transparently rewritten** by an optimizer rule into a native USearch index call — no query rewrite needed from the caller. `WHERE` clause filters are handled adaptively: high-selectivity filters use USearch's in-graph predicate API; low-selectivity filters bypass HNSW entirely and scan the data directly.
66

7-
**DataFusion:** 52.2   **USearch:** 2.24
7+
**DataFusion:** 53   **USearch:** 2.24
88

99
---
1010

@@ -230,20 +230,33 @@ tests/
230230

231231
### Optimizer rewrite
232232

233-
The rule (`rule.rs`) matches two logical plan shapes:
233+
The rule (`rule.rs`) matches the `Sort(fetch=k)` over a `TableScan`, with an
234+
optional `Projection` between them and an optional `Filter` directly above the
235+
scan:
234236

235237
```
236238
Sort(fetch=k, ORDER BY dist ASC)
237-
Projection([..., distance_fn(col, lit) AS dist, ...])
238-
TableScan(name)
239-
240-
Sort(fetch=k, ORDER BY dist ASC)
241-
Projection([..., distance_fn(col, lit) AS dist, ...])
242-
Filter(predicate)
239+
[ Projection([..., distance_fn(col, lit) AS dist, ...]) ] ← optional
240+
[ Filter(predicate) ] ← optional
243241
TableScan(name)
244242
```
245243

246-
Preconditions: sort is `ASC`, distance UDF matches index metric, table is registered, query vector is a literal. When the rule fires, it replaces the inner nodes with a `USearchNode` leaf carrying: table name, vector column, query vector, k, distance type, and absorbed filter predicates. The `Sort` node is preserved above for final ordering.
244+
DataFusion omits the `Projection` for `SELECT *` (and for any SELECT whose
245+
columns come straight from the scan), so the `Sort` can sit directly on the
246+
`TableScan`.
247+
248+
Preconditions: sort is `ASC`, distance UDF matches index metric, table is
249+
registered, query vector is a literal. When the rule fires, it replaces the inner
250+
nodes with a `USearchNode` leaf carrying: table name, vector column, query
251+
vector, k, distance type, and absorbed filter predicates. The `Sort` node is
252+
preserved above for final ordering.
253+
254+
**Schema preservation:** an optimizer rule must not change the plan's output
255+
schema. The `USearchNode` produces only what the `lookup_provider` can fetch by
256+
key (addressing key + non-vector columns) plus `_distance` — it cannot produce
257+
the indexed vector column. If the matched `Sort`'s output would include the
258+
vector column (e.g. `SELECT *`), the rule declines and the query falls back to
259+
exact execution rather than emitting a schema-incompatible plan.
247260

248261
Physical planning (`planner.rs`) translates `USearchNode` into `USearchExec`, a physical plan node that executes the actual search.
249262

@@ -305,6 +318,7 @@ Tests cover optimizer rule matching/rejection, end-to-end execution through both
305318

306319
| Limitation | Notes |
307320
|---|---|
321+
| Projecting the indexed vector column | A k-NN query whose output includes the vector column itself (e.g. `SELECT *`, or `SELECT id, vector`) falls back to exact execution. The `lookup_provider` does not store the vector column (see [registration](#register-providers-and-set-up-the-sessioncontext)), so the rewrite cannot reproduce it. Project the metadata columns and the distance instead. |
308322
| Stacked `Filter` nodes | Only one `Filter -> TableScan` layer is absorbed. `Filter -> Filter -> TableScan` falls back to exact execution. DataFusion typically combines multiple WHERE conditions into a single Filter, so this rarely occurs. |
309323
| Runtime query vectors | The query vector must be a compile-time literal (`ARRAY[0.1, ...]`). Column references or subquery results are not rewritten. Use `vector_search_vector` for explicit ANN queries. |
310324
| `ef_search` per-query | `expansion_search` is global to the index instance. Per-query adjustment is not supported. |

src/rule.rs

Lines changed: 92 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,66 @@ impl USearchRule {
4848
}
4949

5050
fn try_match(&self, plan: &LogicalPlan) -> Option<LogicalPlan> {
51+
match plan {
52+
// Anchor on the Sort itself. The projection (if any) sits *below* the
53+
// Sort and supplies its output columns; SELECT * omits it entirely.
54+
LogicalPlan::Sort(sort) => {
55+
let (proj_exprs_slice, after_sort): (&[Expr], &LogicalPlan) =
56+
match sort.input.as_ref() {
57+
LogicalPlan::Projection(p) => (p.expr.as_slice(), p.input.as_ref()),
58+
other => (&[], other),
59+
};
60+
self.build_rewrite(sort, proj_exprs_slice, after_sort)
61+
}
62+
63+
// Output-aware passthrough. When a Projection sits directly over a
64+
// k-NN Sort that rests on the scan (no projection between them), drive
65+
// the rewrite with the OUTER projection's columns — i.e. the query's
66+
// real output. The rewrite can only produce the index node's columns
67+
// (addressing key + non-vector columns + _distance), never the indexed
68+
// vector itself. Routing the output columns through `build_rewrite`
69+
// lets it fire when they're all producible (e.g. `SELECT id … ORDER BY
70+
// l2_distance(emb, …)`) and decline — falling back to exact search —
71+
// when the output needs the vector (`SELECT *`, `SELECT id, emb`),
72+
// rather than emitting a schema the consumer can't satisfy (issue #508).
73+
//
74+
// ALTERNATIVE (not taken): teach USearchExec to reconstruct the vector
75+
// column for the result keys via `index.get(key)`, so even
76+
// vector-returning queries stay on the index. Rejected to keep a single
77+
// source of truth for returned vectors — the index would otherwise be a
78+
// second source that must byte-match the parquet (breaks under F16
79+
// quantization, and relies on USearch never transforming stored vectors).
80+
// See the README "Limitations" entry and runtimedb issue #508.
81+
LogicalPlan::Projection(outer) => {
82+
let LogicalPlan::Sort(sort) = outer.input.as_ref() else {
83+
return None;
84+
};
85+
// Only the passthrough shape; the remap shape (projection *below*
86+
// the Sort) is handled when we visit the Sort above.
87+
if !matches!(
88+
sort.input.as_ref(),
89+
LogicalPlan::TableScan(_) | LogicalPlan::Filter(_)
90+
) {
91+
return None;
92+
}
93+
self.build_rewrite(sort, &outer.expr, sort.input.as_ref())
94+
}
95+
96+
_ => None,
97+
}
98+
}
99+
100+
fn build_rewrite(
101+
&self,
102+
sort: &datafusion::logical_expr::logical_plan::Sort,
103+
proj_exprs_slice: &[Expr],
104+
after_sort: &LogicalPlan,
105+
) -> Option<LogicalPlan> {
51106
use datafusion::logical_expr::logical_plan::TableScan;
52107

53108
// Require Sort with embedded fetch limit.
54-
let sort = match plan {
55-
LogicalPlan::Sort(s) => s,
56-
_ => return None,
57-
};
58109
let k = sort.fetch?;
59110

60-
// Projection is optional — DataFusion 51 omits it for SELECT * queries.
61-
let (proj_exprs_slice, after_sort): (&[Expr], &LogicalPlan) = match sort.input.as_ref() {
62-
LogicalPlan::Projection(p) => (p.expr.as_slice(), p.input.as_ref()),
63-
other => (&[], other),
64-
};
65-
66111
// Accept TableScan directly, or Filter(TableScan) for WHERE clauses.
67112
// Deeper nesting (Filter→Filter→…) is not absorbed — the rule does
68113
// not fire and DataFusion falls back to exact execution.
@@ -155,7 +200,14 @@ impl USearchRule {
155200
// Build the final user-visible projection over USearchNode output.
156201
let dist_alias_str = dist_alias.as_deref().unwrap_or("_distance");
157202
let final_proj_exprs = if proj_exprs_slice.is_empty() {
158-
passthrough_projection(&vsn_df_schema, &table_ref)
203+
// No explicit Projection node (e.g. SELECT *, or a SELECT whose
204+
// columns come straight from the scan, so the Sort sits directly on
205+
// the TableScan). The rewrite must reproduce the original output
206+
// columns; if any isn't producible from the node — the indexed
207+
// vector column is never stored in the fetch path — bail so the
208+
// query falls back to exact brute-force search, like the other
209+
// unsupported shapes (DESC, metric mismatch, stacked filters).
210+
passthrough_projection(after_sort.schema().as_ref(), &vsn_df_schema, &table_ref)?
159211
} else {
160212
remap_projections(proj_exprs_slice, dist_alias_str, &table_ref)
161213
};
@@ -375,21 +427,39 @@ fn build_outer_projection(exprs: &[Expr]) -> Vec<Expr> {
375427
.collect()
376428
}
377429

378-
/// Build a passthrough Projection for SELECT * queries (no original Projection node).
379-
/// Projects only the original table columns (not `_distance`) so the output schema
380-
/// matches the original Sort schema. The Sort re-evaluates the distance UDF expression
381-
/// on the k result rows returned by USearchExec (O(k × dim), negligible for small k).
382-
fn passthrough_projection(schema: &DFSchema, table_ref: &TableReference) -> Vec<Expr> {
383-
schema
430+
/// Build a passthrough Projection for queries with no explicit Projection node
431+
/// (e.g. `SELECT *`, or a SELECT whose columns come straight from the scan so the
432+
/// Sort sits directly on the TableScan).
433+
///
434+
/// The projection must reproduce the *original* output columns (`original_schema`,
435+
/// the Sort's input). The `USearchNode` can only produce the columns in
436+
/// `node_schema` — the fetch path's addressing key + non-vector columns +
437+
/// `_distance`; the indexed vector column is never stored there (see
438+
/// `PointLookupProvider`). If the original output needs a column the node can't
439+
/// produce (the vector column), return `None` so the rule declines to rewrite and
440+
/// the query falls back to exact brute-force search. The Sort re-evaluates the
441+
/// distance UDF on the k result rows returned by USearchExec (O(k × dim)).
442+
fn passthrough_projection(
443+
original_schema: &DFSchema,
444+
node_schema: &DFSchema,
445+
table_ref: &TableReference,
446+
) -> Option<Vec<Expr>> {
447+
original_schema
384448
.inner()
385449
.fields()
386450
.iter()
387-
.filter(|f| f.name() != "_distance")
388451
.map(|f| {
389-
Expr::Column(datafusion::common::Column::new(
390-
Some(table_ref.clone()),
391-
f.name().as_str(),
392-
))
452+
let producible = node_schema
453+
.inner()
454+
.fields()
455+
.iter()
456+
.any(|nf| nf.name() == f.name());
457+
producible.then(|| {
458+
Expr::Column(datafusion::common::Column::new(
459+
Some(table_ref.clone()),
460+
f.name().as_str(),
461+
))
462+
})
393463
})
394464
.collect()
395465
}

0 commit comments

Comments
 (0)