@@ -22,8 +22,7 @@ use arrow::{
2222 datatypes:: { DataType , Schema } ,
2323 record_batch:: RecordBatch ,
2424} ;
25- use datafusion_common:: Result ;
26- use datafusion_common:: ScalarValue ;
25+ use datafusion_common:: { Result , ScalarValue } ;
2726use datafusion_expr:: ColumnarValue ;
2827use std:: hash:: Hash ;
2928use std:: sync:: Arc ;
@@ -102,6 +101,60 @@ impl PhysicalExpr for IsNullExpr {
102101 self . arg . fmt_sql ( f) ?;
103102 write ! ( f, " IS NULL" )
104103 }
104+
105+ #[ cfg( feature = "proto" ) ]
106+ fn try_to_proto (
107+ & self ,
108+ ctx : & datafusion_physical_expr_common:: physical_expr:: proto_encode:: PhysicalExprEncodeCtx < ' _ > ,
109+ ) -> Result < Option < datafusion_proto_models:: protobuf:: PhysicalExprNode > > {
110+ use datafusion_proto_models:: protobuf;
111+
112+ Ok ( Some ( protobuf:: PhysicalExprNode {
113+ expr_id : None ,
114+ expr_type : Some ( protobuf:: physical_expr_node:: ExprType :: IsNullExpr (
115+ Box :: new ( protobuf:: PhysicalIsNull {
116+ expr : Some ( Box :: new ( ctx. encode_child ( & self . arg ) ?) ) ,
117+ } ) ,
118+ ) ) ,
119+ } ) )
120+ }
121+ }
122+
123+ #[ cfg( feature = "proto" ) ]
124+ impl IsNullExpr {
125+ /// Reconstruct an [`IsNullExpr`] from its protobuf representation.
126+ ///
127+ /// Takes the whole [`PhysicalExprNode`] — the exact inverse of what
128+ /// [`PhysicalExpr::try_to_proto`] produces — so every expression's
129+ /// `try_from_proto` shares one signature.
130+ ///
131+ /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
132+ /// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto
133+ /// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode
134+ pub fn try_from_proto (
135+ node : & datafusion_proto_models:: protobuf:: PhysicalExprNode ,
136+ ctx : & datafusion_physical_expr_common:: physical_expr:: proto_decode:: PhysicalExprDecodeCtx < ' _ > ,
137+ ) -> Result < Arc < dyn PhysicalExpr > > {
138+ use datafusion_proto_models:: protobuf;
139+
140+ let node = match & node. expr_type {
141+ Some ( protobuf:: physical_expr_node:: ExprType :: IsNullExpr ( node) ) => {
142+ node. as_ref ( )
143+ }
144+ _ => {
145+ return datafusion_common:: internal_err!(
146+ "PhysicalExprNode is not an IsNullExpr"
147+ ) ;
148+ }
149+ } ;
150+ let expr = node. expr . as_deref ( ) . ok_or_else ( || {
151+ datafusion_common:: DataFusionError :: Internal (
152+ "IsNullExpr is missing required field 'expr'" . to_string ( ) ,
153+ )
154+ } ) ?;
155+
156+ Ok ( Arc :: new ( IsNullExpr :: new ( ctx. decode ( expr) ?) ) )
157+ }
105158}
106159
107160/// Create an IS NULL expression
@@ -112,6 +165,8 @@ pub fn is_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
112165#[ cfg( test) ]
113166mod tests {
114167 use super :: * ;
168+ #[ cfg( feature = "proto" ) ]
169+ use crate :: expressions:: Column ;
115170 use crate :: expressions:: col;
116171 use arrow:: array:: {
117172 Array , BooleanArray , Float64Array , Int32Array , StringArray , UnionArray ,
@@ -121,6 +176,54 @@ mod tests {
121176 use datafusion_common:: cast:: as_boolean_array;
122177 use datafusion_physical_expr_common:: physical_expr:: fmt_sql;
123178
179+ #[ cfg( feature = "proto" ) ]
180+ use datafusion_physical_expr_common:: physical_expr:: {
181+ proto_decode:: { PhysicalExprDecode , PhysicalExprDecodeCtx } ,
182+ proto_encode:: { PhysicalExprEncode , PhysicalExprEncodeCtx } ,
183+ } ;
184+ #[ cfg( feature = "proto" ) ]
185+ use datafusion_proto_models:: protobuf;
186+
187+ #[ cfg( feature = "proto" ) ]
188+ struct TestProtoCodec ;
189+
190+ #[ cfg( feature = "proto" ) ]
191+ impl PhysicalExprEncode for TestProtoCodec {
192+ fn encode (
193+ & self ,
194+ expr : & Arc < dyn PhysicalExpr > ,
195+ ) -> Result < protobuf:: PhysicalExprNode > {
196+ let ctx = PhysicalExprEncodeCtx :: new ( self ) ;
197+ expr. try_to_proto ( & ctx) ?. ok_or_else ( || {
198+ datafusion_common:: DataFusionError :: Internal (
199+ "Expression did not serialize in test codec" . to_string ( ) ,
200+ )
201+ } )
202+ }
203+ }
204+
205+ #[ cfg( feature = "proto" ) ]
206+ impl PhysicalExprDecode for TestProtoCodec {
207+ fn decode (
208+ & self ,
209+ node : & protobuf:: PhysicalExprNode ,
210+ schema : & Schema ,
211+ ) -> Result < Arc < dyn PhysicalExpr > > {
212+ let ctx = PhysicalExprDecodeCtx :: new ( schema, self ) ;
213+ match & node. expr_type {
214+ Some ( protobuf:: physical_expr_node:: ExprType :: Column ( _) ) => {
215+ Column :: try_from_proto ( node, & ctx)
216+ }
217+ Some ( protobuf:: physical_expr_node:: ExprType :: IsNullExpr ( _) ) => {
218+ IsNullExpr :: try_from_proto ( node, & ctx)
219+ }
220+ _ => datafusion_common:: internal_err!(
221+ "Unsupported expression in test decoder"
222+ ) ,
223+ }
224+ }
225+ }
226+
124227 #[ test]
125228 fn is_null_op ( ) -> Result < ( ) > {
126229 let schema = Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Utf8 , true ) ] ) ;
@@ -223,4 +326,72 @@ mod tests {
223326
224327 Ok ( ( ) )
225328 }
329+
330+ #[ cfg( feature = "proto" ) ]
331+ #[ test]
332+ fn is_null_proto_hook_roundtrip ( ) -> Result < ( ) > {
333+ let arg = Arc :: new ( Column :: new ( "a" , 0 ) ) as Arc < dyn PhysicalExpr > ;
334+ let expr = Arc :: new ( IsNullExpr :: new ( arg) ) as Arc < dyn PhysicalExpr > ;
335+
336+ let codec = TestProtoCodec ;
337+ let proto = codec. encode ( & expr) ?;
338+
339+ let is_null = match & proto. expr_type {
340+ Some ( protobuf:: physical_expr_node:: ExprType :: IsNullExpr ( is_null) ) => is_null,
341+ other => panic ! ( "Expected IsNullExpr proto, got {other:?}" ) ,
342+ } ;
343+ assert ! ( matches!(
344+ is_null
345+ . expr
346+ . as_deref( )
347+ . and_then( |expr| expr. expr_type. as_ref( ) ) ,
348+ Some ( protobuf:: physical_expr_node:: ExprType :: Column ( _) )
349+ ) ) ;
350+
351+ let schema = Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int64 , true ) ] ) ;
352+ let decode_ctx = PhysicalExprDecodeCtx :: new ( & schema, & codec) ;
353+ let decoded = IsNullExpr :: try_from_proto ( & proto, & decode_ctx) ?;
354+
355+ assert_eq ! ( decoded. to_string( ) , "a@0 IS NULL" ) ;
356+ Ok ( ( ) )
357+ }
358+
359+ #[ cfg( feature = "proto" ) ]
360+ #[ test]
361+ fn is_null_try_from_proto_errors ( ) {
362+ let schema = Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int64 , true ) ] ) ;
363+ let codec = TestProtoCodec ;
364+ let decode_ctx = PhysicalExprDecodeCtx :: new ( & schema, & codec) ;
365+
366+ let wrong_variant = protobuf:: PhysicalExprNode {
367+ expr_id : None ,
368+ expr_type : Some ( protobuf:: physical_expr_node:: ExprType :: Column (
369+ protobuf:: PhysicalColumn {
370+ name : "a" . to_string ( ) ,
371+ index : 0 ,
372+ } ,
373+ ) ) ,
374+ } ;
375+ let error = IsNullExpr :: try_from_proto ( & wrong_variant, & decode_ctx)
376+ . expect_err ( "wrong variant should error" )
377+ . strip_backtrace ( ) ;
378+ assert ! (
379+ error. contains( "PhysicalExprNode is not an IsNullExpr" ) ,
380+ "{error}"
381+ ) ;
382+
383+ let missing_child = protobuf:: PhysicalExprNode {
384+ expr_id : None ,
385+ expr_type : Some ( protobuf:: physical_expr_node:: ExprType :: IsNullExpr (
386+ Box :: new ( protobuf:: PhysicalIsNull { expr : None } ) ,
387+ ) ) ,
388+ } ;
389+ let error = IsNullExpr :: try_from_proto ( & missing_child, & decode_ctx)
390+ . expect_err ( "missing child should error" )
391+ . strip_backtrace ( ) ;
392+ assert ! (
393+ error. contains( "IsNullExpr is missing required field 'expr'" ) ,
394+ "{error}"
395+ ) ;
396+ }
226397}
0 commit comments