diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index b63effdbb9c88..16061103ef589 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -25,7 +25,9 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_err}; +use datafusion_common::{ + Result, ScalarValue, cast::as_boolean_array, internal_datafusion_err, internal_err, +}; use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; #[expect(deprecated)] @@ -181,6 +183,47 @@ impl PhysicalExpr for NotExpr { write!(f, "NOT ")?; self.arg.fmt_sql(f) } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( + protobuf::PhysicalNot { + expr: Some(Box::new(ctx.encode_child(&self.arg)?)), + }, + ))), + })) + } +} + +#[cfg(feature = "proto")] +impl NotExpr { + /// Reconstruct a [`NotExpr`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + let protobuf::PhysicalNot { expr } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::NotExpr(e)) => e.as_ref(), + _ => return internal_err!("PhysicalExprNode is not a NotExpr"), + }; + let expr = expr.as_deref().ok_or_else(|| { + internal_datafusion_err!( + "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: NotExpr)", + node.expr_id + ) + })?; + + Ok(Arc::new(NotExpr::new(ctx.decode(expr)?))) + } } /// Creates a unary expression NOT @@ -198,6 +241,46 @@ mod tests { use arrow::{array::BooleanArray, datatypes::*}; use datafusion_physical_expr_common::physical_expr::fmt_sql; + #[cfg(feature = "proto")] + #[test] + fn test_from_proto_missing_child() { + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::{ + PhysicalExprDecode, PhysicalExprDecodeCtx, + }; + use datafusion_proto_models::protobuf::{ + PhysicalExprNode, PhysicalNot, physical_expr_node, + }; + + struct NoopDecoder; + + impl PhysicalExprDecode for NoopDecoder { + fn decode( + &self, + _node: &PhysicalExprNode, + _schema: &Schema, + ) -> Result> { + unreachable!("missing child should be rejected before decoding") + } + } + + let node = PhysicalExprNode { + expr_id: Some(42), + expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( + PhysicalNot { expr: None }, + ))), + }; + let schema = Schema::empty(); + let decoder = NoopDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!(err, DataFusionError::Internal(msg) + if msg.contains("NotExpr is missing required field 'expr'") + && msg.contains("expr_id: Some(42)") + && msg.contains("expr_type: NotExpr"))); + } + #[test] fn neg_op() -> Result<()> { let schema = schema(); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..cb2f20a23a816 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -312,13 +312,7 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?)) } - ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?)), + ExprType::NotExpr(_) => NotExpr::try_from_proto(proto, &decode_ctx)?, ExprType::Negative(e) => { Arc::new(NegativeExpr::new(parse_required_physical_expr( e.expr.as_deref(), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..6fc25fc0233c2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + NegativeExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -379,17 +379,6 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( - protobuf::PhysicalNot { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.arg(), codec)?, - )), - }, - ))), - }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id,