Skip to content

Commit 857eb4a

Browse files
Migrate UnKnownColumn proto hooks (#22464)
## Which issue does this PR close? - Closes #22420. ## Rationale for this change This issue is part of the migration to the new proto hooks ( ry_to_proto / ry_from_proto). UnKnownColumn was still handled in the legacy match arms, so this ports it to the hook-based path for consistency with other migrated expressions (e.g. Column, BinaryExpr). ## What changes are included in this PR? - Add ry_to_proto / ry_from_proto implementations for UnKnownColumn in physical-expr. - Remove the legacy UnKnownColumn match arms in datafusion/proto physical plan conversion. ## Are these changes tested? - cargo test -p datafusion-proto --lib ## Are there any user-facing changes? No user-facing changes. --------- Signed-off-by: Kanishk Sachan <koopatroopa787> Co-authored-by: Kanishk Sachan <koopatroopa787> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d5643ae commit 857eb4a

3 files changed

Lines changed: 139 additions & 12 deletions

File tree

datafusion/physical-expr/src/expressions/unknown_column.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use arrow::{
2727
record_batch::RecordBatch,
2828
};
2929
use datafusion_common::{Result, internal_err};
30+
3031
use datafusion_expr::ColumnarValue;
3132

3233
#[derive(Debug, Clone, Eq)]
@@ -84,6 +85,42 @@ impl PhysicalExpr for UnKnownColumn {
8485
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8586
std::fmt::Display::fmt(self, f)
8687
}
88+
89+
#[cfg(feature = "proto")]
90+
fn try_to_proto(
91+
&self,
92+
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
93+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
94+
use datafusion_proto_models::protobuf;
95+
96+
Ok(Some(protobuf::PhysicalExprNode {
97+
expr_id: None,
98+
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
99+
protobuf::UnknownColumn {
100+
name: self.name.clone(),
101+
},
102+
)),
103+
}))
104+
}
105+
}
106+
107+
#[cfg(feature = "proto")]
108+
impl UnKnownColumn {
109+
/// Reconstruct an [`UnKnownColumn`] from its protobuf representation.
110+
pub fn try_from_proto(
111+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
112+
_ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
113+
) -> Result<Arc<dyn PhysicalExpr>> {
114+
use datafusion_physical_expr_common::expect_expr_variant;
115+
use datafusion_proto_models::protobuf;
116+
117+
let unknown_col = expect_expr_variant!(
118+
node,
119+
protobuf::physical_expr_node::ExprType::UnknownColumn,
120+
"UnKnownColumn",
121+
);
122+
Ok(Arc::new(UnKnownColumn::new(&unknown_col.name)))
123+
}
87124
}
88125

89126
impl Hash for UnKnownColumn {
@@ -99,3 +136,103 @@ impl PartialEq for UnKnownColumn {
99136
false
100137
}
101138
}
139+
140+
/// Tests for the `try_to_proto` / `try_from_proto` hooks.
141+
#[cfg(all(test, feature = "proto"))]
142+
mod proto_tests {
143+
use super::*;
144+
use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node};
145+
use arrow::datatypes::Schema;
146+
use datafusion_common::DataFusionError;
147+
use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
148+
use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
149+
use datafusion_proto_models::protobuf::{self, physical_expr_node};
150+
151+
// ── try_to_proto ─────────────────────────────────────────────────────────
152+
153+
#[test]
154+
fn try_to_proto_encodes_unknown_column() {
155+
let expr = UnKnownColumn::new("my_col");
156+
let encoder = StubEncoder::ok();
157+
let ctx = PhysicalExprEncodeCtx::new(&encoder);
158+
159+
let node = expr
160+
.try_to_proto(&ctx)
161+
.unwrap()
162+
.expect("UnKnownColumn should encode to Some(node)");
163+
164+
// Built-in exprs never set expr_id; only dynamic filters do.
165+
assert!(node.expr_id.is_none());
166+
167+
// Verify the encoded name matches the original.
168+
let protobuf::UnknownColumn { name } = match node.expr_type {
169+
Some(physical_expr_node::ExprType::UnknownColumn(c)) => c,
170+
other => panic!("expected UnknownColumn proto node, got {other:?}"),
171+
};
172+
assert_eq!(name, "my_col");
173+
}
174+
175+
// ── try_from_proto ───────────────────────────────────────────────────────
176+
177+
#[test]
178+
fn try_from_proto_decodes_name() {
179+
let node = protobuf::PhysicalExprNode {
180+
expr_id: None,
181+
expr_type: Some(physical_expr_node::ExprType::UnknownColumn(
182+
protobuf::UnknownColumn {
183+
name: "my_col".to_string(),
184+
},
185+
)),
186+
};
187+
let schema = Schema::empty();
188+
// UnKnownColumn has no child exprs so the decoder is never called.
189+
let decoder = UnreachableDecoder;
190+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
191+
192+
let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap();
193+
let col = decoded
194+
.downcast_ref::<UnKnownColumn>()
195+
.expect("decoded expr should be an UnKnownColumn");
196+
assert_eq!(col.name(), "my_col");
197+
}
198+
199+
#[test]
200+
fn try_from_proto_rejects_non_unknown_column_node() {
201+
// column_node produces an ExprType::Column node, not UnknownColumn.
202+
let node = column_node("a");
203+
let schema = Schema::empty();
204+
let decoder = UnreachableDecoder;
205+
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
206+
let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err();
207+
assert!(matches!(
208+
err,
209+
DataFusionError::Internal(ref msg)
210+
if msg.contains("PhysicalExprNode is not a UnKnownColumn")
211+
));
212+
}
213+
214+
// ── roundtrip ────────────────────────────────────────────────────────────
215+
216+
#[test]
217+
fn unknown_column_proto_roundtrip() {
218+
let expr = UnKnownColumn::new("col_b");
219+
let encoder = StubEncoder::ok();
220+
let enc_ctx = PhysicalExprEncodeCtx::new(&encoder);
221+
222+
let node = expr
223+
.try_to_proto(&enc_ctx)
224+
.unwrap()
225+
.expect("UnKnownColumn should encode to Some(node)");
226+
227+
let schema = Schema::empty();
228+
// UnKnownColumn has no child exprs so the decoder is never called.
229+
let decoder = UnreachableDecoder;
230+
let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
231+
232+
let decoded = UnKnownColumn::try_from_proto(&node, &dec_ctx).unwrap();
233+
let col = decoded
234+
.downcast_ref::<UnKnownColumn>()
235+
.expect("decoded expr should be an UnKnownColumn");
236+
assert_eq!(col.name(), "col_b");
237+
}
238+
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ pub fn parse_physical_expr_with_converter(
282282
// their own `ExprType` variant — see #21835. This match only routes
283283
// to the right constructor.
284284
ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?,
285-
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
285+
ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?,
286286
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
287287
ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?,
288288
ExprType::AggregateExpr(_) => {

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
3939
CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, TryCastExpr,
40-
UnKnownColumn,
4140
};
4241
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4342
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
@@ -303,16 +302,7 @@ pub fn serialize_physical_expr_with_converter(
303302
return Ok(node);
304303
}
305304

306-
if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
307-
Ok(protobuf::PhysicalExprNode {
308-
expr_id,
309-
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
310-
protobuf::UnknownColumn {
311-
name: expr.name().to_string(),
312-
},
313-
)),
314-
})
315-
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
305+
if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
316306
Ok(protobuf::PhysicalExprNode {
317307
expr_id,
318308
expr_type: Some(

0 commit comments

Comments
 (0)