From 88e28a4c0c2523bb61d5f95b75fd9ac7bc1252a8 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Fri, 22 May 2026 18:58:10 +0200 Subject: [PATCH 1/4] Port NotExpr proto hooks --- .../physical-expr/src/expressions/not.rs | 40 +++++++++++++++++++ .../proto/src/physical_plan/from_proto.rs | 8 +--- .../proto/src/physical_plan/to_proto.rs | 15 +------ 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index b63effdbb9c88..76833d1114563 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -181,6 +181,46 @@ impl PhysicalExpr for NotExpr { write!(f, "NOT ")?; self.arg.fmt_sql(f) } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( + protobuf::PhysicalNot { + expr: Some(Box::new(ctx.encode_child(&self.arg)?)), + }, + ))), + })) + } +} + +#[cfg(feature = "proto")] +impl NotExpr { + /// Reconstruct a [`NotExpr`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + let protobuf::PhysicalNot { expr } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::NotExpr(e)) => e.as_ref(), + _ => return internal_err!("PhysicalExprNode is not a NotExpr"), + }; + let expr = expr.as_deref().ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "NotExpr is missing required field 'expr'".to_string(), + ) + })?; + + Ok(Arc::new(NotExpr::new(ctx.decode(expr)?))) + } } /// Creates a unary expression NOT diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..cb2f20a23a816 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -312,13 +312,7 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?)) } - ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?)), + ExprType::NotExpr(_) => NotExpr::try_from_proto(proto, &decode_ctx)?, ExprType::Negative(e) => { Arc::new(NegativeExpr::new(parse_required_physical_expr( e.expr.as_deref(), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..3c7dbd1dd835f 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr, + LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -379,17 +379,6 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( - protobuf::PhysicalNot { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.arg(), codec)?, - )), - }, - ))), - }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id, From 15800c7f03db7c7ed5fad8acc8c8bfe96580ec52 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Sun, 24 May 2026 11:03:10 +0200 Subject: [PATCH 2/4] Add NotExpr proto missing child test --- .../physical-expr/src/expressions/not.rs | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 76833d1114563..64cb7733087bc 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -238,6 +238,45 @@ mod tests { use arrow::{array::BooleanArray, datatypes::*}; use datafusion_physical_expr_common::physical_expr::fmt_sql; + #[cfg(feature = "proto")] + #[test] + fn test_from_proto_missing_child() { + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::{ + PhysicalExprDecode, PhysicalExprDecodeCtx, + }; + use datafusion_proto_models::protobuf::{ + PhysicalExprNode, PhysicalNot, physical_expr_node, + }; + + struct NoopDecoder; + + impl PhysicalExprDecode for NoopDecoder { + fn decode( + &self, + _node: &PhysicalExprNode, + _schema: &Schema, + ) -> Result> { + unreachable!("missing child should be rejected before decoding") + } + } + + let node = PhysicalExprNode { + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( + PhysicalNot { expr: None }, + ))), + }; + let schema = Schema::empty(); + let decoder = NoopDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(msg) if msg == "NotExpr is missing required field 'expr'") + ); + } + #[test] fn neg_op() -> Result<()> { let schema = schema(); From 812b9e05655329d31c75f51a57f43aee9fac628a Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Mon, 25 May 2026 16:21:24 +0200 Subject: [PATCH 3/4] Improve NotExpr proto decode error Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- .../physical-expr/src/expressions/not.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 64cb7733087bc..16061103ef589 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -25,7 +25,9 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_err}; +use datafusion_common::{ + Result, ScalarValue, cast::as_boolean_array, internal_datafusion_err, internal_err, +}; use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; #[expect(deprecated)] @@ -214,8 +216,9 @@ impl NotExpr { _ => return internal_err!("PhysicalExprNode is not a NotExpr"), }; let expr = expr.as_deref().ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "NotExpr is missing required field 'expr'".to_string(), + internal_datafusion_err!( + "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: NotExpr)", + node.expr_id ) })?; @@ -262,7 +265,7 @@ mod tests { } let node = PhysicalExprNode { - expr_id: None, + expr_id: Some(42), expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( PhysicalNot { expr: None }, ))), @@ -272,9 +275,10 @@ mod tests { let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); - assert!( - matches!(err, DataFusionError::Internal(msg) if msg == "NotExpr is missing required field 'expr'") - ); + assert!(matches!(err, DataFusionError::Internal(msg) + if msg.contains("NotExpr is missing required field 'expr'") + && msg.contains("expr_id: Some(42)") + && msg.contains("expr_type: NotExpr"))); } #[test] From 33a90db6f256d9683940b87eacfbf8cc00c7e35b Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Mon, 25 May 2026 16:23:59 +0200 Subject: [PATCH 4/4] Remove stale proto serializer imports Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- datafusion/proto/src/physical_plan/to_proto.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 3c7dbd1dd835f..6fc25fc0233c2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, + NegativeExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr;