Skip to content

Commit 811e6da

Browse files
committed
Port IsNotNullExpr proto serialization hooks
1 parent bdf8a6d commit 811e6da

3 files changed

Lines changed: 178 additions & 24 deletions

File tree

datafusion/physical-expr/src/expressions/is_not_null.rs

Lines changed: 175 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ use arrow::{
2222
datatypes::{DataType, Schema},
2323
record_batch::RecordBatch,
2424
};
25-
use datafusion_common::Result;
26-
use datafusion_common::ScalarValue;
25+
use datafusion_common::{Result, ScalarValue};
2726
use datafusion_expr::ColumnarValue;
2827
use std::hash::Hash;
2928
use std::sync::Arc;
@@ -103,6 +102,60 @@ impl PhysicalExpr for IsNotNullExpr {
103102
self.arg.fmt_sql(f)?;
104103
write!(f, " IS NOT NULL")
105104
}
105+
106+
#[cfg(feature = "proto")]
107+
fn try_to_proto(
108+
&self,
109+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
110+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
111+
use datafusion_proto_models::protobuf;
112+
113+
Ok(Some(protobuf::PhysicalExprNode {
114+
expr_id: None,
115+
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
116+
Box::new(protobuf::PhysicalIsNotNull {
117+
expr: Some(Box::new(ctx.encode_child(&self.arg)?)),
118+
}),
119+
)),
120+
}))
121+
}
122+
}
123+
124+
#[cfg(feature = "proto")]
125+
impl IsNotNullExpr {
126+
/// Reconstruct an [`IsNotNullExpr`] from its protobuf representation.
127+
///
128+
/// Takes the whole [`PhysicalExprNode`] — the exact inverse of what
129+
/// [`PhysicalExpr::try_to_proto`] produces — so every expression's
130+
/// `try_from_proto` shares one signature.
131+
///
132+
/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
133+
/// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto
134+
/// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode
135+
pub fn try_from_proto(
136+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
137+
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
138+
) -> Result<Arc<dyn PhysicalExpr>> {
139+
use datafusion_proto_models::protobuf;
140+
141+
let node = match &node.expr_type {
142+
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(node)) => {
143+
node.as_ref()
144+
}
145+
_ => {
146+
return datafusion_common::internal_err!(
147+
"PhysicalExprNode is not an IsNotNullExpr"
148+
);
149+
}
150+
};
151+
let expr = node.expr.as_deref().ok_or_else(|| {
152+
datafusion_common::DataFusionError::Internal(
153+
"IsNotNullExpr is missing required field 'expr'".to_string(),
154+
)
155+
})?;
156+
157+
Ok(Arc::new(IsNotNullExpr::new(ctx.decode(expr)?)))
158+
}
106159
}
107160

108161
/// Create an IS NOT NULL expression
@@ -113,6 +166,8 @@ pub fn is_not_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>
113166
#[cfg(test)]
114167
mod tests {
115168
use super::*;
169+
#[cfg(feature = "proto")]
170+
use crate::expressions::Column;
116171
use crate::expressions::col;
117172
use arrow::array::{
118173
Array, BooleanArray, Float64Array, Int32Array, StringArray, UnionArray,
@@ -122,6 +177,54 @@ mod tests {
122177
use datafusion_common::cast::as_boolean_array;
123178
use datafusion_physical_expr_common::physical_expr::fmt_sql;
124179

180+
#[cfg(feature = "proto")]
181+
use datafusion_physical_expr_common::physical_expr::{
182+
proto_decode::{PhysicalExprDecode, PhysicalExprDecodeCtx},
183+
proto_encode::{PhysicalExprEncode, PhysicalExprEncodeCtx},
184+
};
185+
#[cfg(feature = "proto")]
186+
use datafusion_proto_models::protobuf;
187+
188+
#[cfg(feature = "proto")]
189+
struct TestProtoCodec;
190+
191+
#[cfg(feature = "proto")]
192+
impl PhysicalExprEncode for TestProtoCodec {
193+
fn encode(
194+
&self,
195+
expr: &Arc<dyn PhysicalExpr>,
196+
) -> Result<protobuf::PhysicalExprNode> {
197+
let ctx = PhysicalExprEncodeCtx::new(self);
198+
expr.try_to_proto(&ctx)?.ok_or_else(|| {
199+
datafusion_common::DataFusionError::Internal(
200+
"Expression did not serialize in test codec".to_string(),
201+
)
202+
})
203+
}
204+
}
205+
206+
#[cfg(feature = "proto")]
207+
impl PhysicalExprDecode for TestProtoCodec {
208+
fn decode(
209+
&self,
210+
node: &protobuf::PhysicalExprNode,
211+
schema: &Schema,
212+
) -> Result<Arc<dyn PhysicalExpr>> {
213+
let ctx = PhysicalExprDecodeCtx::new(schema, self);
214+
match &node.expr_type {
215+
Some(protobuf::physical_expr_node::ExprType::Column(_)) => {
216+
Column::try_from_proto(node, &ctx)
217+
}
218+
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(_)) => {
219+
IsNotNullExpr::try_from_proto(node, &ctx)
220+
}
221+
_ => datafusion_common::internal_err!(
222+
"Unsupported expression in test decoder"
223+
),
224+
}
225+
}
226+
}
227+
125228
#[test]
126229
fn is_not_null_op() -> Result<()> {
127230
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
@@ -212,4 +315,74 @@ mod tests {
212315

213316
Ok(())
214317
}
318+
319+
#[cfg(feature = "proto")]
320+
#[test]
321+
fn is_not_null_proto_hook_roundtrip() -> Result<()> {
322+
let arg = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
323+
let expr = Arc::new(IsNotNullExpr::new(arg)) as Arc<dyn PhysicalExpr>;
324+
325+
let codec = TestProtoCodec;
326+
let proto = codec.encode(&expr)?;
327+
328+
let is_not_null = match &proto.expr_type {
329+
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(is_not_null)) => {
330+
is_not_null
331+
}
332+
other => panic!("Expected IsNotNullExpr proto, got {other:?}"),
333+
};
334+
assert!(matches!(
335+
is_not_null
336+
.expr
337+
.as_deref()
338+
.and_then(|expr| expr.expr_type.as_ref()),
339+
Some(protobuf::physical_expr_node::ExprType::Column(_))
340+
));
341+
342+
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
343+
let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec);
344+
let decoded = IsNotNullExpr::try_from_proto(&proto, &decode_ctx)?;
345+
346+
assert_eq!(decoded.to_string(), "a@0 IS NOT NULL");
347+
Ok(())
348+
}
349+
350+
#[cfg(feature = "proto")]
351+
#[test]
352+
fn is_not_null_try_from_proto_errors() {
353+
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
354+
let codec = TestProtoCodec;
355+
let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec);
356+
357+
let wrong_variant = protobuf::PhysicalExprNode {
358+
expr_id: None,
359+
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
360+
protobuf::PhysicalColumn {
361+
name: "a".to_string(),
362+
index: 0,
363+
},
364+
)),
365+
};
366+
let error = IsNotNullExpr::try_from_proto(&wrong_variant, &decode_ctx)
367+
.expect_err("wrong variant should error")
368+
.strip_backtrace();
369+
assert!(
370+
error.contains("PhysicalExprNode is not an IsNotNullExpr"),
371+
"{error}"
372+
);
373+
374+
let missing_child = protobuf::PhysicalExprNode {
375+
expr_id: None,
376+
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
377+
Box::new(protobuf::PhysicalIsNotNull { expr: None }),
378+
)),
379+
};
380+
let error = IsNotNullExpr::try_from_proto(&missing_child, &decode_ctx)
381+
.expect_err("missing child should error")
382+
.strip_backtrace();
383+
assert!(
384+
error.contains("IsNotNullExpr is missing required field 'expr'"),
385+
"{error}"
386+
);
387+
}
215388
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -303,15 +303,7 @@ pub fn parse_physical_expr_with_converter(
303303
proto_converter,
304304
)?))
305305
}
306-
ExprType::IsNotNullExpr(e) => {
307-
Arc::new(IsNotNullExpr::new(parse_required_physical_expr(
308-
e.expr.as_deref(),
309-
ctx,
310-
"expr",
311-
input_schema,
312-
proto_converter,
313-
)?))
314-
}
306+
ExprType::IsNotNullExpr(_) => IsNotNullExpr::try_from_proto(proto, &decode_ctx)?,
315307
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
316308
e.expr.as_deref(),
317309
ctx,

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
40-
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
39+
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNullExpr, Literal, NegativeExpr,
40+
NotExpr, TryCastExpr, UnKnownColumn,
4141
};
4242
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -401,17 +401,6 @@ pub fn serialize_physical_expr_with_converter(
401401
}),
402402
)),
403403
})
404-
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
405-
Ok(protobuf::PhysicalExprNode {
406-
expr_id,
407-
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
408-
Box::new(protobuf::PhysicalIsNotNull {
409-
expr: Some(Box::new(
410-
proto_converter.physical_expr_to_proto(expr.arg(), codec)?,
411-
)),
412-
}),
413-
)),
414-
})
415404
} else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
416405
Ok(protobuf::PhysicalExprNode {
417406
expr_id,

0 commit comments

Comments
 (0)