Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ force_hash_collisions = []
test_utils = ["arrow/test_utils"]
tokio_coop = []
tokio_coop_fallback = []
proto = [
"dep:datafusion-proto-models",
"dep:datafusion-proto-common",
"datafusion-physical-expr-common/proto",
]

[lib]
name = "datafusion_physical_plan"
Expand All @@ -65,6 +70,8 @@ datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-proto-common = { workspace = true, optional = true }
datafusion-proto-models = { workspace = true, optional = true }
futures = { workspace = true }
half = { workspace = true }
hashbrown = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ pub struct HashTableLookupExpr {
/// Description for display
description: String,
}

impl HashTableLookupExpr {
/// Create a new HashTableLookupExpr
///
Expand All @@ -241,7 +240,6 @@ impl HashTableLookupExpr {
}
}
}

impl std::fmt::Debug for HashTableLookupExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cols = self
Expand Down Expand Up @@ -337,7 +335,31 @@ impl PhysicalExpr for HashTableLookupExpr {
}
}
}

/// Encodes this expression as a protobuf `PhysicalExprNode`.
///
/// Only compiled when the `proto` feature is enabled. The encode context is
/// unused: the method unconditionally returns `Ok(Some(..))` wrapping a
/// boolean literal `true` in place of the hash-table lookup.
#[cfg(feature = "proto")]
fn try_to_proto(
&self,
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
use datafusion_proto_models::protobuf;
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

// HashTableLookupExpr holds a runtime Arc<Map> (the build-side hash
// table) that cannot be serialized, so it is replaced with lit(true).
//
// This is safe because dynamic filtering is a performance optimization
// only: lit(true) passes all rows, so correctness is preserved.
// When a serialized plan is re-executed, HashJoinExec reconstructs
// fresh dynamic filters at runtime anyway.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is helpful and explains why lit(true) is correctness-safe. One small optional tweak would be to mirror Jay's pre-execution and post-execution wording a bit more directly, and maybe mention reset_state since that is where the runtime reconstruction behavior is tied together.

Not blocking from my side, just a readability suggestion.

// Protobuf scalar carrying the boolean constant `true`.
let value = datafusion_proto_common::ScalarValue {
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
true,
)),
};
// NOTE(review): the inline branch removed from to_proto.rs forwarded the
// caller's `expr_id`; here `None` is emitted, so confirm the caller assigns
// the id afterwards if one is required.
Ok(Some(protobuf::PhysicalExprNode {
expr_id: None,
expr_type: Some(ExprType::Literal(value)),
}))
}
/// Renders this expression for SQL-style display by emitting the stored
/// `description` string unchanged.
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.description)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ datafusion-expr = { workspace = true }
datafusion-functions-table = { workspace = true }
datafusion-physical-expr = { workspace = true, features = ["proto"] }
datafusion-physical-expr-common = { workspace = true, features = ["proto"] }
datafusion-physical-plan = { workspace = true }
datafusion-physical-plan = { workspace = true, features = ["proto"] }
datafusion-proto-common = { workspace = true }
datafusion-proto-models = { workspace = true }
object_store = { workspace = true }
Expand Down
27 changes: 1 addition & 26 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion_physical_plan::expressions::{
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr,
LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
use datafusion_physical_plan::joins::HashExpr;
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
Expand Down Expand Up @@ -302,31 +302,6 @@ pub fn serialize_physical_expr_with_converter(
return Ok(node);
}

// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
// cannot be serialized - the hash table is a runtime structure built during
// execution on the build side.
//
// We replace it with lit(true) which is safe because:
// 1. The filter is a performance optimization, not a correctness requirement
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
// 3. The join itself will still produce correct results, just without the
// benefit of early filtering on the probe side
//
// In distributed execution, the remote worker won't have access to the hash
// table anyway, so the best we can do is skip this optimization.
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
let value = datafusion_proto_common::ScalarValue {
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
true,
)),
};
return Ok(protobuf::PhysicalExprNode {
expr_id,
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
});
}

if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
Ok(protobuf::PhysicalExprNode {
expr_id,
Expand Down
Loading