Skip to content

Commit ab576ad

Browse files
committed
fix(planner): scan all partitions and use FilterExec for pre-scan
The pre-scan previously called execute(0) on the DataSourceExec, reading only the first partition's file group and missing valid keys from the rest of the dataset. This was a correctness bug — selectivity calculations and valid_key collection were based on partial data.

Wrap the pre-scan as CoalescePartitionsExec → FilterExec → DataSourceExec:
- CoalescePartitionsExec merges all partitions into a single stream.
- FilterExec evaluates the predicate per partition (DataFusion's physical optimizer pushes it into the Parquet reader for pruning).
- The stream yields only matching rows, so no manual evaluate_filters call is needed.

This also removes prescan_filters, evaluate_filters, and the manual physical filter compilation from SearchParams, simplifying the code.
1 parent 83fa0c4 commit ab576ad

1 file changed

Lines changed: 57 additions & 89 deletions

File tree

src/planner.rs

Lines changed: 57 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,19 @@
1010
// 3. Attach _distance column.
1111
//
1212
// ── With filters, high selectivity (> threshold) ─────────────────────────────
13-
// 1. Pre-scan: scan_provider with projection (scalar + _key cols only,
14-
// no vector column) and filter pushdown. Collect valid_keys.
13+
// 1. Pre-scan: CoalescePartitionsExec → FilterExec → DataSourceExec
14+
// (_key + filter cols only). Collect valid_keys from all partitions.
1515
// 2. selectivity = valid_keys.len() / index.size()
1616
// 3. filtered_search(query, k, |key| valid_keys.contains(key))
1717
// 4. lookup_provider.fetch_by_keys() → O(k) rows. Attach _distance.
1818
//
19-
// ── With filters, low selectivity (≤ threshold) — Parquet-native ─────────────
19+
// ── With filters, low selectivity (≤ threshold) — index-get ──────────────────
2020
// 1. Pre-scan: same as above, collect valid_keys and compute selectivity.
21-
// 2. Full scan: scan_provider with all columns (including vector) and
22-
// filter pushdown. Evaluate WHERE per batch, compute distances for
23-
// passing rows, maintain top-k heap. Return directly — no USearch,
24-
// no lookup_provider.
21+
// 2. index.get(key) for each valid_key → compute distances → top-k heap.
22+
// 3. lookup_provider.fetch_by_keys() → O(k) rows. Attach _distance.
2523
//
2624
// All I/O is deferred to USearchExec::execute() — plan_extension is purely
27-
// structural (validate registry entry, compile PhysicalExprs, build scan plans).
25+
// structural (validate registry, compile PhysicalExprs, build scan plans).
2826
//
2927
// The Sort node is kept in the logical plan so DataFusion handles ordering
3028
// by _distance / dist alias.
@@ -34,16 +32,20 @@ use std::collections::{BinaryHeap, HashMap, HashSet};
3432
use std::fmt;
3533
use std::sync::Arc;
3634

37-
use arrow_array::{Array, BooleanArray, Float32Array, RecordBatch};
35+
use arrow_array::{Array, Float32Array, RecordBatch};
3836
use arrow_schema::SchemaRef;
3937
use async_trait::async_trait;
4038
use datafusion::common::Result;
4139
use datafusion::error::DataFusionError;
4240
use datafusion::execution::context::QueryPlanner;
4341
use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext};
4442
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
45-
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr, create_physical_expr};
43+
use datafusion::physical_expr::{
44+
EquivalenceProperties, PhysicalExpr, conjunction, create_physical_expr,
45+
};
46+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
4647
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
48+
use datafusion::physical_plan::filter::FilterExec;
4749
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
4850
use datafusion::physical_plan::{
4951
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
@@ -149,18 +151,16 @@ impl ExtensionPlanner for USearchExecPlanner {
149151
}
150152
};
151153

152-
// Compile filter Exprs → PhysicalExprs (synchronous, no I/O).
153154
let exec_props = session_state.execution_props();
154-
let physical_filters: Vec<Arc<dyn PhysicalExpr>> = node
155-
.filters
156-
.iter()
157-
.map(|f| create_physical_expr(f, &node.schema, exec_props))
158-
.collect::<Result<_>>()?;
159155

160-
// For the filtered path, pre-plan a provider scan:
161-
// Pre-scan (_key + filter cols only) — cheap selectivity estimation.
162-
// The scan receives the node's WHERE filters for Parquet predicate pushdown.
163-
let (provider_scan, prescan_filters) = if !node.filters.is_empty() {
156+
// For the filtered path, build a pre-scan plan:
157+
// CoalescePartitionsExec → FilterExec → DataSourceExec
158+
// DataSourceExec may have multiple partitions (file groups); FilterExec
159+
// evaluates the predicate per partition; CoalescePartitionsExec merges
160+
// all partitions into a single stream of matching rows.
161+
// DataFusion's physical optimizer pushes the predicate from FilterExec
162+
// into the Parquet reader for row group / bloom / page index pruning.
163+
let provider_scan = if !node.filters.is_empty() {
164164
let scan_schema = registered.scan_provider.schema();
165165

166166
// Pre-scan projection: _key + columns referenced by filters.
@@ -185,29 +185,40 @@ impl ExtensionPlanner for USearchExecPlanner {
185185
})
186186
.collect();
187187

188-
let pre_scan = registered
188+
// Don't pass filters to scan() — FilterExec handles filtering, and
189+
// DataFusion's physical optimizer pushes it into the Parquet reader
190+
// for row group / bloom / page index pruning.
191+
let data_source = registered
189192
.scan_provider
190-
.scan(session_state, Some(&scalar_projection), &node.filters, None)
193+
.scan(session_state, Some(&scalar_projection), &[], None)
191194
.await?;
192195

193-
// Compile physical filters against the pre-scan's projected schema
194-
// so column indices match the narrower batch layout.
195-
let pre_scan_schema = pre_scan.schema();
196-
let pre_scan_df_schema =
197-
datafusion::common::DFSchema::try_from(pre_scan_schema.as_ref().clone())?;
198-
let prescan_physical_filters: Vec<Arc<dyn PhysicalExpr>> = node
196+
// Compile physical filters against the projected schema and wrap
197+
// in a FilterExec. Column qualifiers are stripped because the
198+
// projected schema (from Arrow Schema) is unqualified.
199+
let proj_schema = data_source.schema();
200+
let proj_df_schema =
201+
datafusion::common::DFSchema::try_from(proj_schema.as_ref().clone())?;
202+
let phys_filters: Vec<Arc<dyn PhysicalExpr>> = node
199203
.filters
200204
.iter()
201205
.map(|f| {
202-
// Strip table qualifiers — the projected schema is unqualified.
203206
let unqualified = strip_column_qualifier(f);
204-
create_physical_expr(&unqualified, &pre_scan_df_schema, exec_props)
207+
create_physical_expr(&unqualified, &proj_df_schema, exec_props)
205208
})
206209
.collect::<Result<_>>()?;
210+
let predicate = conjunction(phys_filters);
211+
let filtered: Arc<dyn ExecutionPlan> =
212+
Arc::new(FilterExec::try_new(predicate, data_source)?);
213+
214+
// Merge all partitions into a single stream so the pre-scan
215+
// collects valid keys from the entire dataset, not just one
216+
// partition's file group.
217+
let coalesced: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(filtered));
207218

208-
(Some(pre_scan), prescan_physical_filters)
219+
Some(coalesced)
209220
} else {
210-
(None, vec![])
221+
None
211222
};
212223

213224
Ok(Some(Arc::new(USearchExec::new(SearchParams {
@@ -216,8 +227,7 @@ impl ExtensionPlanner for USearchExecPlanner {
216227
query_vec: node.query_vec_f64(),
217228
k: node.k,
218229
distance_type: node.distance_type.clone(),
219-
physical_filters,
220-
prescan_filters,
230+
has_filters: !node.filters.is_empty(),
221231
schema: registered.schema.clone(),
222232
key_col: registered.key_col.clone(),
223233
scalar_kind: registered.scalar_kind,
@@ -237,11 +247,9 @@ struct SearchParams {
237247
query_vec: Vec<f64>,
238248
k: usize,
239249
distance_type: DistanceType,
240-
physical_filters: Vec<Arc<dyn PhysicalExpr>>,
241-
/// Physical filters compiled against the pre-scan's projected schema.
242-
/// Column indices match the narrow _key + filter-col projection, not the
243-
/// full table schema. Used by adaptive_filtered_execute for pre-scan evaluation.
244-
prescan_filters: Vec<Arc<dyn PhysicalExpr>>,
250+
/// Whether the query has WHERE-clause filters. Used to choose between the
251+
/// unfiltered HNSW path and the adaptive filtered path.
252+
has_filters: bool,
245253
schema: SchemaRef,
246254
key_col: String,
247255
scalar_kind: ScalarKind,
@@ -276,10 +284,8 @@ impl DisplayAs for USearchExec {
276284
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
277285
write!(
278286
f,
279-
"USearchExec: table={}, k={}, filters={}",
280-
self.params.table_name,
281-
self.params.k,
282-
self.params.physical_filters.len()
287+
"USearchExec: table={}, k={}, filtered={}",
288+
self.params.table_name, self.params.k, self.params.has_filters
283289
)
284290
}
285291
}
@@ -346,7 +352,7 @@ impl ExecutionPlan for USearchExec {
346352
fields(
347353
usearch.table = %params.table_name,
348354
usearch.k = params.k,
349-
usearch.filter_count = params.physical_filters.len(),
355+
usearch.has_filters = params.has_filters,
350356
)
351357
)]
352358
async fn usearch_execute(
@@ -361,7 +367,7 @@ async fn usearch_execute(
361367
))
362368
})?;
363369

364-
if params.physical_filters.is_empty() {
370+
if !params.has_filters {
365371
// ── Unfiltered path ───────────────────────────────────────────────
366372
let matches = {
367373
let _span = tracing::info_span!(
@@ -424,7 +430,7 @@ async fn usearch_execute(
424430
fields(
425431
usearch.table = %params.table_name,
426432
usearch.k = params.k,
427-
usearch.filter_count = params.physical_filters.len(),
433+
usearch.has_filters = params.has_filters,
428434
usearch.valid_rows = tracing::field::Empty,
429435
usearch.total_rows = tracing::field::Empty,
430436
usearch.selectivity = tracing::field::Empty,
@@ -449,24 +455,19 @@ async fn adaptive_filtered_execute(
449455
// Key column index in lookup_provider schema — used by attach_distances (high-sel path).
450456
let lookup_key_col_idx = provider_key_col_idx(registered)?;
451457

452-
// ── Phase 1: Pre-scan (scalar + _key only) for selectivity estimation ────
458+
// ── Phase 1: Pre-scan for selectivity estimation ───────────────────────
459+
// The scan_plan is CoalescePartitionsExec → FilterExec → DataSourceExec,
460+
// so execute(0) yields already-filtered rows from all partitions.
453461
let mut stream = scan_plan.execute(0, task_ctx.clone())?;
454462
let mut valid_keys: HashSet<u64> = HashSet::new();
455463

456464
let scan_span = tracing::info_span!("usearch_pre_scan", usearch.table = %params.table_name);
457465
async {
458466
while let Some(batch_result) = stream.next().await {
459467
let batch = batch_result?;
460-
let mask = evaluate_filters(&params.prescan_filters, &batch)?;
461468
let keys = extract_keys_as_u64(batch.column(scan_key_col_idx).as_ref())?;
462-
463-
for row_idx in 0..batch.num_rows() {
464-
if !mask.is_null(row_idx)
465-
&& mask.value(row_idx)
466-
&& let Some(Some(key)) = keys.get(row_idx)
467-
{
468-
valid_keys.insert(*key);
469-
}
469+
for key in keys.into_iter().flatten() {
470+
valid_keys.insert(key);
470471
}
471472
}
472473
Ok::<_, datafusion::error::DataFusionError>(())
@@ -794,39 +795,6 @@ fn compute_raw_distance_f64(v: &[f64], q: &[f64], dist_type: &DistanceType) -> f
794795

795796
// ── Helpers ───────────────────────────────────────────────────────────────────
796797

797-
/// AND all physical filter expressions against a batch.
798-
/// Returns a BooleanArray (one value per row, true = passes all filters).
799-
fn evaluate_filters(
800-
filters: &[Arc<dyn PhysicalExpr>],
801-
batch: &RecordBatch,
802-
) -> Result<BooleanArray> {
803-
use datafusion::arrow::compute;
804-
805-
if filters.is_empty() {
806-
return Ok(BooleanArray::from(vec![true; batch.num_rows()]));
807-
}
808-
809-
let mut combined: Option<BooleanArray> = None;
810-
for filter in filters {
811-
let col_val = filter.evaluate(batch)?;
812-
let arr = col_val.into_array(batch.num_rows())?;
813-
let bool_arr = arr
814-
.as_any()
815-
.downcast_ref::<BooleanArray>()
816-
.ok_or_else(|| {
817-
DataFusionError::Execution("filter expression did not return BooleanArray".into())
818-
})?
819-
.clone();
820-
821-
combined = Some(match combined {
822-
None => bool_arr,
823-
Some(prev) => compute::and(&prev, &bool_arr)
824-
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?,
825-
});
826-
}
827-
Ok(combined.unwrap())
828-
}
829-
830798
/// Extract the distance from a single row of a vector column.
831799
///
832800
/// Index of the key column in the lookup provider schema.

0 commit comments

Comments
 (0)