Skip to content

Commit 1650a56

Browse files
adriangb and LiaCastaneda
authored and committed
fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (apache#17371)
1 parent 43ec445 commit 1650a56

2 files changed

Lines changed: 43 additions & 37 deletions

File tree

  • datafusion

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,11 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
823823
let plan = FilterPushdown::new_post_optimization()
824824
.optimize(plan, &config)
825825
.unwrap();
826+
827+
// Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children`
828+
let children = plan.children().into_iter().map(Arc::clone).collect();
829+
let plan = plan.with_new_children(children).unwrap();
830+
826831
let config = SessionConfig::new().with_batch_size(10);
827832
let session_ctx = SessionContext::new_with_config(config);
828833
session_ctx.register_object_store(

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -625,12 +625,18 @@ pub struct HashJoinExec {
625625
/// Cache holding plan properties like equivalences, output partitioning etc.
626626
cache: PlanProperties,
627627
/// Dynamic filter for pushing down to the probe side
628-
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result
629-
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
630-
/// Shared bounds accumulator for coordinating dynamic filter updates across partitions
631-
/// Only created when dynamic filter pushdown is enabled.
632-
/// Lazily initialized at execution time to use actual runtime partition counts
633-
bounds_accumulator: Option<OnceLock<Arc<SharedBoundsAccumulator>>>,
628+
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
629+
/// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
630+
dynamic_filter: Option<HashJoinExecDynamicFilter>,
631+
}
632+
633+
#[derive(Clone)]
634+
struct HashJoinExecDynamicFilter {
635+
/// Dynamic filter that we'll update with the results of the build side once that is done.
636+
filter: Arc<DynamicFilterPhysicalExpr>,
637+
/// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
638+
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
639+
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
634640
}
635641

636642
impl fmt::Debug for HashJoinExec {
@@ -718,7 +724,6 @@ impl HashJoinExec {
718724
null_equality,
719725
cache,
720726
dynamic_filter: None,
721-
bounds_accumulator: None,
722727
})
723728
}
724729

@@ -1095,7 +1100,6 @@ impl ExecutionPlan for HashJoinExec {
10951100
)?,
10961101
// Keep the dynamic filter, bounds accumulator will be reset
10971102
dynamic_filter: self.dynamic_filter.clone(),
1098-
bounds_accumulator: None,
10991103
}))
11001104
}
11011105

@@ -1118,7 +1122,6 @@ impl ExecutionPlan for HashJoinExec {
11181122
cache: self.cache.clone(),
11191123
// Reset dynamic filter and bounds accumulator to initial state
11201124
dynamic_filter: None,
1121-
bounds_accumulator: None,
11221125
}))
11231126
}
11241127

@@ -1200,32 +1203,28 @@ impl ExecutionPlan for HashJoinExec {
12001203
let batch_size = context.session_config().batch_size();
12011204

12021205
// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
1203-
let bounds_accumulator = if enable_dynamic_filter_pushdown
1204-
&& self.dynamic_filter.is_some()
1205-
{
1206-
if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator {
1207-
let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap());
1208-
let on_right = self
1209-
.on
1210-
.iter()
1211-
.map(|(_, right_expr)| Arc::clone(right_expr))
1212-
.collect::<Vec<_>>();
1213-
1214-
Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| {
1215-
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
1216-
self.mode,
1217-
self.left.as_ref(),
1218-
self.right.as_ref(),
1219-
dynamic_filter,
1220-
on_right,
1221-
))
1222-
})))
1223-
} else {
1224-
None
1225-
}
1226-
} else {
1227-
None
1228-
};
1206+
let bounds_accumulator = enable_dynamic_filter_pushdown
1207+
.then(|| {
1208+
self.dynamic_filter.as_ref().map(|df| {
1209+
let filter = Arc::clone(&df.filter);
1210+
let on_right = self
1211+
.on
1212+
.iter()
1213+
.map(|(_, right_expr)| Arc::clone(right_expr))
1214+
.collect::<Vec<_>>();
1215+
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
1216+
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
1217+
self.mode,
1218+
self.left.as_ref(),
1219+
self.right.as_ref(),
1220+
filter,
1221+
on_right,
1222+
))
1223+
})))
1224+
})
1225+
})
1226+
.flatten()
1227+
.flatten();
12291228

12301229
// we have the batches and the hash map with their keys. We can how create a stream
12311230
// over the right that uses this information to issue new batches.
@@ -1419,8 +1418,10 @@ impl ExecutionPlan for HashJoinExec {
14191418
column_indices: self.column_indices.clone(),
14201419
null_equality: self.null_equality,
14211420
cache: self.cache.clone(),
1422-
dynamic_filter: Some(dynamic_filter),
1423-
bounds_accumulator: Some(OnceLock::new()),
1421+
dynamic_filter: Some(HashJoinExecDynamicFilter {
1422+
filter: dynamic_filter,
1423+
bounds_accumulator: OnceLock::new(),
1424+
}),
14241425
});
14251426
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
14261427
}

0 commit comments

Comments (0)