Skip to content

Commit eb3c564

Browse files
AnuragRaut08 and Anurag Tryambak Raut
authored
refactor: add try_to_proto to HashTableLookupExpr (#22451)
## Which issue does this PR close?

Part of #22435

## What changes are included?

Adds `try_to_proto` to `HashTableLookupExpr` so it participates in the expression-local serialization pattern introduced in #21929. `HashTableLookupExpr` holds a runtime `Arc<Map>` that cannot be serialized, so `try_to_proto` replaces it with `lit(true)`. This is safe because the filter is a performance optimisation only — `lit(true)` passes all rows and the join produces correct results either way. The centralized arm in `to_proto.rs` that previously performed this replacement is removed in this commit (along with its `HashTableLookupExpr` import), so the expression-local `try_to_proto` is now the only place this substitution happens.

## Are these changes tested?

Yes — covered by the existing `roundtrip_hash_table_lookup_expr_to_lit` test in `datafusion/proto/tests/cases/roundtrip_physical_plan.rs`.

## Are there any user-facing changes?

No.

---------

Co-authored-by: Anurag Tryambak Raut <anuragtryambakraut@Anurags-MacBook-Air.local>
1 parent e4e8f23 commit eb3c564

5 files changed

Lines changed: 43 additions & 30 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-plan/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ force_hash_collisions = []
4242
test_utils = ["arrow/test_utils"]
4343
tokio_coop = []
4444
tokio_coop_fallback = []
45+
proto = [
46+
"dep:datafusion-proto-models",
47+
"dep:datafusion-proto-common",
48+
"datafusion-physical-expr-common/proto",
49+
]
4550

4651
[lib]
4752
name = "datafusion_physical_plan"
@@ -65,6 +70,8 @@ datafusion-functions-aggregate-common = { workspace = true }
6570
datafusion-functions-window-common = { workspace = true }
6671
datafusion-physical-expr = { workspace = true, default-features = true }
6772
datafusion-physical-expr-common = { workspace = true }
73+
datafusion-proto-common = { workspace = true, optional = true }
74+
datafusion-proto-models = { workspace = true, optional = true }
6875
futures = { workspace = true }
6976
half = { workspace = true }
7077
hashbrown = { workspace = true }

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ pub struct HashTableLookupExpr {
215215
/// Description for display
216216
description: String,
217217
}
218-
219218
impl HashTableLookupExpr {
220219
/// Create a new HashTableLookupExpr
221220
///
@@ -241,7 +240,6 @@ impl HashTableLookupExpr {
241240
}
242241
}
243242
}
244-
245243
impl std::fmt::Debug for HashTableLookupExpr {
246244
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247245
let cols = self
@@ -337,7 +335,38 @@ impl PhysicalExpr for HashTableLookupExpr {
337335
}
338336
}
339337
}
340-
338+
/// Encodes this expression for protobuf serialization as a boolean literal
/// `true` node. The encode context is not consulted, and the method always
/// returns `Ok(Some(..))`. Only compiled when the `proto` feature is enabled.
#[cfg(feature = "proto")]
339+
fn try_to_proto(
340+
&self,
341+
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
342+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
343+
use datafusion_proto_models::protobuf;
344+
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
345+
346+
// HashTableLookupExpr holds a runtime Arc<Map> (the build-side hash
347+
// table) that cannot be serialized, so it is replaced with lit(true).
348+
//
349+
// Dynamic filtering is a performance optimisation only — replacing the
350+
// lookup with lit(true) preserves correctness by allowing all rows
351+
// through.
352+
//
353+
// If a plan is serialized before execution, HashTableLookupExpr is not
354+
// yet present in the dynamic filter expression.
355+
//
356+
// If a plan is serialized after execution, any runtime-created
357+
// HashTableLookupExpr is replaced during serialization. Re-executing
358+
// the plan requires reset_state(), after which HashJoinExec rebuilds
359+
// fresh dynamic filters at runtime.
360+
// Protobuf scalar carrying the boolean `true` that stands in for the lookup.
let value = datafusion_proto_common::ScalarValue {
361+
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
362+
true,
363+
)),
364+
};
365+
Ok(Some(protobuf::PhysicalExprNode {
366+
// NOTE(review): no expression id is attached here, whereas the centralized
// serializer arm this method supersedes forwarded the caller's `expr_id`.
// Confirm the caller of `try_to_proto` fills it in if it is still needed.
expr_id: None,
367+
expr_type: Some(ExprType::Literal(value)),
368+
}))
369+
}
341370
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342371
write!(f, "{}", self.description)
343372
}

datafusion/proto/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ datafusion-expr = { workspace = true }
6565
datafusion-functions-table = { workspace = true }
6666
datafusion-physical-expr = { workspace = true, features = ["proto"] }
6767
datafusion-physical-expr-common = { workspace = true, features = ["proto"] }
68-
datafusion-physical-plan = { workspace = true }
68+
datafusion-physical-plan = { workspace = true, features = ["proto"] }
6969
datafusion-proto-common = { workspace = true }
7070
datafusion-proto-models = { workspace = true }
7171
object_store = { workspace = true }

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_physical_plan::expressions::{
3939
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal,
4040
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4141
};
42-
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
42+
use datafusion_physical_plan::joins::HashExpr;
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4444
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
4545
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
@@ -302,31 +302,6 @@ pub fn serialize_physical_expr_with_converter(
302302
return Ok(node);
303303
}
304304

305-
// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
306-
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
307-
// cannot be serialized - the hash table is a runtime structure built during
308-
// execution on the build side.
309-
//
310-
// We replace it with lit(true) which is safe because:
311-
// 1. The filter is a performance optimization, not a correctness requirement
312-
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
313-
// 3. The join itself will still produce correct results, just without the
314-
// benefit of early filtering on the probe side
315-
//
316-
// In distributed execution, the remote worker won't have access to the hash
317-
// table anyway, so the best we can do is skip this optimization.
318-
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
319-
let value = datafusion_proto_common::ScalarValue {
320-
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
321-
true,
322-
)),
323-
};
324-
return Ok(protobuf::PhysicalExprNode {
325-
expr_id,
326-
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
327-
});
328-
}
329-
330305
if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
331306
Ok(protobuf::PhysicalExprNode {
332307
expr_id,

0 commit comments

Comments
 (0)