Skip to content

Commit 8e9e6c4

Browse files
physical-expr-common: deprecate PhysicalExpr::snapshot method
Which issue does this PR close? See #21807 Rationale for this change `PhysicalExpr::snapshot` isn't required because it only applies to dynamic filters. The only place that its used is in `pruning_predicate.rs`. Since there's only one use case, we can just downcast to `DynamicFilterPhysicalExpr` and call `current()`. What changes are included in this PR? This change marks `PhysicalExpr::snapshot`, `snapshot_physical_expr`, and `snapshot_physical_expr_opt` as deprecated. Callers should downcast to `DynamicFilterPhysicalExpr` and call `current()`. Are these changes tested? Yes, the existing tests should cover the exact same functionality. Filter pushdown still workers with dynamic filters; we use `current()` instead of `snapshot()` to capture the expression. Are there any user-facing changes? `PhysicalExpr::snapshot`, `snapshot_physical_expr`, and `snapshot_physical_expr_opt` are deprecated and should not be used.
1 parent ba038e9 commit 8e9e6c4

5 files changed

Lines changed: 47 additions & 85 deletions

File tree

datafusion/ffi/src/physical_expr/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,8 @@ mod tests {
939939
assert_eq!(left, right);
940940
}
941941

942+
/// The snapshot API is deprecated.
943+
#[allow(deprecated)]
942944
#[test]
943945
fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
944946
let (original, foreign_expr) = create_test_expr();

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 14 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -355,48 +355,10 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
355355
/// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
356356
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
357357

358-
/// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
359-
///
360-
/// "Dynamic" in this case means containing references to structures that may change
361-
/// during plan execution, such as hash tables.
362-
///
363-
/// This method is used to capture the current state of `PhysicalExpr`s that may contain
364-
/// dynamic references to other operators in order to serialize it over the wire
365-
/// or treat it via downcast matching.
366-
///
367-
/// You should not call this method directly as it does not handle recursion.
368-
/// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
369-
/// full state of the `PhysicalExpr`.
370-
///
371-
/// This is expected to return "simple" expressions that do not have mutable state
372-
/// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
373-
/// Callers however should *not* assume anything about the returned expressions
374-
/// since callers and implementers may not agree on what "simple" or "built-in"
375-
/// means.
376-
/// In other words, if you need to serialize a `PhysicalExpr` across the wire
377-
/// you should call this method and then try to serialize the result,
378-
/// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
379-
/// just as if you had not called this method at all.
380-
///
381-
/// In particular, consider:
382-
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
383-
/// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
384-
/// This function may return something like `a >= 12`.
385-
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
386-
/// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
387-
/// This function may return something like `t2.b IN (1, 5, 7)`.
388-
///
389-
/// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
390-
/// or needs to serialize this state to bytes may not be able to handle these dynamic references.
391-
/// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
392-
/// contain these dynamic references.
393-
///
394-
/// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
395-
/// and send it across the wire to a remote executor may want to call this method after
396-
/// every batch on the source side and broadcast / update the current snapshot to the remote executor.
397-
///
398-
/// Note for implementers: this method should *not* handle recursion.
399-
/// Recursion is handled in [`snapshot_physical_expr`].
358+
#[deprecated(
359+
since = "54.0.0",
360+
note = "downcast to `DynamicFilterPhysicalExpr` and call `current()` instead"
361+
)]
400362
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
401363
// By default, we return None to indicate that this PhysicalExpr does not
402364
// have any dynamic references or state.
@@ -616,38 +578,22 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
616578
Wrapper { expr }
617579
}
618580

619-
/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
620-
///
621-
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
622-
/// This is used to capture the current state of `PhysicalExpr`s that may contain
623-
/// dynamic references to other operators in order to serialize it over the wire
624-
/// or treat it via downcast matching.
625-
///
626-
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
627-
///
628-
/// # Returns
629-
///
630-
/// Returns a snapshot of the `PhysicalExpr` if it is dynamic, otherwise
631-
/// returns itself.
581+
#[deprecated(
582+
since = "54.0.0",
583+
note = "downcast to `DynamicFilterPhysicalExpr` and call `current()` instead"
584+
)]
585+
#[allow(deprecated)]
632586
pub fn snapshot_physical_expr(
633587
expr: Arc<dyn PhysicalExpr>,
634588
) -> Result<Arc<dyn PhysicalExpr>> {
635589
snapshot_physical_expr_opt(expr).data()
636590
}
637591

638-
/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
639-
///
640-
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
641-
/// This is used to capture the current state of `PhysicalExpr`s that may contain
642-
/// dynamic references to other operators in order to serialize it over the wire
643-
/// or treat it via downcast matching.
644-
///
645-
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
646-
///
647-
/// # Returns
648-
///
649-
/// Returns a `[`Transformed`] indicating whether a snapshot was taken,
650-
/// along with the resulting `PhysicalExpr`.
592+
#[deprecated(
593+
since = "54.0.0",
594+
note = "downcast to `DynamicFilterPhysicalExpr` and call `current()` instead"
595+
)]
596+
#[allow(deprecated)]
651597
pub fn snapshot_physical_expr_opt(
652598
expr: Arc<dyn PhysicalExpr>,
653599
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,20 @@ mod test {
611611
&filter_schema_1,
612612
)
613613
.unwrap();
614-
let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
614+
let dynamic_filter_1_df = dynamic_filter_1
615+
.downcast_ref::<DynamicFilterPhysicalExpr>()
616+
.expect("Expected DynamicFilterPhysicalExpr");
617+
let snap = dynamic_filter_1_df.current().unwrap();
615618
insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
616619
let dynamic_filter_2 = reassign_expr_columns(
617620
Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
618621
&filter_schema_2,
619622
)
620623
.unwrap();
621-
let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
624+
let dynamic_filter_2_df = dynamic_filter_2
625+
.downcast_ref::<DynamicFilterPhysicalExpr>()
626+
.expect("Expected DynamicFilterPhysicalExpr");
627+
let snap = dynamic_filter_2_df.current().unwrap();
622628
insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
623629
// Both filters allow evaluating the same expression
624630
let batch_1 = RecordBatch::try_new(
@@ -683,6 +689,7 @@ mod test {
683689
assert!(arr_1.eq(&expected));
684690
}
685691

692+
#[allow(deprecated)]
686693
#[test]
687694
fn test_snapshot() {
688695
let expr = lit(42) as Arc<dyn PhysicalExpr>;

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,10 @@ use itertools::{Itertools, izip};
326326
/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327327
/// and internally maintains a reference to the hash table or other state.
328328
///
329-
/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330-
/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331-
/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
329+
/// To make working with these sorts of dynamic filters more tractable callers can downcast a
330+
/// [`PhysicalExpr`] to [`DynamicFilterPhysicalExpr`] and call `current()` to get a snapshot
331+
/// of the expression (ie. a a static filter expression). For a join this could mean converting
332+
/// it to an `InList` filter or a min/max filter for example.
332333
/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333334
///
334335
/// # Example: Push TopK filters into Scans
@@ -376,7 +377,7 @@ use itertools::{Itertools, izip};
376377
/// <https://github.com/apache/datafusion/issues/15037>
377378
///
378379
/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379-
/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380+
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
380381
/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381382
/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382383
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec

datafusion/pruning/src/pruning_predicate.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use datafusion_common::{
4242
tree_node::{Transformed, TreeNode},
4343
};
4444
use datafusion_expr_common::operator::Operator;
45+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
4546
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
4647
use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
47-
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
4848
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
4949

5050
/// Used to prove that arbitrary predicates (boolean expression) can not
@@ -456,22 +456,28 @@ impl PruningPredicate {
456456
/// details.
457457
///
458458
/// Note that `PruningPredicate` does not attempt to normalize or simplify
459-
/// the input expression unless calling [`snapshot_physical_expr_opt`]
460-
/// returns a new expression.
459+
/// the input expression unless any [`DynamicFilterPhysicalExpr`] nodes
460+
/// are replaced with their current value.
461461
/// It is recommended that you pass the expressions through [`PhysicalExprSimplifier`]
462462
/// before calling this method to make sure the expressions can be used for pruning.
463463
pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
464-
// Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`.
465-
// In particular this unravels any `DynamicFilterPhysicalExpr`s by snapshotting them
466-
// so that PruningPredicate can work with a static expression.
467-
let tf = snapshot_physical_expr_opt(expr)?;
464+
// Replace any `DynamicFilterPhysicalExpr` nodes with their current value so
465+
// that `PruningPredicate` can work with a static expression.
466+
let tf = expr.transform_up(|e| {
467+
if let Some(df) = e.downcast_ref::<DynamicFilterPhysicalExpr>() {
468+
Ok(Transformed::yes(df.current()?))
469+
} else {
470+
Ok(Transformed::no(e))
471+
}
472+
})?;
468473
if tf.transformed {
469474
// If we had an expression such as Dynamic(part_col < 5 and col < 10)
470475
// (this could come from something like `select * from t order by part_col, col, limit 10`)
471-
// after snapshotting and because `DynamicFilterPhysicalExpr` applies child replacements to its
472-
// children after snapshotting and previously `replace_columns_with_literals` may have been called with partition values
473-
// the expression we have now is `8 < 5 and col < 10`.
474-
// Thus we need as simplifier pass to get `false and col < 10` => `false` here.
476+
// after replacing the dynamic filter with its current value and because
477+
// `DynamicFilterPhysicalExpr` applies child replacements to its children,
478+
// and previously `replace_columns_with_literals` may have been called with
479+
// partition values, the expression we have now is `8 < 5 and col < 10`.
480+
// Thus we need a simplifier pass to get `false and col < 10` => `false` here.
475481
let simplifier = PhysicalExprSimplifier::new(&schema);
476482
expr = simplifier.simplify(tf.data)?;
477483
} else {

0 commit comments

Comments
 (0)