Skip to content

Commit 9429c4b

Browse files
adriangb authored and LiaCastaneda committed
replace HashTableLookupExpr with lit(true) in proto serialization (apache#19300)
This *errors* when serializing now, and would break any users using joins + protobuf.
1 parent e738580 commit 9429c4b

5 files changed

Lines changed: 148 additions & 6 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! [`HashJoinExec`] Partitioned Hash Join Operator
1919
2020
pub use exec::HashJoinExec;
21+
pub use partitioned_hash_eval::HashTableLookupExpr;
2122

2223
mod exec;
2324
mod inlist_builder;

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
4242
/// This is used for:
4343
/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
4444
/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
45-
pub(super) struct HashExpr {
45+
pub struct HashExpr {
4646
/// Columns to hash
4747
on_columns: Vec<PhysicalExprRef>,
4848
/// Random state for hashing
@@ -179,7 +179,11 @@ impl HashTableLookupExpr {
179179
/// * `hash_expr` - Expression that computes hash values
180180
/// * `hash_map` - Hash table to check membership
181181
/// * `description` - Description for debugging
182-
pub(super) fn new(
182+
///
183+
/// # Note
184+
/// This is public for internal testing purposes only and is not
185+
/// guaranteed to be stable across versions.
186+
pub fn new(
183187
hash_expr: PhysicalExprRef,
184188
hash_map: Arc<dyn JoinHashMapType>,
185189
description: String,

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use arrow::array::BooleanBufferBuilder;
2121
pub use cross_join::CrossJoinExec;
2222
use datafusion_physical_expr::PhysicalExprRef;
23-
pub use hash_join::HashJoinExec;
23+
pub use hash_join::{HashJoinExec, HashTableLookupExpr};
2424
pub use nested_loop_join::NestedLoopJoinExec;
2525
use parking_lot::Mutex;
2626
// Note: SortMergeJoin is not used in plans yet
@@ -37,7 +37,11 @@ mod symmetric_hash_join;
3737
pub mod utils;
3838

3939
mod join_filter;
40-
mod join_hash_map;
40+
/// Hash map implementations for join operations.
41+
///
42+
/// Note: This module is public for internal testing purposes only
43+
/// and is not guaranteed to be stable across versions.
44+
pub mod join_hash_map;
4145

4246
#[cfg(test)]
4347
pub mod test_utils;

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use datafusion_physical_plan::expressions::{
4141
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
4242
Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4343
};
44+
use datafusion_physical_plan::joins::HashTableLookupExpr;
4445
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
4546
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
4647
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
@@ -226,6 +227,30 @@ pub fn serialize_physical_expr(
226227
let value = snapshot_physical_expr(Arc::clone(value))?;
227228
let expr = value.as_any();
228229

230+
// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
231+
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
232+
// cannot be serialized - the hash table is a runtime structure built during
233+
// execution on the build side.
234+
//
235+
// We replace it with lit(true) which is safe because:
236+
// 1. The filter is a performance optimization, not a correctness requirement
237+
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
238+
// 3. The join itself will still produce correct results, just without the
239+
// benefit of early filtering on the probe side
240+
//
241+
// In distributed execution, the remote worker won't have access to the hash
242+
// table anyway, so the best we can do is skip this optimization.
243+
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
244+
let value = datafusion_proto_common::ScalarValue {
245+
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
246+
true,
247+
)),
248+
};
249+
return Ok(protobuf::PhysicalExprNode {
250+
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
251+
});
252+
}
253+
229254
if let Some(expr) = expr.downcast_ref::<Column>() {
230255
Ok(protobuf::PhysicalExprNode {
231256
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ use datafusion::physical_plan::expressions::{
7878
};
7979
use datafusion::physical_plan::filter::FilterExec;
8080
use datafusion::physical_plan::joins::{
81-
HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
82-
StreamJoinPartitionMode, SymmetricHashJoinExec,
81+
HashJoinExec, HashTableLookupExpr, NestedLoopJoinExec, PartitionMode,
82+
SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
8383
};
8484
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
8585
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
@@ -113,6 +113,7 @@ use datafusion_expr::{
113113
use datafusion_functions_aggregate::average::avg_udaf;
114114
use datafusion_functions_aggregate::nth_value::nth_value_udaf;
115115
use datafusion_functions_aggregate::string_agg::string_agg_udaf;
116+
use datafusion_physical_plan::joins::join_hash_map::JoinHashMapU32;
116117
use datafusion_proto::physical_plan::{
117118
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
118119
};
@@ -2264,3 +2265,110 @@ async fn roundtrip_listing_table_with_schema_metadata() -> Result<()> {
22642265

22652266
roundtrip_test(plan)
22662267
}
2268+
2269+
#[tokio::test]
2270+
async fn roundtrip_async_func_exec() -> Result<()> {
2271+
#[derive(Debug, PartialEq, Eq, Hash)]
2272+
struct TestAsyncUDF {
2273+
signature: Signature,
2274+
}
2275+
2276+
impl TestAsyncUDF {
2277+
fn new() -> Self {
2278+
Self {
2279+
signature: Signature::exact(vec![DataType::Int64], Volatility::Volatile),
2280+
}
2281+
}
2282+
}
2283+
2284+
impl ScalarUDFImpl for TestAsyncUDF {
2285+
fn as_any(&self) -> &dyn Any {
2286+
self
2287+
}
2288+
2289+
fn name(&self) -> &str {
2290+
"test_async_udf"
2291+
}
2292+
2293+
fn signature(&self) -> &Signature {
2294+
&self.signature
2295+
}
2296+
2297+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
2298+
Ok(DataType::Int64)
2299+
}
2300+
2301+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
2302+
not_impl_err!("Must call from `invoke_async_with_args`")
2303+
}
2304+
}
2305+
2306+
#[async_trait::async_trait]
2307+
impl AsyncScalarUDFImpl for TestAsyncUDF {
2308+
async fn invoke_async_with_args(
2309+
&self,
2310+
args: ScalarFunctionArgs,
2311+
) -> Result<ColumnarValue> {
2312+
Ok(args.args[0].clone())
2313+
}
2314+
}
2315+
2316+
let ctx = SessionContext::new();
2317+
let async_udf = AsyncScalarUDF::new(Arc::new(TestAsyncUDF::new()));
2318+
ctx.register_udf(async_udf.into_scalar_udf());
2319+
2320+
let physical_plan = ctx
2321+
.sql("select test_async_udf(1)")
2322+
.await?
2323+
.create_physical_plan()
2324+
.await?;
2325+
2326+
roundtrip_test_with_context(physical_plan, &ctx)?;
2327+
2328+
Ok(())
2329+
}
2330+
2331+
/// Verifies that `HashTableLookupExpr` is serialized as `lit(true)`.
///
/// The expression wraps a runtime hash table that has no protobuf
/// representation, so the serializer substitutes a literal `true`
/// predicate. That substitution is safe: the lookup is a performance
/// optimization filter, not a correctness requirement.
#[test]
fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> {
    // A single-column schema feeding an empty source plan.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]));
    let source = Arc::new(EmptyExec::new(schema.clone()));

    // Wrap a column reference in a HashTableLookupExpr backed by an empty
    // hash table; serialization is expected to replace it with lit(true).
    let hash_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("col", 0));
    let lookup: Arc<dyn PhysicalExpr> = Arc::new(HashTableLookupExpr::new(
        hash_expr,
        Arc::new(JoinHashMapU32::with_capacity(0)),
        "test_lookup".to_string(),
    ));

    // Use the lookup expression as a filter predicate.
    let filter_exec = Arc::new(FilterExec::try_new(lookup, source)?);

    // Round-trip the plan through its protobuf representation.
    let session = SessionContext::new();
    let codec = DefaultPhysicalExtensionCodec {};
    let proto: protobuf::PhysicalPlanNode =
        protobuf::PhysicalPlanNode::try_from_physical_plan(filter_exec.clone(), &codec)
            .expect("serialization should succeed");
    let decoded: Arc<dyn ExecutionPlan> = proto
        .try_into_physical_plan(&session.task_ctx(), &codec)
        .expect("deserialization should succeed");

    // The deserialized predicate must be the literal boolean `true`,
    // not a HashTableLookupExpr.
    let decoded_filter = decoded.as_any().downcast_ref::<FilterExec>().unwrap();
    let literal = decoded_filter
        .predicate()
        .as_any()
        .downcast_ref::<Literal>()
        .unwrap();
    assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true)));

    Ok(())
}

0 commit comments

Comments
 (0)