@@ -78,8 +78,8 @@ use datafusion::physical_plan::expressions::{
7878} ;
7979use datafusion:: physical_plan:: filter:: FilterExec ;
8080use datafusion:: physical_plan:: joins:: {
81- HashJoinExec , NestedLoopJoinExec , PartitionMode , SortMergeJoinExec ,
82- StreamJoinPartitionMode , SymmetricHashJoinExec ,
81+ HashJoinExec , HashTableLookupExpr , NestedLoopJoinExec , PartitionMode ,
82+ SortMergeJoinExec , StreamJoinPartitionMode , SymmetricHashJoinExec ,
8383} ;
8484use datafusion:: physical_plan:: limit:: { GlobalLimitExec , LocalLimitExec } ;
8585use datafusion:: physical_plan:: placeholder_row:: PlaceholderRowExec ;
@@ -113,6 +113,7 @@ use datafusion_expr::{
113113use datafusion_functions_aggregate:: average:: avg_udaf;
114114use datafusion_functions_aggregate:: nth_value:: nth_value_udaf;
115115use datafusion_functions_aggregate:: string_agg:: string_agg_udaf;
116+ use datafusion_physical_plan:: joins:: join_hash_map:: JoinHashMapU32 ;
116117use datafusion_proto:: physical_plan:: {
117118 AsExecutionPlan , DefaultPhysicalExtensionCodec , PhysicalExtensionCodec ,
118119} ;
@@ -2264,3 +2265,48 @@ async fn roundtrip_listing_table_with_schema_metadata() -> Result<()> {
22642265
22652266 roundtrip_test ( plan)
22662267}
2268+
/// Round-trips a `FilterExec` whose predicate is a `HashTableLookupExpr` and
/// checks that the predicate comes back as the boolean literal `true`.
///
/// A `HashTableLookupExpr` wraps a hash table that only exists at runtime, so
/// it has no serialized form. The expectation pinned here is that the
/// serializer substitutes `lit(true)` for it; the lookup is only a pruning
/// optimization, so a predicate that keeps every row is a safe stand-in.
#[test]
fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> {
    // Single non-nullable Int64 column feeding an empty input plan.
    let fields = vec![Field::new("col", DataType::Int64, false)];
    let schema = Arc::new(Schema::new(fields));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // Build the lookup expression over column 0, backed by an empty hash map.
    let join_map = Arc::new(JoinHashMapU32::with_capacity(0));
    let key_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("col", 0));
    let lookup = HashTableLookupExpr::new(key_expr, join_map, String::from("test_lookup"));
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(lookup);

    // Use the lookup expression as the filter predicate.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate, input)?);

    // Plan -> protobuf.
    let ctx = SessionContext::new();
    let codec = DefaultPhysicalExtensionCodec {};
    let encoded: protobuf::PhysicalPlanNode =
        protobuf::PhysicalPlanNode::try_from_physical_plan(Arc::clone(&plan), &codec)
            .expect("serialization should succeed");

    // Protobuf -> plan.
    let decoded: Arc<dyn ExecutionPlan> = encoded
        .try_into_physical_plan(&ctx.task_ctx(), &codec)
        .expect("deserialization should succeed");

    // The decoded plan must still be a filter, and its predicate must now be
    // the literal `true` rather than the original lookup expression.
    let decoded_filter = decoded.as_any().downcast_ref::<FilterExec>().unwrap();
    let decoded_literal = decoded_filter
        .predicate()
        .as_any()
        .downcast_ref::<Literal>()
        .unwrap();
    assert_eq!(decoded_literal.value(), &ScalarValue::Boolean(Some(true)));

    Ok(())
}
0 commit comments