Skip to content

Commit 988ae5d

Browse files
physical-expr-common: remote PhysicalExpr::snapshot method
See #21807 `PhysicalExpr::snapshot` isn't required because it only applies to dynamic filters. The only place that its used is in `pruning_predicate.rs`. Since there's only one use case, we can just downcast to `DynamicFilterPhysicalExpr` and call `current()`. Remove `PhysicalExpr::snapshot` and associated code. Yes, the existing tests should cover the exact same functionality. Filter pushdown still workers with dynamic filters; we use `current()` instead of `snapshot()` to capture the expression. Yes. `PhysicalExpr::snapshot()` no longer exists. Users should downcast to `DynamicFilterPhysicalExpr` and call `current()` intead of calling `snapshot()`. Similarly, in `datafusion/ffi`, `FFI_PhysicalExpr` no longer has a `snapshot()` method.
1 parent ba038e9 commit 988ae5d

5 files changed

Lines changed: 37 additions & 156 deletions

File tree

datafusion/ffi/src/physical_expr/mod.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ pub struct FFI_PhysicalExpr {
112112

113113
pub fmt_sql: unsafe extern "C" fn(&Self) -> FFI_Result<SString>,
114114

115-
pub snapshot: unsafe extern "C" fn(&Self) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>>,
116-
117115
pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,
118116

119117
pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,
@@ -364,16 +362,6 @@ unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_Result<S
364362
FFI_Result::Ok(result.into())
365363
}
366364

367-
unsafe extern "C" fn snapshot_fn_wrapper(
368-
expr: &FFI_PhysicalExpr,
369-
) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>> {
370-
let expr = expr.inner();
371-
sresult!(
372-
expr.snapshot()
373-
.map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())
374-
)
375-
}
376-
377365
unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
378366
let expr = expr.inner();
379367
expr.snapshot_generation()
@@ -427,7 +415,6 @@ unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalEx
427415
propagate_statistics: propagate_statistics_fn_wrapper,
428416
get_properties: get_properties_fn_wrapper,
429417
fmt_sql: fmt_sql_fn_wrapper,
430-
snapshot: snapshot_fn_wrapper,
431418
snapshot_generation: snapshot_generation_fn_wrapper,
432419
is_volatile_node: is_volatile_node_fn_wrapper,
433420
display: display_fn_wrapper,
@@ -470,7 +457,6 @@ impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
470457
propagate_statistics: propagate_statistics_fn_wrapper,
471458
get_properties: get_properties_fn_wrapper,
472459
fmt_sql: fmt_sql_fn_wrapper,
473-
snapshot: snapshot_fn_wrapper,
474460
snapshot_generation: snapshot_generation_fn_wrapper,
475461
is_volatile_node: is_volatile_node_fn_wrapper,
476462
display: display_fn_wrapper,
@@ -691,15 +677,6 @@ impl PhysicalExpr for ForeignPhysicalExpr {
691677
}
692678
}
693679

694-
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
695-
unsafe {
696-
let result = df_result!((self.expr.snapshot)(&self.expr))?;
697-
Ok(result
698-
.map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
699-
.into())
700-
}
701-
}
702-
703680
fn snapshot_generation(&self) -> u64 {
704681
unsafe { (self.expr.snapshot_generation)(&self.expr) }
705682
}
@@ -940,19 +917,13 @@ mod tests {
940917
}
941918

942919
#[test]
943-
fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
920+
fn ffi_physical_expr_snapshot_generation() {
944921
let (original, foreign_expr) = create_test_expr();
945922

946-
let left = original.snapshot()?;
947-
let right = foreign_expr.snapshot()?;
948-
assert_eq!(left, right);
949-
950923
assert_eq!(
951924
original.snapshot_generation(),
952925
foreign_expr.snapshot_generation()
953926
);
954-
955-
Ok(())
956927
}
957928

958929
#[test]

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 1 addition & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ use arrow::array::{Array, ArrayRef, BooleanArray, new_empty_array};
2727
use arrow::compute::filter_record_batch;
2828
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
2929
use arrow::record_batch::RecordBatch;
30-
use datafusion_common::tree_node::{
31-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32-
};
30+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
3331
use datafusion_common::{
3432
Result, ScalarValue, assert_eq_or_internal_err, exec_err, not_impl_err,
3533
};
@@ -355,55 +353,6 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
355353
/// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
356354
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
357355

358-
/// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
359-
///
360-
/// "Dynamic" in this case means containing references to structures that may change
361-
/// during plan execution, such as hash tables.
362-
///
363-
/// This method is used to capture the current state of `PhysicalExpr`s that may contain
364-
/// dynamic references to other operators in order to serialize it over the wire
365-
/// or treat it via downcast matching.
366-
///
367-
/// You should not call this method directly as it does not handle recursion.
368-
/// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
369-
/// full state of the `PhysicalExpr`.
370-
///
371-
/// This is expected to return "simple" expressions that do not have mutable state
372-
/// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
373-
/// Callers however should *not* assume anything about the returned expressions
374-
/// since callers and implementers may not agree on what "simple" or "built-in"
375-
/// means.
376-
/// In other words, if you need to serialize a `PhysicalExpr` across the wire
377-
/// you should call this method and then try to serialize the result,
378-
/// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
379-
/// just as if you had not called this method at all.
380-
///
381-
/// In particular, consider:
382-
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
383-
/// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
384-
/// This function may return something like `a >= 12`.
385-
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
386-
/// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
387-
/// This function may return something like `t2.b IN (1, 5, 7)`.
388-
///
389-
/// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
390-
/// or needs to serialize this state to bytes may not be able to handle these dynamic references.
391-
/// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
392-
/// contain these dynamic references.
393-
///
394-
/// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
395-
/// and send it across the wire to a remote executor may want to call this method after
396-
/// every batch on the source side and broadcast / update the current snapshot to the remote executor.
397-
///
398-
/// Note for implementers: this method should *not* handle recursion.
399-
/// Recursion is handled in [`snapshot_physical_expr`].
400-
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
401-
// By default, we return None to indicate that this PhysicalExpr does not
402-
// have any dynamic references or state.
403-
// This is a safe default behavior.
404-
Ok(None)
405-
}
406-
407356
/// Returns the generation of this `PhysicalExpr` for snapshotting purposes.
408357
/// The generation is an arbitrary u64 that can be used to track changes
409358
/// in the state of the `PhysicalExpr` over time without having to do an exhaustive comparison.
@@ -616,50 +565,6 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
616565
Wrapper { expr }
617566
}
618567

619-
/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
620-
///
621-
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
622-
/// This is used to capture the current state of `PhysicalExpr`s that may contain
623-
/// dynamic references to other operators in order to serialize it over the wire
624-
/// or treat it via downcast matching.
625-
///
626-
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
627-
///
628-
/// # Returns
629-
///
630-
/// Returns a snapshot of the `PhysicalExpr` if it is dynamic, otherwise
631-
/// returns itself.
632-
pub fn snapshot_physical_expr(
633-
expr: Arc<dyn PhysicalExpr>,
634-
) -> Result<Arc<dyn PhysicalExpr>> {
635-
snapshot_physical_expr_opt(expr).data()
636-
}
637-
638-
/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
639-
///
640-
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
641-
/// This is used to capture the current state of `PhysicalExpr`s that may contain
642-
/// dynamic references to other operators in order to serialize it over the wire
643-
/// or treat it via downcast matching.
644-
///
645-
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
646-
///
647-
/// # Returns
648-
///
649-
/// Returns a `[`Transformed`] indicating whether a snapshot was taken,
650-
/// along with the resulting `PhysicalExpr`.
651-
pub fn snapshot_physical_expr_opt(
652-
expr: Arc<dyn PhysicalExpr>,
653-
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
654-
expr.transform_up(|e| {
655-
if let Some(snapshot) = e.snapshot()? {
656-
Ok(Transformed::yes(snapshot))
657-
} else {
658-
Ok(Transformed::no(Arc::clone(&e)))
659-
}
660-
})
661-
}
662-
663568
/// Check the generation of this `PhysicalExpr`.
664569
/// Dynamic `PhysicalExpr`s may have a generation that is incremented
665570
/// every time the state of the `PhysicalExpr` changes.

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -525,11 +525,6 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
525525
self.render(f, |expr, f| expr.fmt_sql(f))
526526
}
527527

528-
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
529-
// Return the current expression as a snapshot.
530-
Ok(Some(self.current()?))
531-
}
532-
533528
fn snapshot_generation(&self) -> u64 {
534529
// Return the current generation of the expression.
535530
self.inner.read().generation
@@ -611,14 +606,20 @@ mod test {
611606
&filter_schema_1,
612607
)
613608
.unwrap();
614-
let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
609+
let dynamic_filter_1_df = dynamic_filter_1
610+
.downcast_ref::<DynamicFilterPhysicalExpr>()
611+
.expect("Expected DynamicFilterPhysicalExpr");
612+
let snap = dynamic_filter_1_df.current().unwrap();
615613
insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
616614
let dynamic_filter_2 = reassign_expr_columns(
617615
Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
618616
&filter_schema_2,
619617
)
620618
.unwrap();
621-
let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
619+
let dynamic_filter_2_df = dynamic_filter_2
620+
.downcast_ref::<DynamicFilterPhysicalExpr>()
621+
.expect("Expected DynamicFilterPhysicalExpr");
622+
let snap = dynamic_filter_2_df.current().unwrap();
622623
insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
623624
// Both filters allow evaluating the same expression
624625
let batch_1 = RecordBatch::try_new(
@@ -684,20 +685,17 @@ mod test {
684685
}
685686

686687
#[test]
687-
fn test_snapshot() {
688+
fn test_current() {
688689
let expr = lit(42) as Arc<dyn PhysicalExpr>;
689690
let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));
690691

691-
// Take a snapshot of the current expression
692-
let snapshot = dynamic_filter.snapshot().unwrap();
693-
assert_eq!(snapshot, Some(expr));
692+
// Read the current expression.
693+
assert_eq!(&dynamic_filter.current().unwrap(), &expr);
694694

695-
// Update the current expression
695+
// Update the current expression.
696696
let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
697697
dynamic_filter.update(Arc::clone(&new_expr)).unwrap();
698-
// Take another snapshot
699-
let snapshot = dynamic_filter.snapshot().unwrap();
700-
assert_eq!(snapshot, Some(new_expr));
698+
assert_eq!(&dynamic_filter.current().unwrap(), &new_expr);
701699
}
702700

703701
#[test]

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,10 @@ use itertools::{Itertools, izip};
326326
/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327327
/// and internally maintains a reference to the hash table or other state.
328328
///
329-
/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330-
/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331-
/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
329+
/// To make working with these sorts of dynamic filters more tractable callers can downcast a
330+
/// [`PhysicalExpr`] to [`DynamicFilterPhysicalExpr`] and call `current()` to get a snapshot
331+
/// of the expression (ie. a a static filter expression). For a join this could mean converting
332+
/// it to an `InList` filter or a min/max filter for example.
332333
/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333334
///
334335
/// # Example: Push TopK filters into Scans
@@ -376,7 +377,7 @@ use itertools::{Itertools, izip};
376377
/// <https://github.com/apache/datafusion/issues/15037>
377378
///
378379
/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379-
/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380+
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
380381
/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381382
/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382383
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec

datafusion/pruning/src/pruning_predicate.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use datafusion_common::{
4242
tree_node::{Transformed, TreeNode},
4343
};
4444
use datafusion_expr_common::operator::Operator;
45+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
4546
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
4647
use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
47-
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
4848
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
4949

5050
/// Used to prove that arbitrary predicates (boolean expression) can not
@@ -456,22 +456,28 @@ impl PruningPredicate {
456456
/// details.
457457
///
458458
/// Note that `PruningPredicate` does not attempt to normalize or simplify
459-
/// the input expression unless calling [`snapshot_physical_expr_opt`]
460-
/// returns a new expression.
459+
/// the input expression unless any [`DynamicFilterPhysicalExpr`] nodes
460+
/// are replaced with their current value.
461461
/// It is recommended that you pass the expressions through [`PhysicalExprSimplifier`]
462462
/// before calling this method to make sure the expressions can be used for pruning.
463463
pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
464-
// Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`.
465-
// In particular this unravels any `DynamicFilterPhysicalExpr`s by snapshotting them
466-
// so that PruningPredicate can work with a static expression.
467-
let tf = snapshot_physical_expr_opt(expr)?;
464+
// Replace any `DynamicFilterPhysicalExpr` nodes with their current value so
465+
// that `PruningPredicate` can work with a static expression.
466+
let tf = expr.transform_up(|e| {
467+
if let Some(df) = e.downcast_ref::<DynamicFilterPhysicalExpr>() {
468+
Ok(Transformed::yes(df.current()?))
469+
} else {
470+
Ok(Transformed::no(e))
471+
}
472+
})?;
468473
if tf.transformed {
469474
// If we had an expression such as Dynamic(part_col < 5 and col < 10)
470475
// (this could come from something like `select * from t order by part_col, col, limit 10`)
471-
// after snapshotting and because `DynamicFilterPhysicalExpr` applies child replacements to its
472-
// children after snapshotting and previously `replace_columns_with_literals` may have been called with partition values
473-
// the expression we have now is `8 < 5 and col < 10`.
474-
// Thus we need as simplifier pass to get `false and col < 10` => `false` here.
476+
// after replacing the dynamic filter with its current value and because
477+
// `DynamicFilterPhysicalExpr` applies child replacements to its children,
478+
// and previously `replace_columns_with_literals` may have been called with
479+
// partition values, the expression we have now is `8 < 5 and col < 10`.
480+
// Thus we need a simplifier pass to get `false and col < 10` => `false` here.
475481
let simplifier = PhysicalExprSimplifier::new(&schema);
476482
expr = simplifier.simplify(tf.data)?;
477483
} else {

0 commit comments

Comments
 (0)