Skip to content

Commit e4dcc5a

Browse files
committed
Port IsNotNullExpr proto serialization hooks
1 parent 857eb4a commit e4dcc5a

3 files changed

Lines changed: 151 additions & 23 deletions

File tree

datafusion/physical-expr/src/expressions/is_not_null.rs

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ use arrow::{
2222
datatypes::{DataType, Schema},
2323
record_batch::RecordBatch,
2424
};
25-
use datafusion_common::Result;
26-
use datafusion_common::ScalarValue;
25+
use datafusion_common::{Result, ScalarValue};
2726
use datafusion_expr::ColumnarValue;
2827
use std::hash::Hash;
2928
use std::sync::Arc;
@@ -103,6 +102,48 @@ impl PhysicalExpr for IsNotNullExpr {
103102
self.arg.fmt_sql(f)?;
104103
write!(f, " IS NOT NULL")
105104
}
105+
106+
#[cfg(feature = "proto")]
107+
fn try_to_proto(
108+
&self,
109+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
110+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
111+
use datafusion_proto_models::protobuf;
112+
113+
Ok(Some(protobuf::PhysicalExprNode {
114+
expr_id: None,
115+
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
116+
Box::new(protobuf::PhysicalIsNotNull {
117+
expr: Some(Box::new(ctx.encode_child(&self.arg)?)),
118+
}),
119+
)),
120+
}))
121+
}
122+
}
123+
124+
#[cfg(feature = "proto")]
125+
impl IsNotNullExpr {
126+
/// Reconstruct an [`IsNotNullExpr`] from its protobuf representation.
127+
pub fn try_from_proto(
128+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
129+
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
130+
) -> Result<Arc<dyn PhysicalExpr>> {
131+
use datafusion_physical_expr_common::expect_expr_variant;
132+
use datafusion_proto_models::protobuf;
133+
134+
let node = expect_expr_variant!(
135+
node,
136+
protobuf::physical_expr_node::ExprType::IsNotNullExpr,
137+
"IsNotNullExpr",
138+
);
139+
let expr = ctx.decode_required_expression(
140+
node.expr.as_deref(),
141+
"IsNotNullExpr",
142+
"expr",
143+
)?;
144+
145+
Ok(Arc::new(IsNotNullExpr::new(expr)))
146+
}
106147
}
107148

108149
/// Create an IS NOT NULL expression
@@ -213,3 +254,109 @@ mod tests {
213254
Ok(())
214255
}
215256
}
257+
258+
#[cfg(all(test, feature = "proto"))]
259+
mod proto_tests {
260+
use super::*;
261+
use crate::expressions::{Column, col};
262+
use crate::proto_test_util::{
263+
StubDecoder, StubEncoder, UnreachableDecoder, column_node,
264+
};
265+
use arrow::datatypes::Field;
266+
use datafusion_common::DataFusionError;
267+
use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
268+
use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
269+
use datafusion_proto_models::protobuf::{
270+
PhysicalExprNode, PhysicalIsNotNull, physical_expr_node,
271+
};
272+
273+
fn is_not_null_node(expr: Option<Box<PhysicalExprNode>>) -> PhysicalExprNode {
274+
PhysicalExprNode {
275+
expr_id: None,
276+
expr_type: Some(physical_expr_node::ExprType::IsNotNullExpr(Box::new(
277+
PhysicalIsNotNull { expr },
278+
))),
279+
}
280+
}
281+
282+
fn is_not_null_fixture() -> IsNotNullExpr {
283+
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
284+
IsNotNullExpr::new(col("a", &schema).unwrap())
285+
}
286+
287+
#[test]
288+
fn try_to_proto_encodes_is_not_null_expr() {
289+
let is_not_null = is_not_null_fixture();
290+
let encoder = StubEncoder::ok();
291+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
292+
293+
let node = is_not_null
294+
.try_to_proto(&ctx)
295+
.unwrap()
296+
.expect("IsNotNullExpr should encode to Some(node)");
297+
298+
assert!(node.expr_id.is_none());
299+
let is_not_null_node = match node.expr_type {
300+
Some(physical_expr_node::ExprType::IsNotNullExpr(boxed)) => *boxed,
301+
other => panic!("expected an IsNotNullExpr node, got {other:?}"),
302+
};
303+
assert!(is_not_null_node.expr.is_some());
304+
}
305+
306+
#[test]
307+
fn try_to_proto_propagates_expr_encode_error() {
308+
let is_not_null = is_not_null_fixture();
309+
let encoder = StubEncoder::failing_on(1);
310+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
311+
let err = is_not_null.try_to_proto(&ctx).unwrap_err();
312+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1")));
313+
}
314+
315+
#[test]
316+
fn try_from_proto_decodes_is_not_null_expr() {
317+
let node = is_not_null_node(Some(Box::new(column_node("a"))));
318+
let schema = Schema::empty();
319+
let decoder = StubDecoder::ok();
320+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
321+
322+
let decoded = IsNotNullExpr::try_from_proto(&node, &ctx).unwrap();
323+
let is_not_null = decoded
324+
.downcast_ref::<IsNotNullExpr>()
325+
.expect("decoded expr should be an IsNotNullExpr");
326+
assert!(is_not_null.arg().downcast_ref::<Column>().is_some());
327+
}
328+
329+
#[test]
330+
fn try_from_proto_rejects_non_is_not_null_node() {
331+
let node = column_node("a");
332+
let schema = Schema::empty();
333+
let decoder = UnreachableDecoder;
334+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
335+
let err = IsNotNullExpr::try_from_proto(&node, &ctx).unwrap_err();
336+
assert!(
337+
matches!(err, DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a IsNotNullExpr"))
338+
);
339+
}
340+
341+
#[test]
342+
fn try_from_proto_rejects_missing_expr() {
343+
let node = is_not_null_node(None);
344+
let schema = Schema::empty();
345+
let decoder = UnreachableDecoder;
346+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
347+
let err = IsNotNullExpr::try_from_proto(&node, &ctx).unwrap_err();
348+
assert!(
349+
matches!(err, DataFusionError::Internal(msg) if msg.contains("IsNotNullExpr is missing required field 'expr'"))
350+
);
351+
}
352+
353+
#[test]
354+
fn try_from_proto_propagates_expr_decode_error() {
355+
let node = is_not_null_node(Some(Box::new(column_node("a"))));
356+
let schema = Schema::empty();
357+
let decoder = StubDecoder::failing_on(1);
358+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
359+
let err = IsNotNullExpr::try_from_proto(&node, &ctx).unwrap_err();
360+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1")));
361+
}
362+
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -307,15 +307,7 @@ pub fn parse_physical_expr_with_converter(
307307
proto_converter,
308308
)?))
309309
}
310-
ExprType::IsNotNullExpr(e) => {
311-
Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
312-
e.expr.as_deref(),
313-
ctx,
314-
"expr",
315-
input_schema,
316-
proto_converter,
317-
)?))
318-
}
310+
ExprType::IsNotNullExpr(_) => IsNotNullExpr::try_from_proto(proto, &decode_ctx)?,
319311
ExprType::NotExpr(_) => NotExpr::try_from_proto(proto, &decode_ctx)?,
320312
ExprType::Negative(_) => NegativeExpr::try_from_proto(proto, &decode_ctx)?,
321313
ExprType::InList(_) => InListExpr::try_from_proto(proto, &decode_ctx)?,

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, TryCastExpr,
39+
CaseExpr, DynamicFilterPhysicalExpr, IsNullExpr, Literal, TryCastExpr,
4040
};
4141
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4242
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
@@ -356,17 +356,6 @@ pub fn serialize_physical_expr_with_converter(
356356
}),
357357
)),
358358
})
359-
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
360-
Ok(protobuf::PhysicalExprNode {
361-
expr_id,
362-
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
363-
Box::new(protobuf::PhysicalIsNotNull {
364-
expr: Some(Box::new(
365-
proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
366-
)),
367-
}),
368-
)),
369-
})
370359
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
371360
Ok(protobuf::PhysicalExprNode {
372361
expr_id,

0 commit comments

Comments
 (0)