Skip to content

Commit f56833b

Browse files
LiaCastaneda authored and ahmed-mez committed
Add field to DynamicPhysicalExpr to indicate when the filter is complete or updated (apache#18799)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Closes #.

## Rationale for this change

Dynamic filter pushdown in DataFusion currently lacks an API to determine when filters are "complete" (all contributing partitions have reported). This creates an ambiguity where it is impossible to differentiate between:

1. **Complete filter with no data**: Build side produced 0 rows, filter remains as placeholder `lit(true)`, no more updates coming
2. **Incomplete filter**: Filter is still being computed, updates are pending

I think this could be especially useful when we want to make the filter updates progressively in the future.

## What changes are included in this PR?

- Calls `mark_complete()` after barrier completes, regardless of whether bounds exist.
- Exposes `is_complete()` function on the `DynamicFilterPhysicalExpr`.

## Are these changes tested?

I didn't add any tests because the change is minimal, and comprehensive testing would require making the `DynamicFilterPhysicalExpr` public or running through the full optimizer pipeline.

## Are there any user-facing changes?

Exposing `is_complete()` function.
1 parent fd35a09 commit f56833b

6 files changed

Lines changed: 257 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
5555
parking_lot = { workspace = true }
5656
paste = "^1.0"
5757
petgraph = "0.8.3"
58+
tokio = { workspace = true }
5859

5960
[dev-dependencies]
6061
arrow = { workspace = true, features = ["test_utils"] }
@@ -79,3 +80,6 @@ name = "is_null"
7980
[[bench]]
8081
harness = false
8182
name = "binary_op"
83+
84+
[package.metadata.cargo-machete]
85+
ignored = ["half"]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use parking_lot::RwLock;
1919
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
20+
use tokio::sync::watch;
2021

2122
use crate::PhysicalExpr;
2223
use arrow::datatypes::{DataType, Schema};
@@ -27,6 +28,24 @@ use datafusion_common::{
2728
use datafusion_expr::ColumnarValue;
2829
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
2930

31+
/// Lifecycle state of a dynamic filter, as broadcast to waiters.
///
/// Each variant carries the generation number that was current when the
/// state was published, so a waiter can detect both "something changed"
/// (generation moved) and "nothing more will change" (`Complete`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
    /// The filter may still receive further updates.
    InProgress { generation: u64 },
    /// The filter is final; no further updates are expected.
    Complete { generation: u64 },
}

impl FilterState {
    /// Returns the generation carried by this state, regardless of variant.
    fn generation(&self) -> u64 {
        // Both variants bind the same field, so the or-pattern is irrefutable.
        let (Self::InProgress { generation } | Self::Complete { generation }) = *self;
        generation
    }
}
48+
3049
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
3150
///
3251
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
4463
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4564
/// The source of dynamic filters.
4665
inner: Arc<RwLock<Inner>>,
66+
/// Broadcasts filter state (updates and completion) to all waiters.
67+
state_watch: watch::Sender<FilterState>,
4768
/// For testing purposes track the data type and nullability to make sure they don't change.
4869
/// If they do, there's a bug in the implementation.
4970
/// But this can have overhead in production, so it's only included in our tests.
@@ -57,6 +78,10 @@ struct Inner {
5778
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
5879
generation: u64,
5980
expr: Arc<dyn PhysicalExpr>,
81+
/// Flag for quick synchronous check if filter is complete.
82+
/// This is redundant with the watch channel state, but allows us to return immediately
83+
/// from `wait_complete()` without subscribing if already complete.
84+
is_complete: bool,
6085
}
6186

6287
impl Inner {
@@ -66,6 +91,7 @@ impl Inner {
6691
// This is not currently used anywhere but it seems useful to have this simple distinction.
6792
generation: 1,
6893
expr,
94+
is_complete: false,
6995
}
7096
}
7197

@@ -134,10 +160,12 @@ impl DynamicFilterPhysicalExpr {
134160
children: Vec<Arc<dyn PhysicalExpr>>,
135161
inner: Arc<dyn PhysicalExpr>,
136162
) -> Self {
163+
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
137164
Self {
138165
children,
139166
remapped_children: None, // Initially no remapped children
140167
inner: Arc::new(RwLock::new(Inner::new(inner))),
168+
state_watch,
141169
data_type: Arc::new(RwLock::new(None)),
142170
nullable: Arc::new(RwLock::new(None)),
143171
}
@@ -185,7 +213,7 @@ impl DynamicFilterPhysicalExpr {
185213
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
186214
}
187215

188-
/// Update the current expression.
216+
/// Update the current expression and notify all waiters.
189217
/// Any children of this expression must be a subset of the original children
190218
/// passed to the constructor.
191219
/// This should be called e.g.:
@@ -204,13 +232,68 @@ impl DynamicFilterPhysicalExpr {
204232

205233
// Load the current inner, increment generation, and store the new one
206234
let mut current = self.inner.write();
235+
let new_generation = current.generation + 1;
207236
*current = Inner {
208-
generation: current.generation + 1,
237+
generation: new_generation,
209238
expr: new_expr,
239+
is_complete: current.is_complete,
210240
};
241+
drop(current); // Release the lock before broadcasting
242+
243+
// Broadcast the new state to all waiters
244+
let _ = self.state_watch.send(FilterState::InProgress {
245+
generation: new_generation,
246+
});
211247
Ok(())
212248
}
213249

250+
/// Mark this dynamic filter as complete and broadcast to all waiters.
251+
///
252+
/// This signals that all expected updates have been received.
253+
/// Waiters using [`Self::wait_complete`] will be notified.
254+
pub fn mark_complete(&self) {
255+
let mut current = self.inner.write();
256+
let current_generation = current.generation;
257+
current.is_complete = true;
258+
drop(current);
259+
260+
// Broadcast completion to all waiters
261+
let _ = self.state_watch.send(FilterState::Complete {
262+
generation: current_generation,
263+
});
264+
}
265+
266+
/// Wait asynchronously for any update to this filter.
267+
///
268+
/// This method will return when [`Self::update`] is called and the generation increases.
269+
/// It does not guarantee that the filter is complete.
270+
pub async fn wait_update(&self) {
271+
let mut rx = self.state_watch.subscribe();
272+
// Get the current generation
273+
let current_gen = rx.borrow_and_update().generation();
274+
275+
// Wait until generation increases
276+
let _ = rx.wait_for(|state| state.generation() > current_gen).await;
277+
}
278+
279+
/// Wait asynchronously until this dynamic filter is marked as complete.
280+
///
281+
/// This method returns immediately if the filter is already complete.
282+
/// Otherwise, it waits until [`Self::mark_complete`] is called.
283+
///
284+
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
285+
/// the filter is fully complete with no more updates expected.
286+
pub async fn wait_complete(&self) {
287+
if self.inner.read().is_complete {
288+
return;
289+
}
290+
291+
let mut rx = self.state_watch.subscribe();
292+
let _ = rx
293+
.wait_for(|state| matches!(state, FilterState::Complete { .. }))
294+
.await;
295+
}
296+
214297
fn render(
215298
&self,
216299
f: &mut std::fmt::Formatter<'_>,
@@ -253,6 +336,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
253336
children: self.children.clone(),
254337
remapped_children: Some(children),
255338
inner: Arc::clone(&self.inner),
339+
state_watch: self.state_watch.clone(),
256340
data_type: Arc::clone(&self.data_type),
257341
nullable: Arc::clone(&self.nullable),
258342
}))
@@ -509,4 +593,18 @@ mod test {
509593
"Expected err when evaluate is called after changing the expression."
510594
);
511595
}
596+
597+
#[tokio::test]
598+
async fn test_wait_complete_already_complete() {
599+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
600+
vec![],
601+
lit(42) as Arc<dyn PhysicalExpr>,
602+
));
603+
604+
// Mark as complete immediately
605+
dynamic_filter.mark_complete();
606+
607+
// wait_complete should return immediately
608+
dynamic_filter.wait_complete().await;
609+
}
512610
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4500,4 +4500,103 @@ mod tests {
45004500
fn columns(schema: &Schema) -> Vec<String> {
45014501
schema.fields().iter().map(|f| f.name().clone()).collect()
45024502
}
4503+
4504+
/// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
4505+
#[tokio::test]
4506+
async fn test_hash_join_marks_filter_complete() -> Result<()> {
4507+
let task_ctx = Arc::new(TaskContext::default());
4508+
let left = build_table(
4509+
("a1", &vec![1, 2, 3]),
4510+
("b1", &vec![4, 5, 6]),
4511+
("c1", &vec![7, 8, 9]),
4512+
);
4513+
let right = build_table(
4514+
("a2", &vec![10, 20, 30]),
4515+
("b1", &vec![4, 5, 6]),
4516+
("c2", &vec![70, 80, 90]),
4517+
);
4518+
4519+
let on = vec![(
4520+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4521+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4522+
)];
4523+
4524+
// Create a dynamic filter manually
4525+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4526+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4527+
4528+
// Create HashJoinExec with the dynamic filter
4529+
let mut join = HashJoinExec::try_new(
4530+
left,
4531+
right,
4532+
on,
4533+
None,
4534+
&JoinType::Inner,
4535+
None,
4536+
PartitionMode::CollectLeft,
4537+
NullEquality::NullEqualsNothing,
4538+
)?;
4539+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4540+
filter: dynamic_filter,
4541+
build_accumulator: OnceLock::new(),
4542+
});
4543+
4544+
// Execute the join
4545+
let stream = join.execute(0, task_ctx)?;
4546+
let _batches = common::collect(stream).await?;
4547+
4548+
// After the join completes, the dynamic filter should be marked as complete
4549+
// wait_complete() should return immediately
4550+
dynamic_filter_clone.wait_complete().await;
4551+
4552+
Ok(())
4553+
}
4554+
4555+
/// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
4556+
#[tokio::test]
4557+
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
4558+
let task_ctx = Arc::new(TaskContext::default());
4559+
// Empty left side (build side)
4560+
let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
4561+
let right = build_table(
4562+
("a2", &vec![10, 20, 30]),
4563+
("b1", &vec![4, 5, 6]),
4564+
("c2", &vec![70, 80, 90]),
4565+
);
4566+
4567+
let on = vec![(
4568+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4569+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4570+
)];
4571+
4572+
// Create a dynamic filter manually
4573+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4574+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4575+
4576+
// Create HashJoinExec with the dynamic filter
4577+
let mut join = HashJoinExec::try_new(
4578+
left,
4579+
right,
4580+
on,
4581+
None,
4582+
&JoinType::Inner,
4583+
None,
4584+
PartitionMode::CollectLeft,
4585+
NullEquality::NullEqualsNothing,
4586+
)?;
4587+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4588+
filter: dynamic_filter,
4589+
build_accumulator: OnceLock::new(),
4590+
});
4591+
4592+
// Execute the join
4593+
let stream = join.execute(0, task_ctx)?;
4594+
let _batches = common::collect(stream).await?;
4595+
4596+
// Even with empty build side, the dynamic filter should be marked as complete
4597+
// wait_complete() should return immediately
4598+
dynamic_filter_clone.wait_complete().await;
4599+
4600+
Ok(())
4601+
}
45034602
}

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ impl SharedBoundsAccumulator {
300300
self.create_filter_from_partition_bounds(&inner.bounds)?;
301301
self.dynamic_filter.update(filter_expr)?;
302302
}
303+
self.dynamic_filter.mark_complete();
303304
}
304305

305306
Ok(())

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,10 +589,13 @@ impl TopK {
589589
common_sort_prefix_converter: _,
590590
common_sort_prefix: _,
591591
finished: _,
592-
filter: _,
592+
filter,
593593
} = self;
594594
let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
595595

596+
// Mark the dynamic filter as complete now that TopK processing is finished.
597+
filter.read().expr().mark_complete();
598+
596599
// break into record batches as needed
597600
let mut batches = vec![];
598601
if let Some(mut batch) = heap.emit()? {
@@ -1196,4 +1199,52 @@ mod tests {
11961199

11971200
Ok(())
11981201
}
1202+
1203+
/// This test verifies that the dynamic filter is marked as complete after TopK processing finishes.
1204+
#[tokio::test]
1205+
async fn test_topk_marks_filter_complete() -> Result<()> {
1206+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1207+
1208+
let sort_expr = PhysicalSortExpr {
1209+
expr: col("a", schema.as_ref())?,
1210+
options: SortOptions::default(),
1211+
};
1212+
1213+
let full_expr = LexOrdering::from([sort_expr.clone()]);
1214+
let prefix = vec![sort_expr];
1215+
1216+
// Create a dummy runtime environment and metrics
1217+
let runtime = Arc::new(RuntimeEnv::default());
1218+
let metrics = ExecutionPlanMetricsSet::new();
1219+
1220+
// Create a dynamic filter that we'll check for completion
1221+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));
1222+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
1223+
1224+
// Create a TopK instance
1225+
let mut topk = TopK::try_new(
1226+
0,
1227+
Arc::clone(&schema),
1228+
prefix,
1229+
full_expr,
1230+
2,
1231+
10,
1232+
runtime,
1233+
&metrics,
1234+
Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
1235+
)?;
1236+
1237+
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
1238+
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
1239+
topk.insert_batch(batch)?;
1240+
1241+
// Call emit to finish TopK processing
1242+
let _results: Vec<_> = topk.emit()?.try_collect().await?;
1243+
1244+
// After emit is called, the dynamic filter should be marked as complete
1245+
// wait_complete() should return immediately
1246+
dynamic_filter_clone.wait_complete().await;
1247+
1248+
Ok(())
1249+
}
11991250
}

0 commit comments

Comments
 (0)