Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions datafusion/physical-expr/src/expressions/unknown_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{Result, internal_err};

use datafusion_expr::ColumnarValue;

#[derive(Debug, Clone, Eq)]
Expand Down Expand Up @@ -84,6 +85,45 @@ impl PhysicalExpr for UnKnownColumn {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
}

#[cfg(feature = "proto")]
fn try_to_proto(
&self,
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
use datafusion_proto_models::protobuf;

Ok(Some(protobuf::PhysicalExprNode {
expr_id: None,
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
protobuf::UnknownColumn {
name: self.name.clone(),
},
)),
}))
Comment on lines +96 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while the previous inlined serialization path in to_proto.rs populated expr_id from the serializer’s expr_id

I believe this variable was an option and was None since only dynamic filters set expr_id

}
}

#[cfg(feature = "proto")]
impl UnKnownColumn {
/// Reconstruct an [`UnKnownColumn`] from its protobuf representation.
pub fn try_from_proto(
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
_ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
) -> Result<Arc<dyn PhysicalExpr>> {
use datafusion_proto_models::protobuf;

let protobuf::UnknownColumn { name } = match &node.expr_type {
Some(protobuf::physical_expr_node::ExprType::UnknownColumn(c)) => c,
other => {
return internal_err!(
"PhysicalExprNode is not an UnKnownColumn (expr_id={:?}, expr_type={other:?})",
node.expr_id
);
}
};
Comment on lines +116 to +124
Ok(Arc::new(UnKnownColumn::new(name)))
}
}

impl Hash for UnKnownColumn {
Expand All @@ -99,3 +139,105 @@ impl PartialEq for UnKnownColumn {
false
}
}

/// Tests for the `try_to_proto` / `try_from_proto` hooks.
#[cfg(all(test, feature = "proto"))]
mod proto_tests {
use super::*;
use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node};
use arrow::datatypes::Schema;
use datafusion_common::DataFusionError;
use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx;
use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx;
use datafusion_proto_models::protobuf::{self, physical_expr_node};

// ── try_to_proto ─────────────────────────────────────────────────────────

#[test]
fn try_to_proto_encodes_unknown_column() {
let expr = UnKnownColumn::new("my_col");
let encoder = StubEncoder::ok();
let ctx = PhysicalExprEncodeCtx::new(&encoder);

let node = expr
.try_to_proto(&ctx)
.unwrap()
.expect("UnKnownColumn should encode to Some(node)");

// Built-in exprs never set expr_id; only dynamic filters do.
assert!(node.expr_id.is_none());

// Verify the encoded name matches the original.
let protobuf::UnknownColumn { name } = match node.expr_type {
Some(physical_expr_node::ExprType::UnknownColumn(c)) => c,
other => panic!("expected UnknownColumn proto node, got {other:?}"),
};
assert_eq!(name, "my_col");
}

// ── try_from_proto ───────────────────────────────────────────────────────

#[test]
fn try_from_proto_decodes_name() {
let node = protobuf::PhysicalExprNode {
expr_id: None,
expr_type: Some(physical_expr_node::ExprType::UnknownColumn(
protobuf::UnknownColumn {
name: "my_col".to_string(),
},
)),
};
let schema = Schema::empty();
// UnKnownColumn has no child exprs so the decoder is never called.
let decoder = UnreachableDecoder;
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);

let decoded = UnKnownColumn::try_from_proto(&node, &ctx).unwrap();
let col = decoded
.downcast_ref::<UnKnownColumn>()
.expect("decoded expr should be an UnKnownColumn");
assert_eq!(col.name(), "my_col");
}

#[test]
fn try_from_proto_rejects_non_unknown_column_node() {
// column_node produces an ExprType::Column node, not UnknownColumn.
let node = column_node("a");
let schema = Schema::empty();
let decoder = UnreachableDecoder;
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
let err = UnKnownColumn::try_from_proto(&node, &ctx).unwrap_err();
assert!(matches!(
err,
DataFusionError::Internal(ref msg)
if msg.contains("PhysicalExprNode is not an UnKnownColumn")
// The error includes the actual expr_type for easier diagnosis.
&& msg.contains("PhysicalColumn")
));
}

// ── roundtrip ────────────────────────────────────────────────────────────

#[test]
fn unknown_column_proto_roundtrip() {
let expr = UnKnownColumn::new("col_b");
let encoder = StubEncoder::ok();
let enc_ctx = PhysicalExprEncodeCtx::new(&encoder);

let node = expr
.try_to_proto(&enc_ctx)
.unwrap()
.expect("UnKnownColumn should encode to Some(node)");

let schema = Schema::empty();
// UnKnownColumn has no child exprs so the decoder is never called.
let decoder = UnreachableDecoder;
let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);

let decoded = UnKnownColumn::try_from_proto(&node, &dec_ctx).unwrap();
let col = decoded
.downcast_ref::<UnKnownColumn>()
.expect("decoded expr should be an UnKnownColumn");
assert_eq!(col.name(), "col_b");
}
}
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub fn parse_physical_expr_with_converter(
// their own `ExprType` variant — see #21835. This match only routes
// to the right constructor.
ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?,
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?,
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?,
ExprType::AggregateExpr(_) => {
Expand Down
13 changes: 2 additions & 11 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::expressions::{
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
NegativeExpr, NotExpr, TryCastExpr,
};
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
Expand Down Expand Up @@ -327,16 +327,7 @@ pub fn serialize_physical_expr_with_converter(
});
}

if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
Ok(protobuf::PhysicalExprNode {
expr_id,
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
protobuf::UnknownColumn {
name: expr.name().to_string(),
},
)),
})
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_id,
expr_type: Some(
Expand Down