1010// 3. Attach _distance column.
1111//
1212// ── With filters, high selectivity (> threshold) ─────────────────────────────
13- // 1. Pre-scan: scan_provider with projection (scalar + _key cols only,
14- // no vector column) and filter pushdown. Collect valid_keys.
13+ // 1. Pre-scan: CoalescePartitionsExec → FilterExec → DataSourceExec
14+ // (_key + filter cols only). Collect valid_keys from all partitions.
1515// 2. selectivity = valid_keys.len() / index.size()
1616// 3. filtered_search(query, k, |key| valid_keys.contains(key))
1717// 4. lookup_provider.fetch_by_keys() → O(k) rows. Attach _distance.
1818//
19- // ── With filters, low selectivity (≤ threshold) — Parquet-native ─────────────
19+ // ── With filters, low selectivity (≤ threshold) — index-get ──────────────────
2020// 1. Pre-scan: same as above, collect valid_keys and compute selectivity.
21- // 2. Full scan: scan_provider with all columns (including vector) and
22- // filter pushdown. Evaluate WHERE per batch, compute distances for
23- // passing rows, maintain top-k heap. Return directly — no USearch,
24- // no lookup_provider.
21+ // 2. index.get(key) for each valid_key → compute distances → top-k heap.
22+ // 3. lookup_provider.fetch_by_keys() → O(k) rows. Attach _distance.
2523//
2624// All I/O is deferred to USearchExec::execute() — plan_extension is purely
27- // structural (validate registry entry, compile PhysicalExprs, build scan plans).
25+ // structural (validate registry, compile PhysicalExprs, build scan plans).
2826//
2927// The Sort node is kept in the logical plan so DataFusion handles ordering
3028// by _distance / dist alias.
@@ -34,16 +32,20 @@ use std::collections::{BinaryHeap, HashMap, HashSet};
3432use std:: fmt;
3533use std:: sync:: Arc ;
3634
37- use arrow_array:: { Array , BooleanArray , Float32Array , RecordBatch } ;
35+ use arrow_array:: { Array , Float32Array , RecordBatch } ;
3836use arrow_schema:: SchemaRef ;
3937use async_trait:: async_trait;
4038use datafusion:: common:: Result ;
4139use datafusion:: error:: DataFusionError ;
4240use datafusion:: execution:: context:: QueryPlanner ;
4341use datafusion:: execution:: { SendableRecordBatchStream , SessionState , TaskContext } ;
4442use datafusion:: logical_expr:: { LogicalPlan , UserDefinedLogicalNode } ;
45- use datafusion:: physical_expr:: { EquivalenceProperties , PhysicalExpr , create_physical_expr} ;
43+ use datafusion:: physical_expr:: {
44+ EquivalenceProperties , PhysicalExpr , conjunction, create_physical_expr,
45+ } ;
46+ use datafusion:: physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
4647use datafusion:: physical_plan:: execution_plan:: { Boundedness , EmissionType } ;
48+ use datafusion:: physical_plan:: filter:: FilterExec ;
4749use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
4850use datafusion:: physical_plan:: {
4951 DisplayAs , DisplayFormatType , ExecutionPlan , Partitioning , PlanProperties ,
@@ -149,18 +151,16 @@ impl ExtensionPlanner for USearchExecPlanner {
149151 }
150152 } ;
151153
152- // Compile filter Exprs → PhysicalExprs (synchronous, no I/O).
153154 let exec_props = session_state. execution_props ( ) ;
154- let physical_filters: Vec < Arc < dyn PhysicalExpr > > = node
155- . filters
156- . iter ( )
157- . map ( |f| create_physical_expr ( f, & node. schema , exec_props) )
158- . collect :: < Result < _ > > ( ) ?;
159155
160- // For the filtered path, pre-plan a provider scan:
161- // Pre-scan (_key + filter cols only) — cheap selectivity estimation.
162- // The scan receives the node's WHERE filters for Parquet predicate pushdown.
163- let ( provider_scan, prescan_filters) = if !node. filters . is_empty ( ) {
156+ // For the filtered path, build a pre-scan plan:
157+ // CoalescePartitionsExec → FilterExec → DataSourceExec
158+ // DataSourceExec may have multiple partitions (file groups); FilterExec
159+ // evaluates the predicate per partition; CoalescePartitionsExec merges
160+ // all partitions into a single stream of matching rows.
161+ // DataFusion's physical optimizer pushes the predicate from FilterExec
162+ // into the Parquet reader for row group / bloom / page index pruning.
163+ let provider_scan = if !node. filters . is_empty ( ) {
164164 let scan_schema = registered. scan_provider . schema ( ) ;
165165
166166 // Pre-scan projection: _key + columns referenced by filters.
@@ -185,29 +185,40 @@ impl ExtensionPlanner for USearchExecPlanner {
185185 } )
186186 . collect ( ) ;
187187
188- let pre_scan = registered
188+ // Don't pass filters to scan() — FilterExec handles filtering, and
189+ // DataFusion's physical optimizer pushes it into the Parquet reader
190+ // for row group / bloom / page index pruning.
191+ let data_source = registered
189192 . scan_provider
190- . scan ( session_state, Some ( & scalar_projection) , & node . filters , None )
193+ . scan ( session_state, Some ( & scalar_projection) , & [ ] , None )
191194 . await ?;
192195
193- // Compile physical filters against the pre-scan's projected schema
194- // so column indices match the narrower batch layout.
195- let pre_scan_schema = pre_scan. schema ( ) ;
196- let pre_scan_df_schema =
197- datafusion:: common:: DFSchema :: try_from ( pre_scan_schema. as_ref ( ) . clone ( ) ) ?;
198- let prescan_physical_filters: Vec < Arc < dyn PhysicalExpr > > = node
196+ // Compile physical filters against the projected schema and wrap
197+ // in a FilterExec. Column qualifiers are stripped because the
198+ // projected schema (from Arrow Schema) is unqualified.
199+ let proj_schema = data_source. schema ( ) ;
200+ let proj_df_schema =
201+ datafusion:: common:: DFSchema :: try_from ( proj_schema. as_ref ( ) . clone ( ) ) ?;
202+ let phys_filters: Vec < Arc < dyn PhysicalExpr > > = node
199203 . filters
200204 . iter ( )
201205 . map ( |f| {
202- // Strip table qualifiers — the projected schema is unqualified.
203206 let unqualified = strip_column_qualifier ( f) ;
204- create_physical_expr ( & unqualified, & pre_scan_df_schema , exec_props)
207+ create_physical_expr ( & unqualified, & proj_df_schema , exec_props)
205208 } )
206209 . collect :: < Result < _ > > ( ) ?;
210+ let predicate = conjunction ( phys_filters) ;
211+ let filtered: Arc < dyn ExecutionPlan > =
212+ Arc :: new ( FilterExec :: try_new ( predicate, data_source) ?) ;
213+
214+ // Merge all partitions into a single stream so the pre-scan
215+ // collects valid keys from the entire dataset, not just one
216+ // partition's file group.
217+ let coalesced: Arc < dyn ExecutionPlan > = Arc :: new ( CoalescePartitionsExec :: new ( filtered) ) ;
207218
208- ( Some ( pre_scan ) , prescan_physical_filters )
219+ Some ( coalesced )
209220 } else {
210- ( None , vec ! [ ] )
221+ None
211222 } ;
212223
213224 Ok ( Some ( Arc :: new ( USearchExec :: new ( SearchParams {
@@ -216,8 +227,7 @@ impl ExtensionPlanner for USearchExecPlanner {
216227 query_vec : node. query_vec_f64 ( ) ,
217228 k : node. k ,
218229 distance_type : node. distance_type . clone ( ) ,
219- physical_filters,
220- prescan_filters,
230+ has_filters : !node. filters . is_empty ( ) ,
221231 schema : registered. schema . clone ( ) ,
222232 key_col : registered. key_col . clone ( ) ,
223233 scalar_kind : registered. scalar_kind ,
@@ -237,11 +247,9 @@ struct SearchParams {
237247 query_vec : Vec < f64 > ,
238248 k : usize ,
239249 distance_type : DistanceType ,
240- physical_filters : Vec < Arc < dyn PhysicalExpr > > ,
241- /// Physical filters compiled against the pre-scan's projected schema.
242- /// Column indices match the narrow _key + filter-col projection, not the
243- /// full table schema. Used by adaptive_filtered_execute for pre-scan evaluation.
244- prescan_filters : Vec < Arc < dyn PhysicalExpr > > ,
250+ /// Whether the query has WHERE-clause filters. Used to choose between the
251+ /// unfiltered HNSW path and the adaptive filtered path.
252+ has_filters : bool ,
245253 schema : SchemaRef ,
246254 key_col : String ,
247255 scalar_kind : ScalarKind ,
@@ -276,10 +284,8 @@ impl DisplayAs for USearchExec {
276284 fn fmt_as ( & self , _t : DisplayFormatType , f : & mut fmt:: Formatter ) -> fmt:: Result {
277285 write ! (
278286 f,
279- "USearchExec: table={}, k={}, filters={}" ,
280- self . params. table_name,
281- self . params. k,
282- self . params. physical_filters. len( )
287+ "USearchExec: table={}, k={}, filtered={}" ,
288+ self . params. table_name, self . params. k, self . params. has_filters
283289 )
284290 }
285291}
@@ -346,7 +352,7 @@ impl ExecutionPlan for USearchExec {
346352 fields(
347353 usearch. table = %params. table_name,
348354 usearch. k = params. k,
349- usearch. filter_count = params. physical_filters . len ( ) ,
355+ usearch. has_filters = params. has_filters ,
350356 )
351357) ]
352358async fn usearch_execute (
@@ -361,7 +367,7 @@ async fn usearch_execute(
361367 ) )
362368 } ) ?;
363369
364- if params. physical_filters . is_empty ( ) {
370+ if ! params. has_filters {
365371 // ── Unfiltered path ───────────────────────────────────────────────
366372 let matches = {
367373 let _span = tracing:: info_span!(
@@ -424,7 +430,7 @@ async fn usearch_execute(
424430 fields(
425431 usearch. table = %params. table_name,
426432 usearch. k = params. k,
427- usearch. filter_count = params. physical_filters . len ( ) ,
433+ usearch. has_filters = params. has_filters ,
428434 usearch. valid_rows = tracing:: field:: Empty ,
429435 usearch. total_rows = tracing:: field:: Empty ,
430436 usearch. selectivity = tracing:: field:: Empty ,
@@ -449,24 +455,19 @@ async fn adaptive_filtered_execute(
449455 // Key column index in lookup_provider schema — used by attach_distances (high-sel path).
450456 let lookup_key_col_idx = provider_key_col_idx ( registered) ?;
451457
452- // ── Phase 1: Pre-scan (scalar + _key only) for selectivity estimation ────
458+ // ── Phase 1: Pre-scan for selectivity estimation ───────────────────────
459+ // The scan_plan is CoalescePartitionsExec → FilterExec → DataSourceExec,
460+ // so execute(0) yields already-filtered rows from all partitions.
453461 let mut stream = scan_plan. execute ( 0 , task_ctx. clone ( ) ) ?;
454462 let mut valid_keys: HashSet < u64 > = HashSet :: new ( ) ;
455463
456464 let scan_span = tracing:: info_span!( "usearch_pre_scan" , usearch. table = %params. table_name) ;
457465 async {
458466 while let Some ( batch_result) = stream. next ( ) . await {
459467 let batch = batch_result?;
460- let mask = evaluate_filters ( & params. prescan_filters , & batch) ?;
461468 let keys = extract_keys_as_u64 ( batch. column ( scan_key_col_idx) . as_ref ( ) ) ?;
462-
463- for row_idx in 0 ..batch. num_rows ( ) {
464- if !mask. is_null ( row_idx)
465- && mask. value ( row_idx)
466- && let Some ( Some ( key) ) = keys. get ( row_idx)
467- {
468- valid_keys. insert ( * key) ;
469- }
469+ for key in keys. into_iter ( ) . flatten ( ) {
470+ valid_keys. insert ( key) ;
470471 }
471472 }
472473 Ok :: < _ , datafusion:: error:: DataFusionError > ( ( ) )
@@ -794,39 +795,6 @@ fn compute_raw_distance_f64(v: &[f64], q: &[f64], dist_type: &DistanceType) -> f
794795
795796// ── Helpers ───────────────────────────────────────────────────────────────────
796797
797- /// AND all physical filter expressions against a batch.
798- /// Returns a BooleanArray (one value per row, true = passes all filters).
799- fn evaluate_filters (
800- filters : & [ Arc < dyn PhysicalExpr > ] ,
801- batch : & RecordBatch ,
802- ) -> Result < BooleanArray > {
803- use datafusion:: arrow:: compute;
804-
805- if filters. is_empty ( ) {
806- return Ok ( BooleanArray :: from ( vec ! [ true ; batch. num_rows( ) ] ) ) ;
807- }
808-
809- let mut combined: Option < BooleanArray > = None ;
810- for filter in filters {
811- let col_val = filter. evaluate ( batch) ?;
812- let arr = col_val. into_array ( batch. num_rows ( ) ) ?;
813- let bool_arr = arr
814- . as_any ( )
815- . downcast_ref :: < BooleanArray > ( )
816- . ok_or_else ( || {
817- DataFusionError :: Execution ( "filter expression did not return BooleanArray" . into ( ) )
818- } ) ?
819- . clone ( ) ;
820-
821- combined = Some ( match combined {
822- None => bool_arr,
823- Some ( prev) => compute:: and ( & prev, & bool_arr)
824- . map_err ( |e| DataFusionError :: ArrowError ( Box :: new ( e) , None ) ) ?,
825- } ) ;
826- }
827- Ok ( combined. unwrap ( ) )
828- }
829-
830798/// Extract the distance from a single row of a vector column.
831799///
832800/// Index of the key column in the lookup provider schema.
0 commit comments