Skip to content

Commit ff682e2

Browse files
committed
Port IsNullExpr proto serialization hooks
1 parent 1b8451c commit ff682e2

3 files changed

Lines changed: 176 additions & 24 deletions

File tree

datafusion/physical-expr/src/expressions/is_null.rs

Lines changed: 173 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,7 @@ use arrow::{
2222
datatypes::{DataType, Schema},
2323
record_batch::RecordBatch,
2424
};
25-
use datafusion_common::Result;
26-
use datafusion_common::ScalarValue;
25+
use datafusion_common::{Result, ScalarValue};
2726
use datafusion_expr::ColumnarValue;
2827
use std::hash::Hash;
2928
use std::sync::Arc;
@@ -102,6 +101,60 @@ impl PhysicalExpr for IsNullExpr {
102101
self.arg.fmt_sql(f)?;
103102
write!(f, " IS NULL")
104103
}
104+
105+
/// Encode this `IS NULL` expression as a protobuf `PhysicalExprNode`.
///
/// The wrapped argument is serialized with `ctx.encode_child` and stored in
/// the `expr` field of a `PhysicalIsNull` message; `expr_id` is left as
/// `None`. An error from encoding the argument is propagated, otherwise the
/// result is always `Some`.
#[cfg(feature = "proto")]
fn try_to_proto(
    &self,
    ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
    use datafusion_proto_models::protobuf::{
        PhysicalExprNode, PhysicalIsNull, physical_expr_node::ExprType,
    };

    // Serialize the operand first so a failure short-circuits before any
    // message is assembled.
    let encoded_arg = ctx.encode_child(&self.arg)?;
    let is_null = PhysicalIsNull {
        expr: Some(Box::new(encoded_arg)),
    };

    Ok(Some(PhysicalExprNode {
        expr_id: None,
        expr_type: Some(ExprType::IsNullExpr(Box::new(is_null))),
    }))
}
121+
}
122+
123+
#[cfg(feature = "proto")]
impl IsNullExpr {
    /// Reconstruct an [`IsNullExpr`] from its protobuf representation.
    ///
    /// Takes the whole [`PhysicalExprNode`] — the exact inverse of what
    /// [`PhysicalExpr::try_to_proto`] produces — so every expression's
    /// `try_from_proto` shares one signature. The child stored in the
    /// message's `expr` field is rebuilt with
    /// [`PhysicalExprDecodeCtx::decode`].
    ///
    /// # Errors
    ///
    /// Returns an internal error when `node` does not hold the `IsNullExpr`
    /// variant or when its `expr` field is absent, and propagates any error
    /// raised while decoding the child expression.
    ///
    /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
    /// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto
    /// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode
    pub fn try_from_proto(
        node: &datafusion_proto_models::protobuf::PhysicalExprNode,
        ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

        // Reject any node that is not the IS NULL variant before touching
        // its payload.
        let Some(ExprType::IsNullExpr(is_null)) = &node.expr_type else {
            return datafusion_common::internal_err!(
                "PhysicalExprNode is not an IsNullExpr"
            );
        };

        // Both failure paths go through the same macro family so the two
        // internal errors are constructed consistently.
        let child = is_null.expr.as_deref().ok_or_else(|| {
            datafusion_common::internal_datafusion_err!(
                "IsNullExpr is missing required field 'expr'"
            )
        })?;

        Ok(Arc::new(IsNullExpr::new(ctx.decode(child)?)))
    }
}
106159

107160
/// Create an IS NULL expression
@@ -112,6 +165,8 @@ pub fn is_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
112165
#[cfg(test)]
113166
mod tests {
114167
use super::*;
168+
#[cfg(feature = "proto")]
169+
use crate::expressions::Column;
115170
use crate::expressions::col;
116171
use arrow::array::{
117172
Array, BooleanArray, Float64Array, Int32Array, StringArray, UnionArray,
@@ -121,6 +176,54 @@ mod tests {
121176
use datafusion_common::cast::as_boolean_array;
122177
use datafusion_physical_expr_common::physical_expr::fmt_sql;
123178

179+
#[cfg(feature = "proto")]
180+
use datafusion_physical_expr_common::physical_expr::{
181+
proto_decode::{PhysicalExprDecode, PhysicalExprDecodeCtx},
182+
proto_encode::{PhysicalExprEncode, PhysicalExprEncodeCtx},
183+
};
184+
#[cfg(feature = "proto")]
185+
use datafusion_proto_models::protobuf;
186+
187+
/// Minimal codec used by the proto tests in this module; it relies solely on
/// each expression's own `try_to_proto` hook.
#[cfg(feature = "proto")]
struct TestProtoCodec;

#[cfg(feature = "proto")]
impl PhysicalExprEncode for TestProtoCodec {
    /// Serialize `expr` through its `try_to_proto` hook, turning a `None`
    /// result into an internal error.
    fn encode(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Result<protobuf::PhysicalExprNode> {
        let ctx = PhysicalExprEncodeCtx::new(self);
        match expr.try_to_proto(&ctx)? {
            Some(encoded) => Ok(encoded),
            None => Err(datafusion_common::DataFusionError::Internal(
                "Expression did not serialize in test codec".to_string(),
            )),
        }
    }
}
204+
205+
#[cfg(feature = "proto")]
impl PhysicalExprDecode for TestProtoCodec {
    /// Rebuild a physical expression from `node`, dispatching on its variant.
    ///
    /// Only `Column` and `IsNullExpr` nodes are handled; every other variant
    /// (including an unset `expr_type`) yields an internal error.
    fn decode(
        &self,
        node: &protobuf::PhysicalExprNode,
        schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

        let ctx = PhysicalExprDecodeCtx::new(schema, self);
        match node.expr_type.as_ref() {
            Some(ExprType::Column(_)) => Column::try_from_proto(node, &ctx),
            Some(ExprType::IsNullExpr(_)) => IsNullExpr::try_from_proto(node, &ctx),
            _ => datafusion_common::internal_err!(
                "Unsupported expression in test decoder"
            ),
        }
    }
}
226+
124227
#[test]
125228
fn is_null_op() -> Result<()> {
126229
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
@@ -223,4 +326,72 @@ mod tests {
223326

224327
Ok(())
225328
}
329+
330+
/// Encodes `a@0 IS NULL` with the test codec, checks the shape of the
/// resulting proto, then decodes it and compares the display form.
#[cfg(feature = "proto")]
#[test]
fn is_null_proto_hook_roundtrip() -> Result<()> {
    use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

    let codec = TestProtoCodec;
    let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let original: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(column));

    let encoded = codec.encode(&original)?;

    // The outer node must be an IsNullExpr whose child is a Column node.
    let child_type = match &encoded.expr_type {
        Some(ExprType::IsNullExpr(is_null)) => is_null
            .expr
            .as_deref()
            .and_then(|child| child.expr_type.as_ref()),
        other => panic!("Expected IsNullExpr proto, got {other:?}"),
    };
    assert!(matches!(child_type, Some(ExprType::Column(_))));

    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec);
    let decoded = IsNullExpr::try_from_proto(&encoded, &decode_ctx)?;

    assert_eq!(decoded.to_string(), "a@0 IS NULL");
    Ok(())
}
358+
359+
/// Checks the two rejection paths of `IsNullExpr::try_from_proto`: a node of
/// the wrong variant, and an `IsNullExpr` node without its child.
#[cfg(feature = "proto")]
#[test]
fn is_null_try_from_proto_errors() {
    use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let codec = TestProtoCodec;
    let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec);

    // Decode `node`, require a failure, and return the message without any
    // backtrace suffix.
    let failure_message = |node: &protobuf::PhysicalExprNode, why: &str| -> String {
        IsNullExpr::try_from_proto(node, &decode_ctx)
            .expect_err(why)
            .strip_backtrace()
    };

    let wrong_variant = protobuf::PhysicalExprNode {
        expr_id: None,
        expr_type: Some(ExprType::Column(protobuf::PhysicalColumn {
            name: "a".to_string(),
            index: 0,
        })),
    };
    let error = failure_message(&wrong_variant, "wrong variant should error");
    assert!(
        error.contains("PhysicalExprNode is not an IsNullExpr"),
        "{error}"
    );

    let missing_child = protobuf::PhysicalExprNode {
        expr_id: None,
        expr_type: Some(ExprType::IsNullExpr(Box::new(protobuf::PhysicalIsNull {
            expr: None,
        }))),
    };
    let error = failure_message(&missing_child, "missing child should error");
    assert!(
        error.contains("IsNullExpr is missing required field 'expr'"),
        "{error}"
    );
}
226397
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -294,15 +294,7 @@ pub fn parse_physical_expr_with_converter(
294294
ExprType::Sort(_) => {
295295
return not_impl_err!("Cannot convert sort expr node to physical expression");
296296
}
297-
ExprType::IsNullExpr(e) => {
298-
Arc::new(IsNullExpr::new(parse_required_physical_expr(
299-
e.expr.as_deref(),
300-
ctx,
301-
"expr",
302-
input_schema,
303-
proto_converter,
304-
)?))
305-
}
297+
ExprType::IsNullExpr(_) => IsNullExpr::try_from_proto(proto, &decode_ctx)?,
306298
ExprType::IsNotNullExpr(e) => {
307299
Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
308300
e.expr.as_deref(),

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 2 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
40-
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
39+
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, Literal, NegativeExpr,
40+
NotExpr, TryCastExpr, UnKnownColumn,
4141
};
4242
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -390,17 +390,6 @@ pub fn serialize_physical_expr_with_converter(
390390
},
391391
))),
392392
})
393-
} else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
394-
Ok(protobuf::PhysicalExprNode {
395-
expr_id,
396-
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
397-
Box::new(protobuf::PhysicalIsNull {
398-
expr: Some(Box::new(
399-
proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
400-
)),
401-
}),
402-
)),
403-
})
404393
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
405394
Ok(protobuf::PhysicalExprNode {
406395
expr_id,

0 commit comments

Comments
 (0)