Skip to content

Commit a7a3c63

Browse files
adriangbclaude
andcommitted
chore: deprecate HashExpr; mirror lit(true) proto serde for MultiMapLookupExpr
HashExpr was only used by the CASE-routing dynamic filter that 82f5ee5 removed. Nothing in the workspace constructs one anymore. Mark it `#[deprecated(since = "54.0.0", ...)]` per the API health guidelines, pointing users at HashTableLookupExpr / MultiMapLookupExpr. Keep the struct, proto wire format, re-exports, and unit tests intact so existing serialized plans and downstream consumers still work during the deprecation window; silence the resulting warnings with `#[expect(deprecated)]` at the impl blocks, re-exports, proto serializer/deserializer, and roundtrip test. MultiMapLookupExpr holds Arc<Map>s in exactly the same way HashTableLookupExpr does — those build-side hash tables can't cross the wire. Extend the existing lit(true) short-circuit in `serialize_physical_expr_with_converter` to cover MultiMapLookupExpr, update the explanatory comment, re-export it from joins/{mod, hash_join::mod}, and add a `roundtrip_multi_map_lookup_expr_to_lit` test mirroring the existing HashTableLookupExpr one. Refs: apache#21931 (comment) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e5b92ce commit a7a3c63

6 files changed

Lines changed: 85 additions & 10 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
//! [`HashJoinExec`] Partitioned Hash Join Operator
1919
2020
pub use exec::{HashJoinExec, HashJoinExecBuilder};
21-
pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState};
21+
#[expect(deprecated)]
22+
pub use partitioned_hash_eval::HashExpr;
23+
pub use partitioned_hash_eval::{
24+
HashTableLookupExpr, MultiMapLookupExpr, SeededRandomState,
25+
};
2226

2327
mod exec;
2428
mod inlist_builder;

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ impl SeededRandomState {
7272
/// This is used for:
7373
/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
7474
/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
75+
#[deprecated(
76+
since = "54.0.0",
77+
note = "Hash-join dynamic filters no longer route rows via a per-row hash + CASE. Use HashTableLookupExpr for single-map membership probes or MultiMapLookupExpr to OR across several maps in one hashing pass."
78+
)]
7579
pub struct HashExpr {
7680
/// Columns to hash
7781
on_columns: Vec<PhysicalExprRef>,
@@ -81,6 +85,7 @@ pub struct HashExpr {
8185
description: String,
8286
}
8387

88+
#[expect(deprecated)]
8489
impl HashExpr {
8590
/// Create a new HashExpr
8691
///
@@ -116,6 +121,7 @@ impl HashExpr {
116121
}
117122
}
118123

124+
#[expect(deprecated)]
119125
impl std::fmt::Debug for HashExpr {
120126
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121127
let cols = self
@@ -129,6 +135,7 @@ impl std::fmt::Debug for HashExpr {
129135
}
130136
}
131137

138+
#[expect(deprecated)]
132139
impl Hash for HashExpr {
133140
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
134141
self.on_columns.dyn_hash(state);
@@ -137,6 +144,7 @@ impl Hash for HashExpr {
137144
}
138145
}
139146

147+
#[expect(deprecated)]
140148
impl PartialEq for HashExpr {
141149
fn eq(&self, other: &Self) -> bool {
142150
self.on_columns == other.on_columns
@@ -145,14 +153,17 @@ impl PartialEq for HashExpr {
145153
}
146154
}
147155

156+
#[expect(deprecated)]
148157
impl Eq for HashExpr {}
149158

159+
#[expect(deprecated)]
150160
impl Display for HashExpr {
151161
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152162
write!(f, "{}", self.description)
153163
}
154164
}
155165

166+
#[expect(deprecated)]
156167
impl PhysicalExpr for HashExpr {
157168
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
158169
self.on_columns.iter().collect()
@@ -546,6 +557,7 @@ fn evaluate_columns(
546557
}
547558

548559
#[cfg(test)]
560+
#[expect(deprecated)]
549561
mod tests {
550562
use super::*;
551563
use crate::joins::join_hash_map::JoinHashMapU32;

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
use arrow::array::BooleanBufferBuilder;
2121
pub use cross_join::CrossJoinExec;
2222
use datafusion_physical_expr::PhysicalExprRef;
23+
#[expect(deprecated)]
24+
pub use hash_join::HashExpr;
2325
pub use hash_join::{
24-
HashExpr, HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, SeededRandomState,
26+
HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, MultiMapLookupExpr,
27+
SeededRandomState,
2528
};
2629
pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder};
2730
use parking_lot::Mutex;

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ use datafusion_physical_plan::expressions::{
4646
BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
4747
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
4848
};
49-
use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
49+
#[expect(deprecated)]
50+
use datafusion_physical_plan::joins::HashExpr;
51+
use datafusion_physical_plan::joins::SeededRandomState;
5052
use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
5153
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
5254
use datafusion_proto_common::common::proto_error;
@@ -255,6 +257,7 @@ pub fn parse_physical_expr(
255257
/// any scoped state needed during recursive deserialization.
256258
/// * `proto_converter` - Converter hooks used for recursive physical plan and
257259
/// expression deserialization.
260+
#[expect(deprecated)] // HashExpr branch — kept for proto wire compatibility.
258261
pub fn parse_physical_expr_with_converter(
259262
proto: &protobuf::PhysicalExprNode,
260263
input_schema: &Schema,

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use datafusion_physical_plan::expressions::{
4040
IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr,
4141
UnKnownColumn,
4242
};
43-
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
43+
#[expect(deprecated)]
44+
use datafusion_physical_plan::joins::HashExpr;
45+
use datafusion_physical_plan::joins::{HashTableLookupExpr, MultiMapLookupExpr};
4446
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4547
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
4648
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
@@ -253,27 +255,30 @@ pub fn serialize_physical_expr(
253255
/// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]).
254256
/// A [`PhysicalProtoConverterExtension`] can be provided to handle the
255257
/// conversion process (see [`PhysicalProtoConverterExtension::physical_expr_to_proto`]).
258+
#[expect(deprecated)] // HashExpr branch — kept for proto wire compatibility.
256259
pub fn serialize_physical_expr_with_converter(
257260
value: &Arc<dyn PhysicalExpr>,
258261
codec: &dyn PhysicalExtensionCodec,
259262
proto_converter: &dyn PhysicalProtoConverterExtension,
260263
) -> Result<protobuf::PhysicalExprNode> {
261264
let expr = value.as_ref();
262265
let expr_id = value.expression_id();
263-
// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
264-
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
265-
// cannot be serialized - the hash table is a runtime structure built during
266-
// execution on the build side.
266+
// HashTableLookupExpr and MultiMapLookupExpr are used for dynamic filter
267+
// pushdown in hash joins. They contain Arc<Map>s (the build-side hash tables)
268+
// which cannot be serialized - the hash tables are runtime structures built
269+
// during execution on the build side.
267270
//
268-
// We replace it with lit(true) which is safe because:
271+
// We replace them with lit(true) which is safe because:
269272
// 1. The filter is a performance optimization, not a correctness requirement
270273
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
271274
// 3. The join itself will still produce correct results, just without the
272275
// benefit of early filtering on the probe side
273276
//
274277
// In distributed execution, the remote worker won't have access to the hash
275278
// table anyway, so the best we can do is skip this optimization.
276-
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
279+
if expr.downcast_ref::<HashTableLookupExpr>().is_some()
280+
|| expr.downcast_ref::<MultiMapLookupExpr>().is_some()
281+
{
277282
let value = datafusion_proto_common::ScalarValue {
278283
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
279284
true,

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2466,6 +2466,53 @@ async fn roundtrip_async_func_exec() -> Result<()> {
24662466
Ok(())
24672467
}
24682468

2469+
/// Test that MultiMapLookupExpr serializes to lit(true)
2470+
///
2471+
/// MultiMapLookupExpr holds runtime build-side hash tables — same reasoning as
2472+
/// [`roundtrip_hash_table_lookup_expr_to_lit`]: the maps cannot cross the wire,
2473+
/// so the optimization is dropped to `lit(true)` and the join still produces
2474+
/// correct results without probe-side prefiltering.
2475+
#[test]
2476+
fn roundtrip_multi_map_lookup_expr_to_lit() -> Result<()> {
2477+
use datafusion::physical_plan::joins::join_hash_map::JoinHashMapU32;
2478+
use datafusion::physical_plan::joins::{Map, MultiMapLookupExpr};
2479+
2480+
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]));
2481+
let input = Arc::new(EmptyExec::new(schema.clone()));
2482+
2483+
let maps = vec![
2484+
Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(0)))),
2485+
Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(0)))),
2486+
];
2487+
let on_columns = vec![datafusion::physical_plan::expressions::col("col", &schema)?];
2488+
let lookup_expr: Arc<dyn PhysicalExpr> = Arc::new(MultiMapLookupExpr::new(
2489+
on_columns,
2490+
datafusion::physical_plan::joins::SeededRandomState::with_seed(0),
2491+
maps,
2492+
"test_multi_lookup".to_string(),
2493+
));
2494+
2495+
let filter = Arc::new(FilterExec::try_new(lookup_expr, input)?);
2496+
2497+
let ctx = SessionContext::new();
2498+
let codec = DefaultPhysicalExtensionCodec {};
2499+
2500+
let proto: PhysicalPlanNode =
2501+
PhysicalPlanNode::try_from_physical_plan(filter.clone(), &codec)
2502+
.expect("serialization should succeed");
2503+
2504+
let result: Arc<dyn ExecutionPlan> = proto
2505+
.try_into_physical_plan(&ctx.task_ctx(), &codec)
2506+
.expect("deserialization should succeed");
2507+
2508+
let result_filter = result.downcast_ref::<FilterExec>().unwrap();
2509+
let predicate = result_filter.predicate();
2510+
let literal = predicate.downcast_ref::<Literal>().unwrap();
2511+
assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true)));
2512+
2513+
Ok(())
2514+
}
2515+
24692516
/// Test that HashTableLookupExpr serializes to lit(true)
24702517
///
24712518
/// HashTableLookupExpr contains a runtime hash table that cannot be serialized.
@@ -2517,6 +2564,7 @@ fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> {
25172564
}
25182565

25192566
#[test]
2567+
#[expect(deprecated)] // HashExpr is deprecated but still proto-serialized for wire compatibility.
25202568
fn roundtrip_hash_expr() -> Result<()> {
25212569
use datafusion::physical_plan::joins::{HashExpr, SeededRandomState};
25222570

0 commit comments

Comments
 (0)