Skip to content

Commit 8ce71f5

Browse files
Anurag Tryambak RautAnurag Tryambak Raut
authored andcommitted
refactor: move try_to_proto into PhysicalExpr impl for HashTableLookupExpr
1 parent 63d14ff commit 8ce71f5

5 files changed

Lines changed: 35 additions & 61 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-plan/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ force_hash_collisions = []
4242
test_utils = ["arrow/test_utils"]
4343
tokio_coop = []
4444
tokio_coop_fallback = []
45+
proto = [
46+
"dep:datafusion-proto-models",
47+
"dep:datafusion-proto-common",
48+
"datafusion-physical-expr-common/proto",
49+
]
4550

4651
[lib]
4752
name = "datafusion_physical_plan"
@@ -65,6 +70,8 @@ datafusion-functions-aggregate-common = { workspace = true }
6570
datafusion-functions-window-common = { workspace = true }
6671
datafusion-physical-expr = { workspace = true, default-features = true }
6772
datafusion-physical-expr-common = { workspace = true }
73+
datafusion-proto-common = { workspace = true, optional = true }
74+
datafusion-proto-models = { workspace = true, optional = true }
6875
futures = { workspace = true }
6976
half = { workspace = true }
7077
hashbrown = { workspace = true }

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ pub struct HashTableLookupExpr {
215215
/// Description for display
216216
description: String,
217217
}
218-
219218
impl HashTableLookupExpr {
220219
/// Create a new HashTableLookupExpr
221220
///
@@ -241,7 +240,6 @@ impl HashTableLookupExpr {
241240
}
242241
}
243242
}
244-
245243
impl std::fmt::Debug for HashTableLookupExpr {
246244
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247245
let cols = self
@@ -289,38 +287,6 @@ impl PartialEq for HashTableLookupExpr {
289287

290288
impl Eq for HashTableLookupExpr {}
291289

292-
impl HashTableLookupExpr {
293-
/// Serialize this expression to protobuf.
294-
///
295-
/// `HashTableLookupExpr` holds an `Arc<Map>` (a runtime hash table built
296-
/// on the build side) which cannot be serialized. We replace it with
297-
/// `lit(true)`, which is safe because:
298-
///
299-
/// - The filter is a performance optimisation, not a correctness requirement.
300-
/// - `lit(true)` passes all rows so no valid rows are lost.
301-
/// - In distributed execution the remote worker has no access to the
302-
/// build-side hash table anyway.
303-
pub fn try_to_proto(
304-
&self,
305-
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
306-
) -> datafusion_common::Result<
307-
Option<datafusion_proto_models::protobuf::PhysicalExprNode>,
308-
> {
309-
use datafusion_proto_common::ScalarValue;
310-
use datafusion_proto_common::scalar_value::Value;
311-
use datafusion_proto_models::protobuf;
312-
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
313-
314-
let value = ScalarValue {
315-
value: Some(Value::BoolValue(true)),
316-
};
317-
Ok(Some(protobuf::PhysicalExprNode {
318-
expr_id: None,
319-
expr_type: Some(ExprType::Literal(value)),
320-
}))
321-
}
322-
}
323-
324290
impl Display for HashTableLookupExpr {
325291
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326292
write!(f, "{}", self.description)
@@ -369,7 +335,31 @@ impl PhysicalExpr for HashTableLookupExpr {
369335
}
370336
}
371337
}
338+
#[cfg(feature = "proto")]
339+
fn try_to_proto(
340+
&self,
341+
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
342+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
343+
use datafusion_proto_models::protobuf;
344+
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
372345

346+
// HashTableLookupExpr holds a runtime Arc<Map> (the build-side hash
347+
// table) that cannot be serialized. We replace it with lit(true).
348+
//
349+
// This is safe because dynamic filtering is a performance optimisation
350+
// only — lit(true) passes all rows so correctness is preserved.
351+
// When a serialized plan is re-executed, HashJoinExec reconstructs
352+
// fresh dynamic filters at runtime anyway.
353+
let value = datafusion_proto_common::ScalarValue {
354+
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
355+
true,
356+
)),
357+
};
358+
Ok(Some(protobuf::PhysicalExprNode {
359+
expr_id: None,
360+
expr_type: Some(ExprType::Literal(value)),
361+
}))
362+
}
373363
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374364
write!(f, "{}", self.description)
375365
}

datafusion/proto/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ datafusion-expr = { workspace = true }
6565
datafusion-functions-table = { workspace = true }
6666
datafusion-physical-expr = { workspace = true, features = ["proto"] }
6767
datafusion-physical-expr-common = { workspace = true, features = ["proto"] }
68-
datafusion-physical-plan = { workspace = true }
68+
datafusion-physical-plan = { workspace = true, features = ["proto"] }
6969
datafusion-proto-common = { workspace = true }
7070
datafusion-proto-models = { workspace = true }
7171
object_store = { workspace = true }

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_physical_plan::expressions::{
3939
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr,
4040
LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4141
};
42-
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
42+
use datafusion_physical_plan::joins::HashExpr;
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4444
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
4545
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
@@ -302,31 +302,6 @@ pub fn serialize_physical_expr_with_converter(
302302
return Ok(node);
303303
}
304304

305-
// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
306-
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
307-
// cannot be serialized - the hash table is a runtime structure built during
308-
// execution on the build side.
309-
//
310-
// We replace it with lit(true) which is safe because:
311-
// 1. The filter is a performance optimization, not a correctness requirement
312-
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
313-
// 3. The join itself will still produce correct results, just without the
314-
// benefit of early filtering on the probe side
315-
//
316-
// In distributed execution, the remote worker won't have access to the hash
317-
// table anyway, so the best we can do is skip this optimization.
318-
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
319-
let value = datafusion_proto_common::ScalarValue {
320-
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
321-
true,
322-
)),
323-
};
324-
return Ok(protobuf::PhysicalExprNode {
325-
expr_id,
326-
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
327-
});
328-
}
329-
330305
if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
331306
Ok(protobuf::PhysicalExprNode {
332307
expr_id,

0 commit comments

Comments
 (0)