From c5c78b24af6959ac02bcebd701b96daefc331b93 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 12:20:58 +0530 Subject: [PATCH 1/5] refactor: add try_to_proto to HashTableLookupExpr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part of #22435. Adds try_to_proto to HashTableLookupExpr so it participates in the expression-local serialization pattern introduced in #21929. HashTableLookupExpr holds a runtime Arc that cannot be serialized, so try_to_proto replaces it with lit(true). This is safe because the filter is a performance optimisation only — lit(true) passes all rows and the join produces correct results either way. The centralized arm in to_proto.rs remains as a fallback for now. --- .../joins/hash_join/partitioned_hash_eval.rs | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 0daac0bb86a75..09a42191afa93 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -242,6 +242,8 @@ impl HashTableLookupExpr { } } + + impl std::fmt::Debug for HashTableLookupExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let cols = self @@ -289,6 +291,38 @@ impl PartialEq for HashTableLookupExpr { impl Eq for HashTableLookupExpr {} +#[cfg(feature = "proto")] +impl HashTableLookupExpr { + /// Serialize this expression to protobuf. + /// + /// `HashTableLookupExpr` holds an `Arc` (a runtime hash table built + /// on the build side) which cannot be serialized. We replace it with + /// `lit(true)`, which is safe because: + /// + /// - The filter is a performance optimisation, not a correctness requirement. + /// - `lit(true)` passes all rows so no valid rows are lost. + /// - In distributed execution the remote worker has no access to the + /// build-side hash table anyway. + pub fn try_to_proto( + &self, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> datafusion_common::Result> + { + use datafusion_proto_common::scalar_value::Value; + use datafusion_proto_common::ScalarValue; + use datafusion_proto_models::protobuf; + use datafusion_proto_models::protobuf::physical_expr_node::ExprType; + + let value = ScalarValue { + value: Some(Value::BoolValue(true)), + }; + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(ExprType::Literal(value)), + })) + } +} + impl Display for HashTableLookupExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.description) From 67bebc0d7d01f118973ea1b9f33333912af82625 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 12:58:13 +0530 Subject: [PATCH 2/5] style: apply rustfmt --- .../src/joins/hash_join/partitioned_hash_eval.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 09a42191afa93..b99454e8971f3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -242,8 +242,6 @@ impl HashTableLookupExpr { } } - - impl std::fmt::Debug for HashTableLookupExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let cols = self @@ -306,10 +304,11 @@ impl HashTableLookupExpr { pub fn try_to_proto( &self, _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> datafusion_common::Result> - { - use datafusion_proto_common::scalar_value::Value; + ) -> datafusion_common::Result< + Option, + > { use datafusion_proto_common::ScalarValue; + use datafusion_proto_common::scalar_value::Value; use datafusion_proto_models::protobuf; use datafusion_proto_models::protobuf::physical_expr_node::ExprType; From d24abe87c1fb19a5a081eded717b94576031a067 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 19:01:02 +0530 Subject: [PATCH 3/5] fix: remove invalid cfg(feature = "proto") in HashTableLookupExpr --- .../src/joins/hash_join/partitioned_hash_eval.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index b99454e8971f3..39b9fa2c110df 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -289,7 +289,7 @@ impl PartialEq for HashTableLookupExpr { impl Eq for HashTableLookupExpr {} -#[cfg(feature = "proto")] + impl HashTableLookupExpr { /// Serialize this expression to protobuf. /// @@ -304,11 +304,10 @@ impl HashTableLookupExpr { pub fn try_to_proto( &self, _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> datafusion_common::Result< - Option, - > { - use datafusion_proto_common::ScalarValue; + ) -> datafusion_common::Result> + { use datafusion_proto_common::scalar_value::Value; + use datafusion_proto_common::ScalarValue; use datafusion_proto_models::protobuf; use datafusion_proto_models::protobuf::physical_expr_node::ExprType; From 74f61fe1cbb33ecfebdadda34a883b283c35dc67 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 19:09:20 +0530 Subject: [PATCH 4/5] fix: restore cfg(feature = proto) guard and apply cargo fmt --- .../src/joins/hash_join/partitioned_hash_eval.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 39b9fa2c110df..3cb27fbcd21ff 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -289,6 +289,7 @@ impl PartialEq for HashTableLookupExpr { impl Eq for HashTableLookupExpr {} +#[cfg(feature = "proto")] impl HashTableLookupExpr { /// Serialize this expression to protobuf. @@ -304,10 +305,11 @@ impl HashTableLookupExpr { pub fn try_to_proto( &self, _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, - ) -> datafusion_common::Result> - { - use datafusion_proto_common::scalar_value::Value; + ) -> datafusion_common::Result< + Option, + > { use datafusion_proto_common::ScalarValue; + use datafusion_proto_common::scalar_value::Value; use datafusion_proto_models::protobuf; use datafusion_proto_models::protobuf::physical_expr_node::ExprType; From 63d14ff105463dffa7970edff2ac5f7e3668f650 Mon Sep 17 00:00:00 2001 From: Anurag Tryambak Raut Date: Fri, 22 May 2026 21:12:52 +0530 Subject: [PATCH 5/5] fix: remove invalid proto cfg gate from HashTableLookupExpr --- .../physical-plan/src/joins/hash_join/partitioned_hash_eval.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 3cb27fbcd21ff..85a79a821f5d4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -289,8 +289,6 @@ impl PartialEq for HashTableLookupExpr { impl Eq for HashTableLookupExpr {} -#[cfg(feature = "proto")] - impl HashTableLookupExpr { /// Serialize this expression to protobuf. ///