From 8c8ad2f2a6ba6faf0a2b08d1f3afa3d0804c3b17 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 12:50:42 +0530 Subject: [PATCH 1/3] refactor: add try_to_proto / try_from_proto to DynamicFilterPhysicalExpr Part of #22434. Adds try_to_proto / try_from_proto to DynamicFilterPhysicalExpr so it participates in the expression-local serialization pattern introduced in #21929. The centralized arms in to_proto.rs / from_proto.rs remain as fallbacks for now. Cleanup of the pub-for-proto scaffolding (from_parts, inner, original_children, remapped_children) can follow once decode reads state directly via try_from_proto. --- .../src/expressions/dynamic_filters.rs | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 5b9de882160aa..ebe0f63c75c09 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -419,6 +419,114 @@ impl DynamicFilterPhysicalExpr { } } +#[cfg(feature = "proto")] +impl DynamicFilterPhysicalExpr { + /// Serialize this expression to protobuf. + /// + /// Encodes `children`, `remapped_children`, and the atomically-captured + /// `Inner` state (expression id, generation, current expr, is_complete). + pub fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> + { + use datafusion_proto_models::protobuf; + use datafusion_proto_models::protobuf::physical_expr_node::ExprType; + + let children = self + .children + .iter() + .map(|c| ctx.encode_child(c)) + .collect::>>()?; + + let remapped_children = match &self.remapped_children { + Some(remapped) => remapped + .iter() + .map(|c| ctx.encode_child(c)) + .collect::>>()?, + None => vec![], + }; + + let inner = self.inner.read().clone(); + let inner_expr = Box::new(ctx.encode_child(&inner.expr)?); + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: Some(inner.expression_id), + expr_type: Some(ExprType::DynamicFilter(Box::new( + protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: inner.generation, + inner_expr: Some(inner_expr), + is_complete: inner.is_complete, + }, + ))), + })) + } + + /// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> + { + use datafusion_proto_models::protobuf::physical_expr_node::ExprType; + + let dynamic_filter = match &node.expr_type { + Some(ExprType::DynamicFilter(df)) => df.as_ref(), + _ => return datafusion_common::internal_err!( + "PhysicalExprNode is not a DynamicFilter" + ), + }; + + let expression_id = node.expr_id.ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \ + to be set by the serializer" + .to_string(), + ) + })?; + + let children = dynamic_filter + .children + .iter() + .map(|c| ctx.decode(c)) + .collect::>>()?; + + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some( + dynamic_filter + .remapped_children + .iter() + .map(|c| ctx.decode(c)) + .collect::>>()?, + ) + } else { + None + }; + + let inner_expr = ctx.decode( + dynamic_filter + .inner_expr + .as_deref() + .ok_or_else(|| datafusion_common::DataFusionError::Internal( + "DynamicFilterPhysicalExpr missing inner_expr".to_string(), + ))?, + )?; + + Ok(Arc::new(DynamicFilterPhysicalExpr::from_parts( + children, + remapped_children, + Inner { + expression_id, + generation: dynamic_filter.generation, + expr: inner_expr, + is_complete: dynamic_filter.is_complete, + }, + ))) + } +} + impl PhysicalExpr for DynamicFilterPhysicalExpr { fn children(&self) -> Vec<&Arc> { self.remapped_children From 952d16d8ae8384120bdbf1dd3944891b009d7d9b Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 14:10:12 +0530 Subject: [PATCH 2/3] style: apply cargo fmt --- .../src/expressions/dynamic_filters.rs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index ebe0f63c75c09..cd48b926e1ff4 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -428,8 +428,7 @@ impl DynamicFilterPhysicalExpr { pub fn try_to_proto( &self, ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> Result> - { + ) -> Result> { use datafusion_proto_models::protobuf; use datafusion_proto_models::protobuf::physical_expr_node::ExprType; @@ -465,18 +464,19 @@ impl DynamicFilterPhysicalExpr { } /// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation. - pub fn try_from_proto( + pub fn try_from_proto( node: &datafusion_proto_models::protobuf::PhysicalExprNode, ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, - ) -> Result> - { + ) -> Result> { use datafusion_proto_models::protobuf::physical_expr_node::ExprType; let dynamic_filter = match &node.expr_type { Some(ExprType::DynamicFilter(df)) => df.as_ref(), - _ => return datafusion_common::internal_err!( - "PhysicalExprNode is not a DynamicFilter" - ), + _ => { + return datafusion_common::internal_err!( + "PhysicalExprNode is not a DynamicFilter" + ); + } }; let expression_id = node.expr_id.ok_or_else(|| { @@ -505,14 +505,12 @@ impl DynamicFilterPhysicalExpr { None }; - let inner_expr = ctx.decode( - dynamic_filter - .inner_expr - .as_deref() - .ok_or_else(|| datafusion_common::DataFusionError::Internal( + let inner_expr = + ctx.decode(dynamic_filter.inner_expr.as_deref().ok_or_else(|| { + datafusion_common::DataFusionError::Internal( "DynamicFilterPhysicalExpr missing inner_expr".to_string(), - ))?, - )?; + ) + })?)?; Ok(Arc::new(DynamicFilterPhysicalExpr::from_parts( children, From 52b1786c4df2fea3ccd59e2c65984c51640d4a66 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 22 May 2026 17:08:35 +0000 Subject: [PATCH 3/3] refactor: let dynamic filters own proto serialization --- .../src/expressions/dynamic_filters.rs | 244 +++++------------- .../physical-expr/src/expressions/mod.rs | 2 +- .../proto/src/physical_plan/from_proto.rs | 54 +--- .../proto/src/physical_plan/to_proto.rs | 37 +-- 4 files changed, 74 insertions(+), 263 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index cd48b926e1ff4..6e84d716d6373 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -82,22 +82,18 @@ pub struct DynamicFilterPhysicalExpr { /// `expression_id` lives here because it identifies the actual filter expression `expr`. /// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are /// the same logical filter and must report the same `expression_id`. -/// -/// **Warning:** exposed publicly solely so that proto (de)serialization in -/// `datafusion-proto` can read and rebuild this state. Do not treat this type -/// or its layout as a stable API. #[derive(Clone, Debug)] -pub struct Inner { +struct Inner { /// A unique identifier for the expression. - pub expression_id: u64, + expression_id: u64, /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. - pub generation: u64, - pub expr: Arc, + generation: u64, + expr: Arc, /// Flag for quick synchronous check if filter is complete. /// This is redundant with the watch channel state, but allows us to return immediately /// from `wait_complete()` without subscribing if already complete. - pub is_complete: bool, + is_complete: bool, } impl Inner { @@ -361,108 +357,10 @@ impl DynamicFilterPhysicalExpr { write!(f, " ]") } - - /// Return the filter's original children (before any remapping). - /// - /// **Warning:** intended only for `datafusion-proto` (de)serialization. - /// Not a stable API. - pub fn original_children(&self) -> &[Arc] { - &self.children - } - - /// Return the filter's remapped children, if any have been set via - /// [`PhysicalExpr::with_new_children`]. - /// - /// **Warning:** intended only for `datafusion-proto` (de)serialization. - /// Not a stable API. - pub fn remapped_children(&self) -> Option<&[Arc]> { - self.remapped_children.as_deref() - } - - /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by - /// proto deserialization. - /// - /// **Warning:** intended only for `datafusion-proto` (de)serialization. - /// Not a stable API. - pub fn from_parts( - children: Vec>, - remapped_children: Option>>, - inner: Inner, - ) -> Self { - let state = if inner.is_complete { - FilterState::Complete { - generation: inner.generation, - } - } else { - FilterState::InProgress { - generation: inner.generation, - } - }; - let (state_watch, _) = watch::channel(state); - - Self { - children, - remapped_children, - inner: Arc::new(RwLock::new(inner)), - state_watch, - data_type: Arc::new(RwLock::new(None)), - nullable: Arc::new(RwLock::new(None)), - } - } - - /// Return a clone of the atomically-captured `Inner` state. - /// - /// **Warning:** intended only for `datafusion-proto` (de)serialization. - /// Not a stable API. - pub fn inner(&self) -> Inner { - self.inner.read().clone() - } } #[cfg(feature = "proto")] impl DynamicFilterPhysicalExpr { - /// Serialize this expression to protobuf. - /// - /// Encodes `children`, `remapped_children`, and the atomically-captured - /// `Inner` state (expression id, generation, current expr, is_complete). - pub fn try_to_proto( - &self, - ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> Result> { - use datafusion_proto_models::protobuf; - use datafusion_proto_models::protobuf::physical_expr_node::ExprType; - - let children = self - .children - .iter() - .map(|c| ctx.encode_child(c)) - .collect::>>()?; - - let remapped_children = match &self.remapped_children { - Some(remapped) => remapped - .iter() - .map(|c| ctx.encode_child(c)) - .collect::>>()?, - None => vec![], - }; - - let inner = self.inner.read().clone(); - let inner_expr = Box::new(ctx.encode_child(&inner.expr)?); - - Ok(Some(protobuf::PhysicalExprNode { - expr_id: Some(inner.expression_id), - expr_type: Some(ExprType::DynamicFilter(Box::new( - protobuf::PhysicalDynamicFilterNode { - children, - remapped_children, - generation: inner.generation, - inner_expr: Some(inner_expr), - is_complete: inner.is_complete, - }, - ))), - })) - } - /// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation. pub fn try_from_proto( node: &datafusion_proto_models::protobuf::PhysicalExprNode, @@ -512,16 +410,31 @@ impl DynamicFilterPhysicalExpr { ) })?)?; - Ok(Arc::new(DynamicFilterPhysicalExpr::from_parts( + let inner = Inner { + expression_id, + generation: dynamic_filter.generation, + expr: inner_expr, + is_complete: dynamic_filter.is_complete, + }; + let state = if inner.is_complete { + FilterState::Complete { + generation: inner.generation, + } + } else { + FilterState::InProgress { + generation: inner.generation, + } + }; + let (state_watch, _) = watch::channel(state); + + Ok(Arc::new(DynamicFilterPhysicalExpr { children, remapped_children, - Inner { - expression_id, - generation: dynamic_filter.generation, - expr: inner_expr, - is_complete: dynamic_filter.is_complete, - }, - ))) + inner: Arc::new(RwLock::new(inner)), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + })) } } @@ -609,6 +522,45 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { current.evaluate(batch) } + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + use datafusion_proto_models::protobuf::physical_expr_node::ExprType; + + let children = self + .children + .iter() + .map(|c| ctx.encode_child(c)) + .collect::>>()?; + + let remapped_children = match &self.remapped_children { + Some(remapped) => remapped + .iter() + .map(|c| ctx.encode_child(c)) + .collect::>>()?, + None => vec![], + }; + + let inner = self.inner.read().clone(); + let inner_expr = Box::new(ctx.encode_child(&inner.expr)?); + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: Some(inner.expression_id), + expr_type: Some(ExprType::DynamicFilter(Box::new( + protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: inner.generation, + inner_expr: Some(inner_expr), + is_complete: inner.is_complete, + }, + ))), + })) + } + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.render(f, |expr, f| expr.fmt_sql(f)) } @@ -1068,68 +1020,6 @@ mod test { ); } - /// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr` - /// whose observable state (original children, remapped children, - /// expression id, inner generation/expr/is_complete) matches the source - /// filter. - #[test] - fn test_from_parts_preserves_state() { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let col_a = col("a", &schema).unwrap(); - - // Create a dynamic filter with children - let expr = Arc::new(BinaryExpr::new( - Arc::clone(&col_a), - datafusion_expr::Operator::Gt, - lit(10) as Arc, - )); - let filter = DynamicFilterPhysicalExpr::new( - vec![Arc::clone(&col_a)], - expr as Arc, - ); - - // Add remapped children. - let reassigned_schema = Arc::new(Schema::new(vec![ - Field::new("b", DataType::Int32, false), - Field::new("a", DataType::Int32, false), - ])); - let reassigned = reassign_expr_columns( - Arc::new(filter) as Arc, - &reassigned_schema, - ) - .expect("reassign_expr_columns should succeed"); - let reassigned = reassigned - .downcast_ref::() - .expect("Expected dynamic filter after reassignment"); - - reassigned - .update(lit(42) as Arc) - .expect("Update should succeed"); - reassigned.mark_complete(); - - // Capture the parts and reconstruct. `expression_id` rides in `inner`. - let reconstructed = DynamicFilterPhysicalExpr::from_parts( - reassigned.original_children().to_vec(), - reassigned.remapped_children().map(|r| r.to_vec()), - reassigned.inner(), - ); - - assert_eq!( - reassigned.original_children(), - reconstructed.original_children(), - ); - assert_eq!( - reassigned.remapped_children(), - reconstructed.remapped_children(), - ); - assert_eq!(reassigned.expression_id(), reconstructed.expression_id()); - let r = reassigned.inner(); - let c = reconstructed.inner(); - assert_eq!(r.generation, c.generation); - assert_eq!(r.is_complete, c.is_complete); - assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr)); - } - #[tokio::test] async fn test_expression_id() { let source_schema = diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 7cf874c448ea0..bb6d9ca9c9c78 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; +pub use dynamic_filters::DynamicFilterPhysicalExpr; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f5fd214ef683f..58cdb9bec0495 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -39,6 +39,7 @@ use datafusion_execution::{FunctionRegistry, TaskContext}; use datafusion_expr::WindowFunctionDefinition; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::SubqueryIndex; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; @@ -60,9 +61,6 @@ use super::{ use crate::convert::TryFromProto; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, convert_required_proto, protobuf}; -use datafusion_physical_expr::expressions::{ - DynamicFilterInner, DynamicFilterPhysicalExpr, -}; /// Parses a physical sort expression from a protobuf. /// @@ -281,6 +279,9 @@ pub fn parse_physical_expr_with_converter( ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)), ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?, + ExprType::DynamicFilter(_) => { + DynamicFilterPhysicalExpr::try_from_proto(proto, &decode_ctx)? + } ExprType::AggregateExpr(_) => { return not_impl_err!( "Cannot convert aggregate expr node to physical expression" @@ -478,53 +479,6 @@ pub fn parse_physical_expr_with_converter( results.clone(), )) } - ExprType::DynamicFilter(dynamic_filter) => { - let children = parse_physical_exprs( - &dynamic_filter.children, - ctx, - input_schema, - proto_converter, - )?; - - let remapped_children = if !dynamic_filter.remapped_children.is_empty() { - Some(parse_physical_exprs( - &dynamic_filter.remapped_children, - ctx, - input_schema, - proto_converter, - )?) - } else { - None - }; - - let inner_expr = parse_required_physical_expr( - dynamic_filter.inner_expr.as_deref(), - ctx, - "inner_expr", - input_schema, - proto_converter, - )?; - - let expression_id = proto.expr_id.ok_or_else(|| { - proto_error( - "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \ - to be set by the serializer", - ) - })?; - - let base_filter: Arc = - Arc::new(DynamicFilterPhysicalExpr::from_parts( - children, - remapped_children, - DynamicFilterInner { - expression_id, - generation: dynamic_filter.generation, - expr: inner_expr, - is_complete: dynamic_filter.is_complete, - }, - )); - base_filter - } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5181c9740130a..b8777f99adadc 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, + NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -528,39 +528,6 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(df) = expr.downcast_ref::() { - let children = df - .original_children() - .iter() - .map(|child| proto_converter.physical_expr_to_proto(child, codec)) - .collect::>>()?; - - let remapped_children = if let Some(remapped) = df.remapped_children() { - remapped - .iter() - .map(|child| proto_converter.physical_expr_to_proto(child, codec)) - .collect::>>()? - } else { - vec![] - }; - - // Atomic snapshot of inner state. - let inner = df.inner(); - let inner_expr = - Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?); - - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( - Box::new(protobuf::PhysicalDynamicFilterNode { - children, - remapped_children, - generation: inner.generation, - inner_expr: Some(inner_expr), - is_complete: inner.is_complete, - }), - )), - }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(value, &mut buf) {