diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index e2bda4c8aaf49..86641d5e6935d 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -174,6 +174,46 @@ impl PhysicalExpr for NegativeExpr { self.arg.fmt_sql(f)?; write!(f, ")") } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( + protobuf::PhysicalNegativeNode { + expr: Some(Box::new(ctx.encode_child(&self.arg)?)), + }, + ))), + })) + } +} + +#[cfg(feature = "proto")] +impl NegativeExpr { + /// Reconstruct a [`NegativeExpr`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + let protobuf::PhysicalNegativeNode { expr } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::Negative(n)) => n.as_ref(), + _ => return internal_err!("PhysicalExprNode is not a Negative"), + }; + let expr = expr.as_deref().ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Negative is missing required field 'expr'".to_string(), + ) + })?; + + Ok(Arc::new(NegativeExpr::new(ctx.decode(expr)?))) + } } /// Creates a unary expression NEGATIVE diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..17d8edc5fe809 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -319,15 +319,7 @@ pub fn parse_physical_expr_with_converter( input_schema, proto_converter, )?)), - ExprType::Negative(e) => { - Arc::new(NegativeExpr::new(parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?)) - } + ExprType::Negative(_) => NegativeExpr::try_from_proto(proto, &decode_ctx)?, ExprType::InList(_) => InListExpr::try_from_proto(proto, &decode_ctx)?, ExprType::Case(e) => Arc::new(CaseExpr::try_new( e.expr diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..79ff944f9258f 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -412,17 +412,6 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( - protobuf::PhysicalNegativeNode { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.arg(), codec)?, - )), - }, - ))), - }) } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id,