
Commit c7f35d6

refactor(physical-expr-common): add proto helpers for the recurring shapes in #22418, port already-migrated exprs (#22596)
## Which issue does this PR close?

Part of #22418 / follow-up to #22513.

## Rationale for this change

Every PR migrating a `PhysicalExpr` to `try_to_proto` / `try_from_proto` under #22418 re-implements the same two shapes that don't fit the existing helpers from #22513:

1. The outer `match &node.expr_type { ... }` that opens every `try_from_proto`:

   ```rust
   let try_cast = match &node.expr_type {
       Some(protobuf::physical_expr_node::ExprType::TryCast(x)) => x.as_ref(),
       _ => return internal_err!("PhysicalExprNode is not a TryCastExpr"),
   };
   ```

2. The hand-rolled "missing required field 'X'" error for non-expression fields like `arrow_type` on `CastExpr` / `TryCastExpr`.

Each shape recurs across the 7+ remaining open migration PRs. Adding small helpers in `physical-expr-common` keeps the per-expression diff minimal and the error messages consistent.

## What changes are included in this PR?

**Commit 1: `feat(physical-expr-common): add proto helpers ...`**

Two new helpers in `datafusion-physical-expr-common`, both gated on `feature = "proto"`:

- `expect_expr_variant!` macro (re-exported at crate root): matches `Option<ExprType>`, returns the inner payload, and errors with `"PhysicalExprNode is not a {variant}"`.
- `proto_decode::require_proto_field<T>(opt, expr_name, field)`: mirrors `decode_required_expression` for non-`PhysicalExprNode` fields.

Five unit tests cover the helpers: two for `require_proto_field`, and three for the macro (success plus the two reject paths).

**Commit 2: `refactor(physical-expr): adopt new proto helpers in already-migrated expressions`**

Ports every expression already on the new hooks:

- `Column`, `BinaryExpr` (originally #21929)
- `LikeExpr` (#22471)
- `InListExpr` (#22503)
- `NegativeExpr` (#22483)

`BinaryExpr` additionally adopts `decode_required_expression` for its legacy `l`/`r` arms and `encode_children_expressions` / `decode_children_expressions` for the linearized `operands` path, removing two more hand-rolled "missing required field" strings.

One existing test changes assertion text: `InListExpr`'s rejected-variant message was the only one using the article "an" instead of "a". The macro emits "a {Variant}" uniformly, so the message now reads "PhysicalExprNode is not a InList".

The two commits are stacked for review: commit 1 is the helper addition only; commit 2 is the adoption. Either can be reviewed in isolation.

## Are these changes tested?

Yes:

- `cargo test -p datafusion-physical-expr-common --features proto`: new helper unit tests pass.
- `cargo test -p datafusion-physical-expr --features proto proto_tests`: 23 / 23 per-expression proto tests pass (1 assertion-string update in InList).
- `cargo test -p datafusion-proto --test proto_integration`: 173 / 173 pass; no wire-format change.
- `cargo clippy --all-targets --all-features -- -D warnings`: clean on the touched crates.

## Are there any user-facing changes?

No. New API surface in `datafusion-physical-expr-common` (helpers gated on `feature = "proto"`); no change to serialized output. The macro `expect_expr_variant!` is exported at the crate root.
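As a rough illustration of the first shape, the sketch below reproduces the variant-check pattern in a self-contained form. Everything in it is an assumption made for the example: `Node`, `ExprType`, `Column`, and `Negative` are hypothetical stand-ins for the generated protobuf types, `expect_variant!` is a simplified analogue of `expect_expr_variant!`, and errors are plain `String`s rather than `DataFusionError`.

```rust
// Hypothetical stand-ins for the generated protobuf types.
#[derive(Debug)]
struct Column {
    name: String,
    index: u32,
}

#[derive(Debug)]
struct Negative {
    expr: Option<Box<Node>>,
}

#[derive(Debug)]
enum ExprType {
    Column(Column),
    Negative(Box<Negative>), // boxed payload, as with recursive proto messages
}

#[derive(Debug)]
struct Node {
    expr_type: Option<ExprType>,
}

// Simplified analogue of `expect_expr_variant!` (assumption: String errors).
// It early-returns from the *calling* function when the variant does not match.
macro_rules! expect_variant {
    ($node:expr, $variant:path, $expr_name:literal $(,)?) => {{
        match &$node.expr_type {
            ::core::option::Option::Some($variant(inner)) => inner,
            _ => {
                return Err(concat!("PhysicalExprNode is not a ", $expr_name).to_string());
            }
        }
    }};
}

// Unboxed payload: the macro yields `&Column`, which can be destructured.
fn column_parts(node: &Node) -> Result<(String, usize), String> {
    let Column { name, index } = expect_variant!(node, ExprType::Column, "Column");
    Ok((name.clone(), *index as usize))
}

// Boxed payload: the macro yields `&Box<Negative>`; field access auto-derefs.
fn negative_has_child(node: &Node) -> Result<bool, String> {
    let n = expect_variant!(node, ExprType::Negative, "Negative");
    Ok(n.expr.is_some())
}

fn main() {
    let col = Node {
        expr_type: Some(ExprType::Column(Column { name: "a".to_string(), index: 0 })),
    };
    let neg = Node {
        expr_type: Some(ExprType::Negative(Box::new(Negative { expr: None }))),
    };
    println!("{:?}", column_parts(&col));
    println!("{:?}", column_parts(&neg));
    println!("{:?}", negative_has_child(&neg));
}
```

Because the macro expands to a `return`, it can only be used inside a function whose return type is a compatible `Result`; both a wrong variant and a missing `expr_type` take the same error path.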
1 parent 69786d8 commit c7f35d6

6 files changed

Lines changed: 191 additions & 57 deletions


datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 149 additions & 0 deletions
@@ -619,6 +619,48 @@ pub mod proto_decode {
 
     use super::PhysicalExpr;
 
+    /// Open the outer [`PhysicalExprNode`] and assert it carries the expected
+    /// `ExprType` variant, returning the inner payload (auto-derefs through
+    /// `Box`) or bailing with an `Internal` error.
+    ///
+    /// Every `try_from_proto` starts with the same six-line `match`:
+    ///
+    /// ```ignore
+    /// let try_cast = match &node.expr_type {
+    ///     Some(protobuf::physical_expr_node::ExprType::TryCast(x)) => x.as_ref(),
+    ///     _ => return internal_err!("PhysicalExprNode is not a TryCastExpr"),
+    /// };
+    /// ```
+    ///
+    /// With this macro that collapses to:
+    ///
+    /// ```ignore
+    /// let try_cast = expect_expr_variant!(
+    ///     node,
+    ///     protobuf::physical_expr_node::ExprType::TryCast,
+    ///     "TryCastExpr",
+    /// );
+    /// ```
+    ///
+    /// Pass the variant as a `::` path so the macro stays agnostic to how
+    /// the caller imports the proto types.
+    #[macro_export]
+    macro_rules! expect_expr_variant {
+        ($node:expr, $variant:path, $expr_name:literal $(,)?) => {{
+            match &$node.expr_type {
+                ::core::option::Option::Some($variant(inner)) => inner,
+                _ => {
+                    return ::datafusion_common::internal_err!(concat!(
+                        "PhysicalExprNode is not a ",
+                        $expr_name
+                    ));
+                }
+            }
+        }};
+    }
+    #[doc(inline)]
+    pub use expect_expr_variant;
+
     /// Decoder context handed to per-expression `try_from_proto` constructors.
     ///
     /// Wraps an internal [`PhysicalExprDecode`] trait object plus a borrowed
@@ -693,6 +735,33 @@ pub mod proto_decode {
         }
     }
 
+    /// Unwrap a required non-expression proto field.
+    ///
+    /// Mirrors [`PhysicalExprDecodeCtx::decode_required_expression`] for proto
+    /// fields that aren't [`PhysicalExprNode`]s — e.g. the `arrow_type` of a
+    /// `PhysicalCastNode` or the `scalar` of a `PhysicalLiteralNode`. Keeps
+    /// the "missing required field" message format identical across
+    /// expressions:
+    ///
+    /// ```ignore
+    /// let arrow_type = require_proto_field(
+    ///     cast_expr.arrow_type.as_ref(),
+    ///     "CastExpr",
+    ///     "arrow_type",
+    /// )?;
+    /// ```
+    pub fn require_proto_field<T>(
+        opt: Option<T>,
+        expr_name: &str,
+        field: &str,
+    ) -> Result<T> {
+        opt.ok_or_else(|| {
+            datafusion_common::internal_datafusion_err!(
+                "{expr_name} is missing required field '{field}'"
+            )
+        })
+    }
+
     /// Internal dispatch trait. Implementors live in `datafusion-proto`.
     /// Expression authors should use [`PhysicalExprDecodeCtx`] instead of
     /// calling this directly.
@@ -1143,3 +1212,83 @@ mod test {
         );
     }
 }
+
+#[cfg(all(test, feature = "proto"))]
+mod proto_helper_tests {
+    use datafusion_common::DataFusionError;
+    use datafusion_proto_models::protobuf::{
+        self, PhysicalColumn, PhysicalExprNode, physical_expr_node,
+    };
+
+    use crate::expect_expr_variant;
+    use crate::physical_expr::proto_decode::require_proto_field;
+
+    fn column_node() -> PhysicalExprNode {
+        PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::Column(PhysicalColumn {
+                name: "a".to_string(),
+                index: 0,
+            })),
+        }
+    }
+
+    #[test]
+    fn require_proto_field_returns_inner() {
+        let v = require_proto_field(Some(7_u32), "FooExpr", "answer").unwrap();
+        assert_eq!(v, 7);
+    }
+
+    #[test]
+    fn require_proto_field_reports_missing() {
+        let err = require_proto_field::<u32>(None, "FooExpr", "answer").unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg)
+                if msg.contains("FooExpr is missing required field 'answer'")
+        ));
+    }
+
+    fn expect_column(
+        node: &PhysicalExprNode,
+    ) -> Result<&PhysicalColumn, DataFusionError> {
+        let inner =
+            expect_expr_variant!(node, physical_expr_node::ExprType::Column, "Column",);
+        Ok(inner)
+    }
+
+    #[test]
+    fn expect_expr_variant_returns_inner_payload() {
+        let node = column_node();
+        let col = expect_column(&node).unwrap();
+        assert_eq!(col.name, "a");
+    }
+
+    #[test]
+    fn expect_expr_variant_rejects_wrong_variant() {
+        let node = PhysicalExprNode {
+            expr_id: None,
+            expr_type: Some(physical_expr_node::ExprType::Negative(Box::new(
+                protobuf::PhysicalNegativeNode { expr: None },
+            ))),
+        };
+        let err = expect_column(&node).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a Column")
+        ));
+    }
+
+    #[test]
+    fn expect_expr_variant_rejects_missing_expr_type() {
+        let node = PhysicalExprNode {
+            expr_id: None,
+            expr_type: None,
+        };
+        let err = expect_column(&node).unwrap_err();
+        assert!(matches!(
+            err,
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a Column")
+        ));
+    }
+}
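
The required-field helper added in this file is small enough to sketch outside the crate. The version below is an approximation under stated assumptions: `require_field` and `CastNode` are hypothetical names, and the error type is a plain `String` instead of `DataFusionError::Internal`. Only the message format is taken from the diff.

```rust
// Simplified analogue of `require_proto_field` (assumption: String errors).
fn require_field<T>(opt: Option<T>, expr_name: &str, field: &str) -> Result<T, String> {
    opt.ok_or_else(|| format!("{expr_name} is missing required field '{field}'"))
}

// Hypothetical proto-like message with an optional non-expression field.
struct CastNode {
    arrow_type: Option<String>,
}

// Typical call site: borrow the optional field, then `?` on the helper.
fn cast_target(node: &CastNode) -> Result<&str, String> {
    let arrow_type = require_field(node.arrow_type.as_deref(), "CastExpr", "arrow_type")?;
    Ok(arrow_type)
}

fn main() {
    let ok = CastNode { arrow_type: Some("Int32".to_string()) };
    let missing = CastNode { arrow_type: None };
    println!("{:?}", cast_target(&ok));
    println!("{:?}", cast_target(&missing));
}
```

Keeping the helper generic over `T` lets callers pass either an owned value or a borrowed one (`as_ref()` / `as_deref()`), which is how the doc example in the diff uses it.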

datafusion/physical-expr/src/expressions/binary.rs

Lines changed: 15 additions & 31 deletions
@@ -638,10 +638,7 @@ impl PhysicalExpr for BinaryExpr {
         // Reverse so operands are ordered from left innermost to right outermost.
         operand_refs.reverse();
 
-        let operands = operand_refs
-            .iter()
-            .map(|e| ctx.encode_child(e))
-            .collect::<Result<Vec<_>>>()?;
+        let operands = ctx.encode_children_expressions(operand_refs)?;
 
         Ok(Some(protobuf::PhysicalExprNode {
             expr_id: None,
@@ -675,11 +672,13 @@ impl BinaryExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
-        let node = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => b.as_ref(),
-            _ => return internal_err!("PhysicalExprNode is not a BinaryExpr"),
-        };
+        let node = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::BinaryExpr,
+            "BinaryExpr",
+        );
         let op = Operator::from_proto_name(&node.op).ok_or_else(|| {
             datafusion_common::DataFusionError::Internal(format!(
                 "Unsupported binary operator '{}'",
@@ -690,17 +689,12 @@ impl BinaryExpr {
         if !node.operands.is_empty() {
             // New linearized format: reduce the flat operands list back into
             // a nested binary expression tree.
-            let operands = node
-                .operands
-                .iter()
-                .map(|e| ctx.decode(e))
-                .collect::<Result<Vec<_>>>()?;
+            let operands = ctx.decode_children_expressions(&node.operands)?;
 
             if operands.len() < 2 {
-                return Err(datafusion_common::DataFusionError::Internal(
+                return internal_err!(
                     "A binary expression must always have at least 2 operands"
-                        .to_string(),
-                ));
+                );
             }
 
             Ok(operands
@@ -711,21 +705,11 @@ impl BinaryExpr {
                 .expect("Binary expression could not be reduced to a single expression."))
         } else {
             // Legacy format with l/r fields.
-            let left = node.l.as_deref().ok_or_else(|| {
-                datafusion_common::DataFusionError::Internal(
-                    "BinaryExpr is missing required field 'left'".to_string(),
-                )
-            })?;
-            let right = node.r.as_deref().ok_or_else(|| {
-                datafusion_common::DataFusionError::Internal(
-                    "BinaryExpr is missing required field 'right'".to_string(),
-                )
-            })?;
-            Ok(Arc::new(BinaryExpr::new(
-                ctx.decode(left)?,
-                op,
-                ctx.decode(right)?,
-            )))
+            let left =
+                ctx.decode_required_expression(node.l.as_deref(), "BinaryExpr", "left")?;
+            let right =
+                ctx.decode_required_expression(node.r.as_deref(), "BinaryExpr", "right")?;
+            Ok(Arc::new(BinaryExpr::new(left, op, right)))
         }
     }
 }
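
The linearized `operands` path in this file flattens a chain of same-operator binary expressions and later reduces the flat list back into a nested tree. The sketch below shows one way that round trip can work, assuming the encoder walks the left spine of the tree. The `Expr` enum and the `leaf`, `binary`, `linearize`, and `rebuild` functions are hypothetical and only illustrate the idea; they are not the DataFusion implementation.

```rust
// Hypothetical toy expression tree; operators are plain strings.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Leaf(String),
    Binary(Box<Expr>, String, Box<Expr>),
}

fn leaf(name: &str) -> Expr {
    Expr::Leaf(name.to_string())
}

fn binary(l: Expr, op: &str, r: Expr) -> Expr {
    Expr::Binary(Box::new(l), op.to_string(), Box::new(r))
}

// Walk down the left spine while the operator matches, collecting right
// operands, then reverse so the list runs left innermost to right outermost.
fn linearize<'a>(expr: &'a Expr, op: &str) -> Vec<&'a Expr> {
    let mut operand_refs = Vec::new();
    let mut current = expr;
    while let Expr::Binary(l, o, r) = current {
        if o.as_str() != op {
            break;
        }
        operand_refs.push(&**r);
        current = &**l;
    }
    operand_refs.push(current);
    operand_refs.reverse();
    operand_refs
}

// Fold the flat list back into a left-deep tree.
fn rebuild(op: &str, operands: Vec<Expr>) -> Result<Expr, String> {
    if operands.len() < 2 {
        return Err("A binary expression must always have at least 2 operands".to_string());
    }
    Ok(operands
        .into_iter()
        .reduce(|l, r| binary(l, op, r))
        .expect("non-empty: length checked above"))
}

fn main() {
    // (a + b) + c
    let tree = binary(binary(leaf("a"), "+", leaf("b")), "+", leaf("c"));
    let flat: Vec<Expr> = linearize(&tree, "+").into_iter().cloned().collect();
    println!("{:?}", flat);
    println!("{:?}", rebuild("+", flat));
}
```

A flat list avoids deep recursion when long chains such as `a AND b AND c AND ...` are serialized, and the length check guards against a malformed list that cannot form a binary node.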

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 6 additions & 4 deletions
@@ -182,11 +182,13 @@ impl Column {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
-        let protobuf::PhysicalColumn { name, index } = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::Column(c)) => c,
-            _ => return internal_err!("PhysicalExprNode is not a Column"),
-        };
+        let protobuf::PhysicalColumn { name, index } = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::Column,
+            "Column",
+        );
         Ok(Arc::new(Column::new(name, *index as usize)))
     }
 }

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 7 additions & 9 deletions
@@ -252,16 +252,14 @@ impl InListExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let node = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::InList(n)) => n,
-            _ => {
-                return datafusion_common::internal_err!(
-                    "PhysicalExprNode is not an InList"
-                );
-            }
-        };
+        let node = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::InList,
+            "InList",
+        );
 
         let expr =
             ctx.decode_required_expression(node.expr.as_deref(), "InListExpr", "expr")?;
@@ -3981,7 +3979,7 @@ mod proto_tests {
         let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
         assert!(matches!(
             err,
-            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not an InList")
+            DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a InList")
         ));
     }

datafusion/physical-expr/src/expressions/like.rs

Lines changed: 6 additions & 7 deletions
@@ -180,15 +180,14 @@ impl LikeExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        use datafusion_common::internal_err;
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let like_expr = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::LikeExpr(like_expr)) => {
-                like_expr.as_ref()
-            }
-            _ => return internal_err!("PhysicalExprNode is not a LikeExpr"),
-        };
+        let like_expr = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::LikeExpr,
+            "LikeExpr",
+        );
 
         Ok(Arc::new(LikeExpr::new(
             like_expr.negated,

datafusion/physical-expr/src/expressions/negative.rs

Lines changed: 8 additions & 6 deletions
@@ -200,14 +200,16 @@ impl NegativeExpr {
         node: &datafusion_proto_models::protobuf::PhysicalExprNode,
         ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_physical_expr_common::expect_expr_variant;
         use datafusion_proto_models::protobuf;
 
-        let expr = match &node.expr_type {
-            Some(protobuf::physical_expr_node::ExprType::Negative(n)) => {
-                ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?
-            }
-            _ => return internal_err!("PhysicalExprNode is not a Negative"),
-        };
+        let n = expect_expr_variant!(
+            node,
+            protobuf::physical_expr_node::ExprType::Negative,
+            "Negative",
+        );
+        let expr =
+            ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?;
 
         Ok(Arc::new(NegativeExpr::new(expr)))
     }
