diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index 4969fc33743c7..e6338f98519f1 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -27,6 +27,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{Result, internal_err}; + use datafusion_expr::ColumnarValue; #[derive(Debug, Clone, Eq)] @@ -84,6 +85,45 @@ impl PhysicalExpr for UnKnownColumn { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( + protobuf::UnknownColumn { + name: self.name.clone(), + }, + )), + })) + } +} + +#[cfg(feature = "proto")] +impl UnKnownColumn { + /// Reconstruct an [`UnKnownColumn`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result<Arc<dyn PhysicalExpr>> { + use datafusion_proto_models::protobuf; + + let protobuf::UnknownColumn { name } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::UnknownColumn(c)) => c, + other => { + return internal_err!( + "PhysicalExprNode is not an UnKnownColumn (expr_id={:?}, expr_type={other:?})", + node.expr_id + ); + } + }; + Ok(Arc::new(UnKnownColumn::new(name))) + } } impl Hash for UnKnownColumn { @@ -99,3 +139,105 @@ impl PartialEq for UnKnownColumn { false } } + +/// Tests for the `try_to_proto` / `try_from_proto` hooks. 
+#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use super::*; + use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node}; + use arrow::datatypes::Schema; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::protobuf::{self, physical_expr_node}; + + // ── try_to_proto ───────────────────────────────────────────────────────── + + #[test] + fn try_to_proto_encodes_unknown_column() { + let expr = UnKnownColumn::new("my_col"); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = expr + .try_to_proto(&ctx) + .unwrap() + .expect("UnKnownColumn should encode to Some(node)"); + + // Built-in exprs never set expr_id; only dynamic filters do. + assert!(node.expr_id.is_none()); + + // Verify the encoded name matches the original. + let protobuf::UnknownColumn { name } = match node.expr_type { + Some(physical_expr_node::ExprType::UnknownColumn(c)) => c, + other => panic!("expected UnknownColumn proto node, got {other:?}"), + }; + assert_eq!(name, "my_col"); + } + + // ── try_from_proto ─────────────────────────────────────────────────────── + + #[test] + fn try_from_proto_decodes_name() { + let node = protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::UnknownColumn( + protobuf::UnknownColumn { + name: "my_col".to_string(), + }, + )), + }; + let schema = Schema::empty(); + // UnKnownColumn has no child exprs so the decoder is never called. 
+ let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap(); + let col = decoded + .downcast_ref::<UnKnownColumn>() + .expect("decoded expr should be an UnKnownColumn"); + assert_eq!(col.name(), "my_col"); + } + + #[test] + fn try_from_proto_rejects_non_unknown_column_node() { + // column_node produces an ExprType::Column node, not UnknownColumn. + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(ref msg) + if msg.contains("PhysicalExprNode is not an UnKnownColumn") + // The error includes the actual expr_type for easier diagnosis. + && msg.contains("PhysicalColumn") + )); + } + + // ── roundtrip ──────────────────────────────────────────────────────────── + + #[test] + fn unknown_column_proto_roundtrip() { + let expr = UnKnownColumn::new("col_b"); + let encoder = StubEncoder::ok(); + let enc_ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = expr + .try_to_proto(&enc_ctx) + .unwrap() + .expect("UnKnownColumn should encode to Some(node)"); + + let schema = Schema::empty(); + // UnKnownColumn has no child exprs so the decoder is never called. 
+ let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = UnKnownColumn::try_from_proto(&node, &dec_ctx).unwrap(); + let col = decoded + .downcast_ref::<UnKnownColumn>() + .expect("decoded expr should be an UnKnownColumn"); + assert_eq!(col.name(), "col_b"); + } +} diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..7b776a2b16bd2 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -278,7 +278,7 @@ pub fn parse_physical_expr_with_converter( // their own `ExprType` variant — see #21835. This match only routes // to the right constructor. ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?, - ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)), + ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?, ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?, ExprType::AggregateExpr(_) => { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..14bce786bd891 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + NegativeExpr, NotExpr, TryCastExpr, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -327,16 +327,7 @@ pub fn serialize_physical_expr_with_converter( }); } - if let 
Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( - protobuf::UnknownColumn { - name: expr.name().to_string(), - }, - )), - }) - } else if let Some(expr) = expr.downcast_ref::() { + if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id, expr_type: Some(