From f6a9e73ae5ce7eb791913ac5dbeb2b0dc98c5e0a Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Fri, 22 May 2026 18:03:10 +0100 Subject: [PATCH 1/7] Migrate UnKnownColumn proto hooks Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/expressions/unknown_column.rs | 33 +++++++++++++++++++ .../proto/src/physical_plan/from_proto.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 13 ++------ 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index 4969fc33743c7..356dcaec67cad 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -27,6 +27,9 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{Result, internal_err}; + +#[cfg(feature = "proto")] +use datafusion_proto_models::protobuf; use datafusion_expr::ColumnarValue; #[derive(Debug, Clone, Eq)] @@ -84,6 +87,36 @@ impl PhysicalExpr for UnKnownColumn { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( + protobuf::UnknownColumn { + name: self.name.clone(), + }, + )), + })) + } +} + +#[cfg(feature = "proto")] +impl UnKnownColumn { + /// Reconstruct an [`UnKnownColumn`] from its protobuf representation. + pub fn try_from_proto( + node: &protobuf::PhysicalExprNode, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + let protobuf::UnknownColumn { name } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::UnknownColumn(c)) => c, + _ => return internal_err!("PhysicalExprNode is not an UnKnownColumn"), + }; + Ok(Arc::new(UnKnownColumn::new(name))) + } } impl Hash for UnKnownColumn { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..7b776a2b16bd2 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -278,7 +278,7 @@ pub fn parse_physical_expr_with_converter( // their own `ExprType` variant — see #21835. This match only routes // to the right constructor. ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?, - ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)), + ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?, ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?, ExprType::AggregateExpr(_) => { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..14bce786bd891 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + NegativeExpr, NotExpr, TryCastExpr, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -327,16 +327,7 @@ pub fn serialize_physical_expr_with_converter( }); } - if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( - protobuf::UnknownColumn { - name: expr.name().to_string(), - }, - )), - }) - } else if let Some(expr) = expr.downcast_ref::() { + if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id, expr_type: Some( From 89ccc4e8da7f77b7e21732a1612b4c17904c2531 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Sun, 24 May 2026 03:05:15 +0100 Subject: [PATCH 2/7] Improve unknown column proto errors Signed-off-by: Kanishk Sachan --- .../src/expressions/unknown_column.rs | 75 +++++++++++++++++-- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index 356dcaec67cad..6b31ffe674698 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -28,8 +28,6 @@ use arrow::{ }; use datafusion_common::{Result, internal_err}; -#[cfg(feature = "proto")] -use datafusion_proto_models::protobuf; use datafusion_expr::ColumnarValue; #[derive(Debug, Clone, Eq)] @@ -92,7 +90,9 @@ impl PhysicalExpr for UnKnownColumn { fn try_to_proto( &self, _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> Result> { + ) -> Result> { + use datafusion_proto_models::protobuf; + Ok(Some(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( @@ -108,17 +108,82 @@ impl PhysicalExpr for UnKnownColumn { impl UnKnownColumn { /// Reconstruct an [`UnKnownColumn`] from its protobuf representation. pub fn try_from_proto( - node: &protobuf::PhysicalExprNode, + node: &datafusion_proto_models::protobuf::PhysicalExprNode, _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, ) -> Result> { + use datafusion_proto_models::protobuf; + let protobuf::UnknownColumn { name } = match &node.expr_type { Some(protobuf::physical_expr_node::ExprType::UnknownColumn(c)) => c, - _ => return internal_err!("PhysicalExprNode is not an UnKnownColumn"), + other => { + return internal_err!( + "PhysicalExprNode is not an UnKnownColumn (expr_id={:?}, expr_type={other:?})", + node.expr_id + ); + } }; Ok(Arc::new(UnKnownColumn::new(name))) } } +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "proto")] + use std::sync::Arc; + + #[cfg(feature = "proto")] + use arrow::datatypes::Schema; + + #[cfg(feature = "proto")] + use datafusion_common::{Result, internal_err}; + + #[cfg(feature = "proto")] + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + + #[cfg(feature = "proto")] + struct DummyDecode; + + #[cfg(feature = "proto")] + impl datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecode + for DummyDecode + { + fn decode( + &self, + _node: &datafusion_proto_models::protobuf::PhysicalExprNode, + _schema: &Schema, + ) -> Result> { + internal_err!("decode should not be called") + } + } + + #[cfg(feature = "proto")] + #[test] + fn try_from_proto_reports_found_variant() { + use datafusion_proto_models::protobuf; + + let node = protobuf::PhysicalExprNode { + expr_id: Some(7), + expr_type: Some(protobuf::physical_expr_node::ExprType::Column( + protobuf::PhysicalColumn { + name: "col_a".to_string(), + index: 0, + }, + )), + }; + let schema = Schema::empty(); + let decoder = DummyDecode; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err(); + let err = err.to_string(); + + assert!(err.contains("expr_id=Some(7)")); + assert!(err.contains("PhysicalColumn")); + } +} + impl Hash for UnKnownColumn { fn hash(&self, state: &mut H) { self.name.hash(state); From 2c297df4859b0abe92f389eb19a9a3508ccc9dbb Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Sun, 24 May 2026 12:28:13 +0100 Subject: [PATCH 3/7] fix(physical-expr): move test module after impls to satisfy clippy items-after-test-module lint Closes: address clippy failure in CI Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/expressions/unknown_column.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index 6b31ffe674698..a6b5d9ecfd5fb 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -126,6 +126,20 @@ impl UnKnownColumn { } } +impl Hash for UnKnownColumn { + fn hash(&self, state: &mut H) { + self.name.hash(state); + } +} + +impl PartialEq for UnKnownColumn { + fn eq(&self, _other: &Self) -> bool { + // UnknownColumn is not a valid expression, so it should not be equal to any other expression. + // See https://github.com/apache/datafusion/pull/11536 + false + } +} + #[cfg(test)] mod tests { use super::*; @@ -183,17 +197,3 @@ mod tests { assert!(err.contains("PhysicalColumn")); } } - -impl Hash for UnKnownColumn { - fn hash(&self, state: &mut H) { - self.name.hash(state); - } -} - -impl PartialEq for UnKnownColumn { - fn eq(&self, _other: &Self) -> bool { - // UnknownColumn is not a valid expression, so it should not be equal to any other expression. - // See https://github.com/apache/datafusion/pull/11536 - false - } -} From ad46c49a9bc09822ed0d66c9c8bd964b580ecc39 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Sun, 24 May 2026 12:31:13 +0100 Subject: [PATCH 4/7] test(proto): add roundtrip test for UnKnownColumn; fix fragile float equality in binary statistics test Closes: address requested roundtrip test and stabilize failing test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../physical-expr/src/expressions/binary.rs | 20 +++++++++++++---- .../src/expressions/unknown_column.rs | 22 +++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 712f8f58f3180..f9239d68b01e0 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -4889,10 +4889,22 @@ mod tests { ); // The probability of being distinct is 1 - 1 / 16 = 15 / 16. - assert_eq!( - neq.evaluate_statistics(&[left_stat, right_stat])?, - Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))? - ); + let got = neq.evaluate_statistics(&[left_stat, right_stat])?; + let expected = Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?; + match (got, expected) { + (Bernoulli(g), Bernoulli(e)) => { + let gp = match g.p_value() { + ScalarValue::Float64(Some(v)) => v, + _ => panic!("got unexpected p type"), + }; + let ep = match e.p_value() { + ScalarValue::Float64(Some(v)) => v, + _ => panic!("expected unexpected p type"), + }; + assert!((gp - ep).abs() < 1e-12); + } + _ => panic!("unexpected distribution variants"), + } Ok(()) } diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index a6b5d9ecfd5fb..6a6cd53b4c0de 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -196,4 +196,26 @@ mod tests { assert!(err.contains("expr_id=Some(7)")); assert!(err.contains("PhysicalColumn")); } + + #[cfg(feature = "proto")] + #[test] + fn unknown_column_proto_roundtrip() { + use datafusion_proto_models::protobuf; + + let name = "col_b".to_string(); + let node = protobuf::PhysicalExprNode { + expr_id: Some(42), + expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( + protobuf::UnknownColumn { name: name.clone() }, + )), + }; + + let schema = Schema::empty(); + let decoder = DummyDecode; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap(); + let col = decoded.as_ref().downcast_ref::().unwrap(); + assert_eq!(col.name(), name.as_str()); + } } From fbe475e0ea68b4fcb147b87489a2a638f51bf94a Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Sun, 24 May 2026 14:13:25 +0100 Subject: [PATCH 5/7] style: fix rustfmt indentation in binary.rs test --- datafusion/physical-expr/src/expressions/binary.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f9239d68b01e0..26dd1ef73a8b0 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -4892,7 +4892,7 @@ mod tests { let got = neq.evaluate_statistics(&[left_stat, right_stat])?; let expected = Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?; match (got, expected) { - (Bernoulli(g), Bernoulli(e)) => { + (Bernoulli(g), Bernoulli(e)) => { let gp = match g.p_value() { ScalarValue::Float64(Some(v)) => v, _ => panic!("got unexpected p type"), From dce996433c3e265add150961b0ea686316816a70 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Mon, 25 May 2026 14:16:28 +0100 Subject: [PATCH 6/7] revert: restore original assert_eq in binary statistics test --- .../physical-expr/src/expressions/binary.rs | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 26dd1ef73a8b0..712f8f58f3180 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -4889,22 +4889,10 @@ mod tests { ); // The probability of being distinct is 1 - 1 / 16 = 15 / 16. - let got = neq.evaluate_statistics(&[left_stat, right_stat])?; - let expected = Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?; - match (got, expected) { - (Bernoulli(g), Bernoulli(e)) => { - let gp = match g.p_value() { - ScalarValue::Float64(Some(v)) => v, - _ => panic!("got unexpected p type"), - }; - let ep = match e.p_value() { - ScalarValue::Float64(Some(v)) => v, - _ => panic!("expected unexpected p type"), - }; - assert!((gp - ep).abs() < 1e-12); - } - _ => panic!("unexpected distribution variants"), - } + assert_eq!( + neq.evaluate_statistics(&[left_stat, right_stat])?, + Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))? + ); Ok(()) } From 127d0750f6171bb95c81866cae62934c7c1c91e9 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Mon, 25 May 2026 16:25:02 +0100 Subject: [PATCH 7/7] test(proto): rewrite proto tests for UnKnownColumn following like.rs pattern --- .../src/expressions/unknown_column.rs | 130 ++++++++++-------- 1 file changed, 76 insertions(+), 54 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index 6a6cd53b4c0de..e6338f98519f1 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -140,82 +140,104 @@ impl PartialEq for UnKnownColumn { } } -#[cfg(test)] -mod tests { +/// Tests for the `try_to_proto` / `try_from_proto` hooks. +#[cfg(all(test, feature = "proto"))] +mod proto_tests { use super::*; - - #[cfg(feature = "proto")] - use std::sync::Arc; - - #[cfg(feature = "proto")] + use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node}; use arrow::datatypes::Schema; - - #[cfg(feature = "proto")] - use datafusion_common::{Result, internal_err}; - - #[cfg(feature = "proto")] + use datafusion_common::DataFusionError; use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::protobuf::{self, physical_expr_node}; - #[cfg(feature = "proto")] - struct DummyDecode; + // ── try_to_proto ───────────────────────────────────────────────────────── - #[cfg(feature = "proto")] - impl datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecode - for DummyDecode - { - fn decode( - &self, - _node: &datafusion_proto_models::protobuf::PhysicalExprNode, - _schema: &Schema, - ) -> Result> { - internal_err!("decode should not be called") - } + #[test] + fn try_to_proto_encodes_unknown_column() { + let expr = UnKnownColumn::new("my_col"); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = expr + .try_to_proto(&ctx) + .unwrap() + .expect("UnKnownColumn should encode to Some(node)"); + + // Built-in exprs never set expr_id; only dynamic filters do. + assert!(node.expr_id.is_none()); + + // Verify the encoded name matches the original. + let protobuf::UnknownColumn { name } = match node.expr_type { + Some(physical_expr_node::ExprType::UnknownColumn(c)) => c, + other => panic!("expected UnknownColumn proto node, got {other:?}"), + }; + assert_eq!(name, "my_col"); } - #[cfg(feature = "proto")] - #[test] - fn try_from_proto_reports_found_variant() { - use datafusion_proto_models::protobuf; + // ── try_from_proto ─────────────────────────────────────────────────────── + #[test] + fn try_from_proto_decodes_name() { let node = protobuf::PhysicalExprNode { - expr_id: Some(7), - expr_type: Some(protobuf::physical_expr_node::ExprType::Column( - protobuf::PhysicalColumn { - name: "col_a".to_string(), - index: 0, + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::UnknownColumn( + protobuf::UnknownColumn { + name: "my_col".to_string(), }, )), }; let schema = Schema::empty(); - let decoder = DummyDecode; + // UnKnownColumn has no child exprs so the decoder is never called. + let decoder = UnreachableDecoder; let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); - let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err(); - let err = err.to_string(); + let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap(); + let col = decoded + .downcast_ref::() + .expect("decoded expr should be an UnKnownColumn"); + assert_eq!(col.name(), "my_col"); + } - assert!(err.contains("expr_id=Some(7)")); - assert!(err.contains("PhysicalColumn")); + #[test] + fn try_from_proto_rejects_non_unknown_column_node() { + // column_node produces an ExprType::Column node, not UnknownColumn. + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!( + err, + DataFusionError::Internal(ref msg) + if msg.contains("PhysicalExprNode is not an UnKnownColumn") + // The error includes the actual expr_type for easier diagnosis. + && msg.contains("PhysicalColumn") + )); } - #[cfg(feature = "proto")] + // ── roundtrip ──────────────────────────────────────────────────────────── + #[test] fn unknown_column_proto_roundtrip() { - use datafusion_proto_models::protobuf; + let expr = UnKnownColumn::new("col_b"); + let encoder = StubEncoder::ok(); + let enc_ctx = PhysicalExprEncodeCtx::new(&encoder); - let name = "col_b".to_string(); - let node = protobuf::PhysicalExprNode { - expr_id: Some(42), - expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( - protobuf::UnknownColumn { name: name.clone() }, - )), - }; + let node = expr + .try_to_proto(&enc_ctx) + .unwrap() + .expect("UnKnownColumn should encode to Some(node)"); let schema = Schema::empty(); - let decoder = DummyDecode; - let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); - - let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap(); - let col = decoded.as_ref().downcast_ref::().unwrap(); - assert_eq!(col.name(), name.as_str()); + // UnKnownColumn has no child exprs so the decoder is never called. + let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = UnKnownColumn::try_from_proto(&node, &dec_ctx).unwrap(); + let col = decoded + .downcast_ref::() + .expect("decoded expr should be an UnKnownColumn"); + assert_eq!(col.name(), "col_b"); } }