Skip to content

Commit 1b8451c

Browse files
authored
refactor: port InListExpr to use try_to_proto/try_from_proto hooks (#22503)
## Which issue does this PR close? - Closes #22425. ## Rationale for this change This PR migrates `InListExpr` to use the new `try_to_proto` and `try_from_proto` hooks, following the modular serialization architecture established in #21835. This moves serialization logic into the expression itself, improving encapsulation and decentralizing the datafusion-proto logic as part of the broader effort in #22418. ## What changes are included in this PR? - Implemented `PhysicalExpr::try_to_proto` for `InListExpr`. - Implemented the inherent method `InListExpr::try_from_proto`. - Wired the hook into `from_proto.rs` and removed the central downcast arm in `to_proto.rs`. - Added isolated unit tests in `in_list.rs` using mock drivers to verify roundtrips and error handling. ## Are these changes tested? Yes, these changes are covered by both new and existing tests: - New Unit Tests: Added a `proto_tests` module to `in_list.rs` that uses mock drivers to verify `try_to_proto` and `try_from_proto` in isolation. These cover successful roundtrips, incorrect node types, and missing required fields. - Existing Integration Tests: Verified that the existing InList roundtrip tests in datafusion-proto continue to pass after removing the central downcast logic. - Linting: Verified that the changes pass `cargo clippy --all-targets --all-features` with zero warnings. ## Are there any user-facing changes? No.
1 parent 4c0f944 commit 1b8451c

3 files changed

Lines changed: 224 additions & 29 deletions

File tree

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,42 @@ impl InListExpr {
246246

247247
Ok(Self::new(expr, list, negated, static_filter))
248248
}
249+
250+
/// Rebuilds an [`InListExpr`] from its protobuf representation.
///
/// Only compiled when the `proto` feature is enabled.
///
/// `node` must carry the `InList` variant in its `expr_type`. The probe
/// expression (`expr`) and every entry of `list` are decoded through
/// `ctx.decode`, and the result is assembled with [`InListExpr::try_new`]
/// using the node's `negated` flag and the schema returned by `ctx.schema()`.
///
/// # Errors
///
/// * An internal error if `node.expr_type` is not the `InList` variant.
/// * An internal error if the `InList` node has no `expr` child.
/// * Any error returned by `ctx.decode` for `expr` or for a `list` entry.
/// * Any error returned by [`InListExpr::try_new`].
#[cfg(feature = "proto")]
251+
pub fn try_from_proto(
252+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
253+
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
254+
) -> Result<Arc<dyn PhysicalExpr>> {
255+
use datafusion_proto_models::protobuf;
256+
257+
// Shadow `node` with the inner InList payload; any other variant (or a
// missing `expr_type`) is rejected.
let node = match &node.expr_type {
258+
Some(protobuf::physical_expr_node::ExprType::InList(n)) => n,
259+
_ => {
260+
return datafusion_common::internal_err!(
261+
"PhysicalExprNode is not an InList"
262+
);
263+
}
264+
};
265+
266+
// `expr` is optional in the message, so its absence is reported before any
// child is decoded.
let expr = ctx.decode(node.expr.as_deref().ok_or_else(|| {
267+
datafusion_common::DataFusionError::Internal(
268+
"InList is missing required field 'expr'".to_string(),
269+
)
270+
})?)?;
271+
272+
// Collecting into `Result<Vec<_>>` stops at the first entry that fails to
// decode.
let list = node
273+
.list
274+
.iter()
275+
.map(|e| ctx.decode(e))
276+
.collect::<Result<Vec<_>>>()?;
277+
278+
Ok(Arc::new(InListExpr::try_new(
279+
expr,
280+
list,
281+
node.negated,
282+
ctx.schema(),
283+
)?))
284+
}
249285
}
250286
impl std::fmt::Display for InListExpr {
251287
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -442,6 +478,29 @@ impl PhysicalExpr for InListExpr {
442478
}
443479
write!(f, ")")
444480
}
481+
482+
/// Serializes this `InListExpr` into a protobuf `PhysicalExprNode`.
///
/// Only compiled when the `proto` feature is enabled.
///
/// The probe expression and each list entry are encoded through
/// `ctx.encode_child`. The resulting `PhysicalInListNode` carries exactly
/// three fields: `expr`, `list` and `negated`. `expr_id` is left as `None`.
///
/// On success this always returns `Ok(Some(node))`.
///
/// # Errors
///
/// Returns the first error produced by `ctx.encode_child`, for either the
/// probe expression or a list entry.
#[cfg(feature = "proto")]
483+
fn try_to_proto(
484+
&self,
485+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
486+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
487+
use datafusion_proto_models::protobuf;
488+
489+
Ok(Some(protobuf::PhysicalExprNode {
490+
expr_id: None,
491+
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
492+
protobuf::PhysicalInListNode {
493+
// The probe expression is encoded first, then the list entries in order.
expr: Some(Box::new(ctx.encode_child(&self.expr)?)),
494+
list: self
495+
.list
496+
.iter()
497+
.map(|e| ctx.encode_child(e))
498+
.collect::<Result<Vec<_>>>()?,
499+
negated: self.negated,
500+
},
501+
))),
502+
}))
503+
}
445504
}
446505

447506
impl PartialEq for InListExpr {
@@ -3821,3 +3880,163 @@ mod tests {
38213880
Ok(())
38223881
}
38233882
}
3883+
3884+
/// Unit tests for the `proto`-feature hooks `InListExpr::try_to_proto` and
/// `InListExpr::try_from_proto`.
///
/// Child encoding/decoding is driven by the stub helpers imported from
/// `crate::proto_test_util`, so these tests exercise only the `InListExpr`
/// logic itself.
// NOTE(review): the helpers are not defined in this module. The tests below
// assume `failing_on(n)` makes the n-th call fail with an internal error whose
// message contains "call n", and that `UnreachableDecoder` must never be
// invoked -- confirm against `proto_test_util`.
#[cfg(all(test, feature = "proto"))]
3885+
mod proto_tests {
3886+
use super::*;
3887+
use crate::expressions::{Column, col, lit};
3888+
use crate::proto_test_util::{
3889+
StubDecoder, StubEncoder, UnreachableDecoder, column_node,
3890+
};
3891+
use arrow::datatypes::Field;
3892+
use datafusion_common::DataFusionError;
3893+
use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
3894+
use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
3895+
use datafusion_proto_models::protobuf::{
3896+
PhysicalExprNode, PhysicalInListNode, physical_expr_node,
3897+
};
3898+
3899+
/// Build an `InListExpr` proto node with the given children.
3900+
fn in_list_node(
3901+
expr: Option<Box<PhysicalExprNode>>,
3902+
list: Vec<PhysicalExprNode>,
3903+
negated: bool,
3904+
) -> PhysicalExprNode {
3905+
PhysicalExprNode {
3906+
expr_id: None,
3907+
expr_type: Some(physical_expr_node::ExprType::InList(Box::new(
3908+
PhysicalInListNode {
3909+
expr,
3910+
list,
3911+
negated,
3912+
},
3913+
))),
3914+
}
3915+
}
3916+
3917+
/// An `InListExpr` over a column with one literal value.
3918+
fn in_list_fixture() -> InListExpr {
3919+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
3920+
InListExpr::try_new(col("a", &schema).unwrap(), vec![lit(1)], false, &schema)
3921+
.unwrap()
3922+
}
3923+
3924+
/// Encoding the fixture yields an `InList` node with no `expr_id`,
/// `negated == false`, a populated `expr` and exactly one list entry.
#[test]
3925+
fn try_to_proto_encodes_in_list() {
3926+
let in_list = in_list_fixture();
3927+
let encoder = StubEncoder::ok();
3928+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
3929+
3930+
let node = in_list
3931+
.try_to_proto(&ctx)
3932+
.unwrap()
3933+
.expect("InListExpr should encode to Some(node)");
3934+
3935+
// Built-in exprs never set expr_id; only dynamic filters do.
3936+
assert!(node.expr_id.is_none());
3937+
let in_list_node = match node.expr_type {
3938+
Some(physical_expr_node::ExprType::InList(boxed)) => *boxed,
3939+
other => panic!("expected an InList node, got {other:?}"),
3940+
};
3941+
assert!(!in_list_node.negated);
3942+
assert!(in_list_node.expr.is_some());
3943+
assert_eq!(in_list_node.list.len(), 1);
3944+
}
3945+
3946+
/// A failure on the first encode call (the probe `expr`) is returned
/// unchanged from `try_to_proto`.
#[test]
3947+
fn try_to_proto_propagates_expr_encode_error() {
3948+
let in_list = in_list_fixture();
3949+
let encoder = StubEncoder::failing_on(1);
3950+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
3951+
let err = in_list.try_to_proto(&ctx).unwrap_err();
3952+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1")));
3953+
}
3954+
3955+
/// A failure while encoding a list entry is returned from `try_to_proto`.
#[test]
3956+
fn try_to_proto_propagates_list_encode_error() {
3957+
let in_list = in_list_fixture();
3958+
// Call 1 is for `expr`, Call 2 is for the first element of `list`
3959+
let encoder = StubEncoder::failing_on(2);
3960+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
3961+
let err = in_list.try_to_proto(&ctx).unwrap_err();
3962+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 2")));
3963+
}
3964+
3965+
/// Decoding a negated node with one list entry yields an `InListExpr` that
/// keeps the `negated` flag, has a `Column` probe expression and one list
/// entry.
// NOTE(review): the decoded `Column` presumably comes from `StubDecoder`
// rather than from resolving "a" against the schema (which only contains
// "decoded") -- confirm in `proto_test_util`.
#[test]
3966+
fn try_from_proto_decodes_in_list() {
3967+
let node = in_list_node(
3968+
Some(Box::new(column_node("a"))),
3969+
vec![column_node("b")],
3970+
true,
3971+
);
3972+
let schema = Schema::new(vec![Field::new("decoded", DataType::Int32, true)]);
3973+
let decoder = StubDecoder::ok();
3974+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
3975+
3976+
let decoded = InListExpr::try_from_proto(&node, &ctx).unwrap();
3977+
let in_list = decoded
3978+
.downcast_ref::<InListExpr>()
3979+
.expect("decoded expr should be an InListExpr");
3980+
3981+
assert!(in_list.negated());
3982+
assert!(in_list.expr().downcast_ref::<Column>().is_some());
3983+
assert_eq!(in_list.list().len(), 1);
3984+
}
3985+
3986+
/// A node that is not an `InList` (here the one built by `column_node`) is
/// rejected with an internal error.
#[test]
3987+
fn try_from_proto_rejects_non_in_list_node() {
3988+
let node = column_node("a");
3989+
let schema = Schema::empty();
3990+
let decoder = UnreachableDecoder;
3991+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
3992+
3993+
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
3994+
assert!(matches!(
3995+
err,
3996+
DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not an InList")
3997+
));
3998+
}
3999+
4000+
/// An `InList` node without its `expr` child is rejected with an internal
/// error, even though the list itself is non-empty.
#[test]
4001+
fn try_from_proto_rejects_missing_expr() {
4002+
let node = in_list_node(None, vec![column_node("b")], false);
4003+
let schema = Schema::empty();
4004+
let decoder = UnreachableDecoder;
4005+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
4006+
4007+
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
4008+
assert!(matches!(
4009+
err,
4010+
DataFusionError::Internal(msg) if msg.contains("InList is missing required field 'expr'")
4011+
));
4012+
}
4013+
4014+
/// A failure on the first decode call (the probe `expr`) is returned from
/// `try_from_proto`.
#[test]
4015+
fn try_from_proto_propagates_expr_decode_error() {
4016+
let node = in_list_node(
4017+
Some(Box::new(column_node("a"))),
4018+
vec![column_node("b")],
4019+
false,
4020+
);
4021+
let schema = Schema::empty();
4022+
let decoder = StubDecoder::failing_on(1);
4023+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
4024+
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
4025+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1")));
4026+
}
4027+
4028+
/// A failure while decoding a list entry is returned from `try_from_proto`.
#[test]
4029+
fn try_from_proto_propagates_list_decode_error() {
4030+
let node = in_list_node(
4031+
Some(Box::new(column_node("a"))),
4032+
vec![column_node("b")],
4033+
false,
4034+
);
4035+
let schema = Schema::empty();
4036+
// Call 1 is `expr`, Call 2 is the first element of `list`
4037+
let decoder = StubDecoder::failing_on(2);
4038+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
4039+
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
4040+
assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 2")));
4041+
}
4042+
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
4343
use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
4444
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
4545
use datafusion_physical_plan::expressions::{
46-
BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
47-
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
46+
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
47+
LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4848
};
4949
use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
5050
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
@@ -328,18 +328,7 @@ pub fn parse_physical_expr_with_converter(
328328
proto_converter,
329329
)?))
330330
}
331-
ExprType::InList(e) => in_list(
332-
parse_required_physical_expr(
333-
e.expr.as_deref(),
334-
ctx,
335-
"expr",
336-
input_schema,
337-
proto_converter,
338-
)?,
339-
parse_physical_exprs(&e.list, ctx, input_schema, proto_converter)?,
340-
&e.negated,
341-
input_schema,
342-
)?,
331+
ExprType::InList(_) => InListExpr::try_from_proto(proto, &decode_ctx)?,
343332
ExprType::Case(e) => Arc::new(CaseExpr::try_new(
344333
e.expr
345334
.as_ref()

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr,
40-
Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
39+
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
40+
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4141
};
4242
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -412,19 +412,6 @@ pub fn serialize_physical_expr_with_converter(
412412
}),
413413
)),
414414
})
415-
} else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
416-
Ok(protobuf::PhysicalExprNode {
417-
expr_id,
418-
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
419-
protobuf::PhysicalInListNode {
420-
expr: Some(Box::new(
421-
proto_converter.physical_expr_to_proto(expr.expr(), codec)?,
422-
)),
423-
list: serialize_physical_exprs(expr.list(), codec, proto_converter)?,
424-
negated: expr.negated(),
425-
},
426-
))),
427-
})
428415
} else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
429416
Ok(protobuf::PhysicalExprNode {
430417
expr_id,

0 commit comments

Comments
 (0)