Skip to content

Commit 4269e6a

Browse files
authored
feat: wrap FilterExec in LanceFilterExec (#3939)
This wraps usage of Datafusion FilterExec in a LanceFilterExec node that carries the original logical Expr for the filter alongside the wrapped FilterExec. The reason for this is to enable a datafusion optimizer rule to more easily serialize the execution logic of the filter (using substrait) and send the node over the wire.
1 parent fc23656 commit 4269e6a

4 files changed

Lines changed: 146 additions & 64 deletions

File tree

rust/lance/src/dataset/scanner.rs

Lines changed: 42 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use arrow_array::{Array, Float32Array, Int64Array, RecordBatch};
1212
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, SortOptions};
1313
use arrow_select::concat::concat_batches;
1414
use async_recursion::async_recursion;
15-
use datafusion::common::SchemaExt;
15+
use datafusion::common::{DFSchema, SchemaExt};
1616
use datafusion::functions_aggregate;
1717
use datafusion::functions_aggregate::count::count_udaf;
18-
use datafusion::logical_expr::Expr;
18+
use datafusion::logical_expr::{col, lit, Expr};
1919
use datafusion::physical_expr::PhysicalSortExpr;
2020
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
2121
use datafusion::physical_plan::expressions;
@@ -25,16 +25,15 @@ use datafusion::physical_plan::{
2525
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
2626
display::DisplayableExecutionPlan,
2727
expressions::Literal,
28-
filter::FilterExec,
2928
limit::GlobalLimitExec,
3029
repartition::RepartitionExec,
3130
union::UnionExec,
3231
ExecutionPlan, SendableRecordBatchStream,
3332
};
3433
use datafusion::scalar::ScalarValue;
35-
use datafusion_expr::Operator;
34+
use datafusion_expr::execution_props::ExecutionProps;
3635
use datafusion_physical_expr::{aggregate::AggregateExprBuilder, expressions::Column};
37-
use datafusion_physical_expr::{LexOrdering, Partitioning, PhysicalExpr};
36+
use datafusion_physical_expr::{create_physical_expr, LexOrdering, Partitioning, PhysicalExpr};
3837
use datafusion_physical_plan::{empty::EmptyExec, joins::HashJoinExec};
3938
use futures::future::BoxFuture;
4039
use futures::stream::{Stream, StreamExt};
@@ -69,7 +68,7 @@ use crate::index::DatasetIndexInternalExt;
6968
use crate::io::exec::fts::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec};
7069
use crate::io::exec::knn::MultivectorScoringExec;
7170
use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec};
72-
use crate::io::exec::{get_physical_optimizer, LanceScanConfig};
71+
use crate::io::exec::{get_physical_optimizer, LanceFilterExec, LanceScanConfig};
7372
use crate::io::exec::{
7473
knn::new_knn_exec, project, AddRowAddrExec, FilterPlan, KNNVectorDistanceExec,
7574
LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec,
@@ -1591,10 +1590,7 @@ impl Scanner {
15911590
if let Some(refine_expr) = filter_plan.refine_expr {
15921591
// We create a new planner specific to the node's schema, since
15931592
// physical expressions reference column by index rather than by name.
1594-
let planner = Planner::new(plan.schema());
1595-
let physical_refine_expr = planner.create_physical_expr(&refine_expr)?;
1596-
1597-
plan = Arc::new(FilterExec::try_new(physical_refine_expr, plan)?);
1593+
plan = Arc::new(LanceFilterExec::try_new(refine_expr, plan)?);
15981594
}
15991595

16001596
// Stage 3: sort
@@ -2066,9 +2062,7 @@ impl Scanner {
20662062

20672063
if let Some(expr) = filter_plan.full_expr.as_ref() {
20682064
// If there is a prefilter we need to manually apply it to the new data
2069-
let planner = Planner::new(scan_node.schema());
2070-
let physical_refine_expr = planner.create_physical_expr(expr)?;
2071-
scan_node = Arc::new(FilterExec::try_new(physical_refine_expr, scan_node)?);
2065+
scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
20722066
}
20732067

20742068
let flat_match_plan = Arc::new(FlatMatchQueryExec::new(
@@ -2186,10 +2180,7 @@ impl Scanner {
21862180
)
21872181
};
21882182
if let Some(refine_expr) = &filter_plan.refine_expr {
2189-
let planner = Planner::new(plan.schema());
2190-
let physical_refine_expr = planner.create_physical_expr(refine_expr)?;
2191-
2192-
plan = Arc::new(FilterExec::try_new(physical_refine_expr, plan)?);
2183+
plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?);
21932184
}
21942185
Ok(self.flat_knn(plan, q)?)
21952186
}
@@ -2254,9 +2245,7 @@ impl Scanner {
22542245

22552246
if let Some(expr) = filter_plan.full_expr.as_ref() {
22562247
// If there is a prefilter we need to manually apply it to the new data
2257-
let planner = Planner::new(scan_node.schema());
2258-
let physical_refine_expr = planner.create_physical_expr(expr)?;
2259-
scan_node = Arc::new(FilterExec::try_new(physical_refine_expr, scan_node)?);
2248+
scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
22602249
}
22612250
// first we do flat search on just the new data
22622251
let topk_appended = self.flat_knn(scan_node, &q)?;
@@ -2429,9 +2418,7 @@ impl Scanner {
24292418
if let Some(post_take_filter) = post_take_filter {
24302419
let planner = Planner::new(plan.schema());
24312420
let optimized_filter = planner.optimize_expr(post_take_filter)?;
2432-
let physical_refine_expr = planner.create_physical_expr(&optimized_filter)?;
2433-
2434-
plan = Arc::new(FilterExec::try_new(physical_refine_expr, plan)?);
2421+
plan = Arc::new(LanceFilterExec::try_new(optimized_filter, plan)?);
24352422
}
24362423

24372424
if self.with_row_address {
@@ -2459,7 +2446,6 @@ impl Scanner {
24592446
let scan_arrow_schema = Arc::new(scan_schema.as_ref().into());
24602447
let planner = Planner::new(scan_arrow_schema);
24612448
let optimized_filter = planner.optimize_expr(filter.clone())?;
2462-
let physical_refine_expr = planner.create_physical_expr(&optimized_filter)?;
24632449

24642450
let new_data_scan = self.scan_fragments(
24652451
true,
@@ -2471,7 +2457,7 @@ impl Scanner {
24712457
None,
24722458
false,
24732459
);
2474-
let filtered = Arc::new(FilterExec::try_new(physical_refine_expr, new_data_scan)?);
2460+
let filtered = Arc::new(LanceFilterExec::try_new(optimized_filter, new_data_scan)?);
24752461
Some(Arc::new(project(filtered, plan.schema().as_ref())?))
24762462
} else {
24772463
None
@@ -2599,45 +2585,43 @@ impl Scanner {
25992585
q.metric_type,
26002586
)?);
26012587

2602-
// filter out elements out of distance range
2603-
let lower_bound_expr = q
2588+
let lower: Option<(Expr, Arc<dyn PhysicalExpr>)> = q
26042589
.lower_bound
2605-
.map(|v| {
2606-
let lower_bound = expressions::lit(v);
2607-
expressions::binary(
2608-
expressions::col(DIST_COL, flat_dist.schema().as_ref())?,
2609-
Operator::GtEq,
2610-
lower_bound,
2611-
flat_dist.schema().as_ref(),
2612-
)
2590+
.map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
2591+
let logical = col(DIST_COL).gt_eq(lit(v));
2592+
let schema = flat_dist.schema();
2593+
let df_schema = DFSchema::try_from(schema)?;
2594+
let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
2595+
Ok::<(Expr, Arc<dyn PhysicalExpr>), _>((logical, physical))
26132596
})
26142597
.transpose()?;
2615-
let upper_bound_expr = q
2598+
2599+
let upper = q
26162600
.upper_bound
2617-
.map(|v| {
2618-
let upper_bound = expressions::lit(v);
2619-
expressions::binary(
2620-
expressions::col(DIST_COL, flat_dist.schema().as_ref())?,
2621-
Operator::Lt,
2622-
upper_bound,
2623-
flat_dist.schema().as_ref(),
2624-
)
2601+
.map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
2602+
let logical = col(DIST_COL).lt(lit(v));
2603+
let schema = flat_dist.schema();
2604+
let df_schema = DFSchema::try_from(schema)?;
2605+
let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
2606+
Ok::<(Expr, Arc<dyn PhysicalExpr>), _>((logical, physical))
26252607
})
26262608
.transpose()?;
2627-
let filter_expr = match (lower_bound_expr, upper_bound_expr) {
2628-
(Some(lower), Some(upper)) => Some(expressions::binary(
2629-
lower,
2630-
Operator::And,
2631-
upper,
2632-
flat_dist.schema().as_ref(),
2633-
)?),
2634-
(Some(lower), None) => Some(lower),
2635-
(None, Some(upper)) => Some(upper),
2609+
2610+
let filter_expr = match (lower, upper) {
2611+
(Some((llog, _)), Some((ulog, _))) => {
2612+
let logical = llog.and(ulog);
2613+
let schema = flat_dist.schema();
2614+
let df_schema = DFSchema::try_from(schema)?;
2615+
let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
2616+
Some((logical, physical))
2617+
}
2618+
(Some((llog, lphys)), None) => Some((llog, lphys)),
2619+
(None, Some((ulog, uphys))) => Some((ulog, uphys)),
26362620
(None, None) => None,
26372621
};
26382622

26392623
let knn_plan: Arc<dyn ExecutionPlan> = if let Some(filter_expr) = filter_expr {
2640-
Arc::new(FilterExec::try_new(filter_expr, flat_dist)?)
2624+
Arc::new(LanceFilterExec::try_new(filter_expr.0, flat_dist)?)
26412625
} else {
26422626
flat_dist
26432627
};
@@ -2655,12 +2639,10 @@ impl Scanner {
26552639
)
26562640
.with_fetch(Some(q.k));
26572641

2658-
let not_nulls = FilterExec::try_new(
2659-
expressions::is_not_null(expressions::col(DIST_COL, sort.schema().as_ref())?)?,
2660-
Arc::new(sort),
2661-
)?;
2642+
let logical_not_null = col(DIST_COL).is_not_null();
2643+
let not_nulls = Arc::new(LanceFilterExec::try_new(logical_not_null, Arc::new(sort))?);
26622644

2663-
Ok(Arc::new(not_nulls))
2645+
Ok(not_nulls)
26642646
}
26652647

26662648
fn get_fragments_as_bitmap(&self) -> RoaringBitmap {
@@ -2826,10 +2808,8 @@ impl Scanner {
28262808
let columns_in_filter = Planner::column_names_in_expr(refine_expr);
28272809
let filter_schema = Arc::new(self.dataset.schema().project(&columns_in_filter)?);
28282810
let filter_input = self.scan(true, false, true, None, filter_schema);
2829-
let planner = Planner::new(filter_input.schema());
2830-
let physical_refine_expr = planner.create_physical_expr(refine_expr)?;
28312811
let filtered_row_ids =
2832-
Arc::new(FilterExec::try_new(physical_refine_expr, filter_input)?);
2812+
Arc::new(LanceFilterExec::try_new(refine_expr.clone(), filter_input)?);
28332813
PreFilterSource::FilteredRowIds(filtered_row_ids)
28342814
}
28352815
// No prefilter

rust/lance/src/io/exec.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//!
66
//! WARNING: Internal API with no stability guarantees.
77
8+
mod filter;
89
pub mod filtered_read;
910
pub mod fts;
1011
pub(crate) mod knn;
@@ -19,6 +20,7 @@ mod take;
1920
pub mod testing;
2021
pub mod utils;
2122

23+
pub use filter::LanceFilterExec;
2224
pub use knn::{ANNIvfPartitionExec, ANNIvfSubIndexExec, KNNVectorDistanceExec};
2325
pub use lance_datafusion::planner::Planner;
2426
pub use lance_index::scalar::expression::FilterPlan;

rust/lance/src/io/exec/filter.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
use std::sync::Arc;
5+
6+
use datafusion::{execution::TaskContext, logical_expr::Expr};
7+
use datafusion_physical_plan::{
8+
filter::FilterExec, metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
9+
PlanProperties, SendableRecordBatchStream, Statistics,
10+
};
11+
use lance_core::{error::DataFusionResult, Result};
12+
use lance_datafusion::planner::Planner;
13+
14+
#[derive(Debug)]
15+
// LanceFilterExec is a wrapper around FilterExec that includes the original
16+
// expression for the filter node. In comparison to a FilterExec, this makes it
17+
// possible for an optimization rule to serialize the filter to substrait and
18+
// send it to a remote worker.
19+
pub struct LanceFilterExec {
20+
expr: Expr,
21+
pub filter: Arc<FilterExec>,
22+
}
23+
24+
impl DisplayAs for LanceFilterExec {
25+
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
26+
self.filter.fmt_as(t, f)
27+
}
28+
}
29+
30+
impl LanceFilterExec {
31+
pub fn try_new(expr: Expr, input: Arc<dyn ExecutionPlan>) -> Result<Self> {
32+
let planner = Planner::new(input.schema());
33+
let predicate = planner.create_physical_expr(&expr)?;
34+
let filter_exec = FilterExec::try_new(predicate.clone(), input)?;
35+
Ok(Self {
36+
expr,
37+
filter: Arc::new(filter_exec),
38+
})
39+
}
40+
41+
pub fn expr(&self) -> &Expr {
42+
&self.expr
43+
}
44+
}
45+
46+
impl ExecutionPlan for LanceFilterExec {
47+
fn name(&self) -> &str {
48+
"LanceFilterExec"
49+
}
50+
51+
fn as_any(&self) -> &dyn std::any::Any {
52+
self
53+
}
54+
55+
fn properties(&self) -> &PlanProperties {
56+
self.filter.properties()
57+
}
58+
59+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
60+
self.filter.children()
61+
}
62+
63+
fn maintains_input_order(&self) -> Vec<bool> {
64+
self.filter.maintains_input_order()
65+
}
66+
67+
fn with_new_children(
68+
self: Arc<Self>,
69+
children: Vec<Arc<dyn ExecutionPlan>>,
70+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
71+
self.filter.clone().with_new_children(children)
72+
}
73+
74+
fn execute(
75+
&self,
76+
partition: usize,
77+
context: Arc<TaskContext>,
78+
) -> DataFusionResult<SendableRecordBatchStream> {
79+
self.filter.execute(partition, context)
80+
}
81+
82+
fn metrics(&self) -> Option<MetricsSet> {
83+
self.filter.metrics()
84+
}
85+
86+
fn statistics(&self) -> DataFusionResult<Statistics> {
87+
self.filter.statistics()
88+
}
89+
90+
fn cardinality_effect(&self) -> datafusion_physical_plan::execution_plan::CardinalityEffect {
91+
self.filter.cardinality_effect()
92+
}
93+
94+
fn try_swapping_with_projection(
95+
&self,
96+
projection: &datafusion_physical_plan::projection::ProjectionExec,
97+
) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
98+
self.filter.try_swapping_with_projection(projection)
99+
}
100+
}

rust/lance/src/io/exec/knn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ pub struct KNNVectorDistanceExec {
111111

112112
/// The vector query to execute.
113113
pub query: ArrayRef,
114-
column: String,
115-
distance_type: DistanceType,
114+
pub column: String,
115+
pub distance_type: DistanceType,
116116

117117
output_schema: SchemaRef,
118118
properties: PlanProperties,

0 commit comments

Comments
 (0)