Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,13 +724,171 @@ pub fn is_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {
is_volatile
}

/// A transparent wrapper that marks a [`PhysicalExpr`] as *optional*, i.e.
/// droppable without affecting query correctness.
///
/// Use it for filters that are performance hints (e.g., dynamic join
/// filters) as opposed to mandatory predicates. A consumer such as the
/// selectivity tracker can detect the wrapper with
/// `expr.downcast_ref::<OptionalFilterPhysicalExpr>()` and choose to drop the
/// filter entirely when it is not cost-effective.
///
/// The wrapper exposes the wrapped expression as its single child. Type
/// information, evaluation and display are forwarded to that expression, so
/// wrapping changes neither what the filter computes nor how it is printed.
///
/// Currently used by `HashJoinExec` for dynamic join filters. When the
/// selectivity tracker drops such a filter, the join still enforces
/// correctness independently: "dropped" simply means the filter is never
/// applied as a scan-time optimization.
#[derive(Debug)]
pub struct OptionalFilterPhysicalExpr {
/// The wrapped expression that performs the actual evaluation.
inner: Arc<dyn PhysicalExpr>,
}

impl OptionalFilterPhysicalExpr {
/// Create a new optional filter wrapping the given expression.
pub fn new(inner: Arc<dyn PhysicalExpr>) -> Self {
Self { inner }
}

/// Returns a clone of the inner (unwrapped) expression.
pub fn inner(&self) -> Arc<dyn PhysicalExpr> {
Arc::clone(&self.inner)
}
}

impl Display for OptionalFilterPhysicalExpr {
    /// Prints exactly what the wrapped expression prints.
    ///
    /// The wrapper is deliberately invisible in plan output: being optional
    /// is a runtime concept (the adaptive scheduler's permission to drop this
    /// filter), and showing an `Optional(..)` layer would force updates to
    /// dozens of sqllogictest baselines without helping plan readers.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let wrapped = &self.inner;
        write!(f, "{wrapped}")
    }
}

impl PartialEq for OptionalFilterPhysicalExpr {
    /// Two wrappers are equal exactly when their wrapped expressions are.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.inner.as_ref();
        let rhs = other.inner.as_ref();
        lhs == rhs
    }
}

// Equality is fully determined by the wrapped expression, so the wrapper
// opts into `Eq` alongside `PartialEq`.
impl Eq for OptionalFilterPhysicalExpr {}

impl Hash for OptionalFilterPhysicalExpr {
    /// Feeds only the wrapped expression into `state`, matching the
    /// `PartialEq` implementation, which also compares only the wrapped
    /// expression.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&*self.inner, state);
    }
}

/// [`PhysicalExpr`] implementation that makes the wrapper a unary
/// pass-through node.
///
/// Two families of methods are handled differently:
///
/// * Methods that take the input schema or batch are forwarded to the wrapped
///   expression unchanged.
/// * Methods that take per-child data (`evaluate_bounds`,
///   `propagate_constraints`, `get_properties`) receive one entry per element
///   of `children()`. This node's only child is the wrapped expression, so
///   that entry describes the wrapped expression itself, not its children,
///   and these methods behave as the identity over it.
impl PhysicalExpr for OptionalFilterPhysicalExpr {
    /// Output type of the wrapped expression.
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        self.inner.data_type(input_schema)
    }

    /// Nullability of the wrapped expression.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        self.inner.nullable(input_schema)
    }

    /// Evaluates the wrapped expression against `batch`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        self.inner.evaluate(batch)
    }

    /// Output field of the wrapped expression.
    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
        self.inner.return_field(input_schema)
    }

    /// Evaluates the wrapped expression for the rows chosen by `selection`.
    fn evaluate_selection(
        &self,
        batch: &RecordBatch,
        selection: &BooleanArray,
    ) -> Result<ColumnarValue> {
        self.inner.evaluate_selection(batch, selection)
    }

    /// The wrapped expression is the one and only child of this node.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.inner]
    }

    /// Rebuilds the wrapper around a replacement child, keeping the
    /// "optional" marker in place.
    ///
    /// Returns an internal error unless exactly one child is supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        assert_eq_or_internal_err!(
            children.len(),
            1,
            "OptionalFilterPhysicalExpr: expected 1 child"
        );
        Ok(Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(
            &children[0],
        ))))
    }

    /// The bounds of the wrapper are the bounds of its only child.
    ///
    /// `children[0]` already is the interval of the wrapped expression, so it
    /// is returned as-is. Forwarding the slice to `inner.evaluate_bounds`
    /// would present it as the intervals of `inner`'s own children, which has
    /// the wrong arity for anything but a unary `inner`.
    ///
    /// Returns an internal error unless exactly one interval is supplied.
    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
        assert_eq_or_internal_err!(
            children.len(),
            1,
            "OptionalFilterPhysicalExpr: expected 1 child interval"
        );
        Ok(children[0].clone())
    }

    /// Narrows the child's interval to what is compatible with `interval`.
    ///
    /// The wrapper yields exactly the values of its child, so the refined
    /// child interval is the intersection of the child's current interval
    /// with the one required of this node. `None` is returned when the two do
    /// not overlap.
    ///
    /// Returns an internal error unless exactly one interval is supplied.
    fn propagate_constraints(
        &self,
        interval: &Interval,
        children: &[&Interval],
    ) -> Result<Option<Vec<Interval>>> {
        assert_eq_or_internal_err!(
            children.len(),
            1,
            "OptionalFilterPhysicalExpr: expected 1 child interval"
        );
        Ok(children[0]
            .intersect(interval)?
            .map(|narrowed| vec![narrowed]))
    }

    // NOTE(review): this deprecated hook still hands `children` straight to
    // the wrapped expression. Given that `children()` is `[inner]`, the slice
    // presumably carries a single entry describing `inner` itself, so this
    // looks like the same arity mismatch fixed in `evaluate_bounds`. It is
    // left untouched because the `#[expect(deprecated)]` expectation relies on
    // the deprecated call in the body — confirm and revisit.
    #[expect(deprecated)]
    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
        self.inner.evaluate_statistics(children)
    }

    // NOTE(review): same caveat as `evaluate_statistics` above.
    #[expect(deprecated)]
    fn propagate_statistics(
        &self,
        parent: &Distribution,
        children: &[&Distribution],
    ) -> Result<Option<Vec<Distribution>>> {
        self.inner.propagate_statistics(parent, children)
    }

    /// The properties of the wrapper are the properties of its only child.
    ///
    /// `children[0]` holds the properties already derived for the wrapped
    /// expression; they are returned unchanged instead of being fed back into
    /// `inner.get_properties` as if they described `inner`'s children.
    ///
    /// Returns an internal error unless exactly one entry is supplied.
    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
        assert_eq_or_internal_err!(
            children.len(),
            1,
            "OptionalFilterPhysicalExpr: expected 1 child properties entry"
        );
        Ok(children[0].clone())
    }

    /// SQL rendering of the wrapped expression; the wrapper adds nothing.
    fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result {
        self.inner.fmt_sql(f)
    }

    /// Always strips the wrapper for snapshot consumers (e.g.
    /// `PruningPredicate`): yields the wrapped expression's snapshot when it
    /// has one, otherwise the wrapped expression itself.
    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
        let unwrapped = self
            .inner
            .snapshot()?
            .unwrap_or_else(|| Arc::clone(&self.inner));
        Ok(Some(unwrapped))
    }

    /// The wrapper itself is not dynamic; tree-walking picks up the wrapped
    /// expression's generation via `children()`.
    fn snapshot_generation(&self) -> u64 {
        0
    }

    /// Volatility of the wrapped expression's node.
    fn is_volatile_node(&self) -> bool {
        self.inner.is_volatile_node()
    }

    /// Placement of the wrapped expression.
    fn placement(&self) -> ExpressionPlacement {
        self.inner.placement()
    }
}

#[cfg(test)]
mod test {
use crate::physical_expr::PhysicalExpr;
use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use datafusion_expr_common::columnar_value::ColumnarValue;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -905,4 +1063,104 @@ mod test {
&BooleanArray::from(vec![true; 5]),
);
}

#[test]
fn test_optional_filter_downcast() {
    use super::OptionalFilterPhysicalExpr;

    let inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let wrapped: Arc<dyn PhysicalExpr> =
        Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner)));

    // Behind a trait object the wrapper is still recognisable by downcast...
    let seen_on_wrapper = wrapped.downcast_ref::<OptionalFilterPhysicalExpr>();
    assert!(seen_on_wrapper.is_some());

    // ...while the bare inner expression is not mistaken for an optional one.
    let seen_on_inner = inner.downcast_ref::<OptionalFilterPhysicalExpr>();
    assert!(seen_on_inner.is_none());
}

#[test]
fn test_optional_filter_delegates_evaluate() {
    use super::OptionalFilterPhysicalExpr;

    let inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&inner));

    // SAFETY: the batch has an empty schema and no columns, so there is no
    // column whose type or length could disagree with the schema or with the
    // declared row count of 5.
    let batch =
        unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 5) };

    // Evaluate both the bare expression and the wrapper on the same batch.
    // NOTE(review): assumes `TestExpr::evaluate` is deterministic for a given
    // batch — `TestExpr` is defined outside this excerpt.
    let expected = inner.evaluate(&batch).unwrap().to_array(5).unwrap();
    let actual = optional.evaluate(&batch).unwrap().to_array(5).unwrap();

    assert_eq!(actual.len(), 5);
    // Delegation means the wrapper's output is the inner expression's output,
    // not merely an array of the right length.
    assert_eq!(&actual, &expected);
}

#[test]
fn test_optional_filter_children_and_with_new_children() {
    use super::OptionalFilterPhysicalExpr;

    let inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let optional = Arc::new(OptionalFilterPhysicalExpr::new(Arc::clone(&inner)));

    // children() exposes exactly one child, and it is the wrapped expression
    // itself (same allocation), not just some expression.
    let children = optional.children();
    assert_eq!(children.len(), 1);
    assert!(Arc::ptr_eq(children[0], &inner));

    // with_new_children keeps the wrapper...
    let new_inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let rebuilt = Arc::clone(&optional)
        .with_new_children(vec![Arc::clone(&new_inner)])
        .unwrap();
    let rewrapped = rebuilt
        .downcast_ref::<OptionalFilterPhysicalExpr>()
        .expect("with_new_children must preserve the optional wrapper");

    // ...and the rebuilt wrapper holds the replacement child, not the old one.
    assert!(Arc::ptr_eq(&rewrapped.inner(), &new_inner));
    assert!(!Arc::ptr_eq(&rewrapped.inner(), &inner));
}

#[test]
fn test_optional_filter_inner() {
    use super::OptionalFilterPhysicalExpr;

    let wrapped_expr: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let optional = OptionalFilterPhysicalExpr::new(Arc::clone(&wrapped_expr));

    // inner() hands back the wrapped expression, which is still a `TestExpr`
    // once the wrapper is peeled off.
    let as_test_expr = optional.inner();
    let as_test_expr = as_test_expr.downcast_ref::<TestExpr>();
    assert!(as_test_expr.is_some());
}

#[test]
fn test_optional_filter_snapshot_generation_zero() {
    use super::OptionalFilterPhysicalExpr;

    // The wrapper reports generation 0 regardless of what it wraps.
    let optional = OptionalFilterPhysicalExpr::new(Arc::new(TestExpr {}));
    let generation = optional.snapshot_generation();

    assert_eq!(generation, 0);
}

#[test]
fn test_optional_filter_eq_hash() {
    use super::OptionalFilterPhysicalExpr;
    use std::collections::hash_map::DefaultHasher;

    // Hashes one wrapper with a fresh `DefaultHasher` and returns the digest.
    let digest = |expr: &OptionalFilterPhysicalExpr| -> u64 {
        let mut hasher = DefaultHasher::new();
        expr.hash(&mut hasher);
        hasher.finish()
    };

    let first_inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let second_inner: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
    let first = OptionalFilterPhysicalExpr::new(first_inner);
    let second = OptionalFilterPhysicalExpr::new(second_inner);

    // Wrappers around equal inner expressions compare equal...
    assert_eq!(first, second);

    // ...and therefore must hash identically.
    assert_eq!(digest(&first), digest(&second));
}
}
6 changes: 6 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,8 @@ message PhysicalExprNode {
PhysicalScalarSubqueryExprNode scalar_subquery = 22;

PhysicalDynamicFilterNode dynamic_filter = 23;

PhysicalOptionalFilterNode optional_filter = 24;
}
}

Expand All @@ -947,6 +949,10 @@ message PhysicalDynamicFilterNode {
bool is_complete = 5;
}

// Envelope around a single physical expression, carried in the
// `optional_filter` slot (tag 24) of `PhysicalExprNode`.
// NOTE(review): presumably the wire form of the Rust
// `OptionalFilterPhysicalExpr` wrapper — confirm against the proto
// conversion code.
message PhysicalOptionalFilterNode {
// The wrapped expression.
PhysicalExprNode inner = 1;
}

message PhysicalScalarUdfNode {
string name = 1;
repeated PhysicalExprNode args = 2;
Expand Down
Loading
Loading