Skip to content

Commit 2e7c261

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: #20418 Consider you have a plan with a `HashJoinExec` and `DataSourceExec` ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized 3. Removing `Arc`-based deduplication. We now only dedupe on `expression_id` if the `PhysicalExpr` reports a `expression_id`. After this change, only `DynamicFilterPhysicalExpr` reports an `expression_id` to be deduped. 4. `expression_id` is now just a random u64. Since a given query likely only has a few `DynamicFilterPhysicalExpr` instances, the odds of a collision are very low 5. There's no need for a `DedupingSerializer` anymore since the `expression_id` is already stored in the dynamic filter proto itself Testing - adds tests which roundtrip dynamic filters and assert that referential integrity is maintained - removes tests that test `Arc`-based deduplication and session id rotation
1 parent 720aaff commit 2e7c261

13 files changed

Lines changed: 831 additions & 524 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
163163
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
164164

165165
/// Returns a new PhysicalExpr where all children were replaced by new exprs.
166+
///
167+
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
168+
/// the identifier should be preserved by the new expression.
166169
fn with_new_children(
167170
self: Arc<Self>,
168171
children: Vec<Arc<dyn PhysicalExpr>>,
@@ -446,6 +449,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
446449
fn placement(&self) -> ExpressionPlacement {
447450
ExpressionPlacement::KeepInPlace
448451
}
452+
453+
/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
454+
/// has one.
455+
///
456+
/// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
457+
/// where two expressions may be different but store the same mutable inner state). Tracking
458+
/// connected expressions helps preserve referential integrity within plan nodes
459+
/// during serialization and deserialization.
460+
///
461+
/// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
462+
/// methods which may want to preserve identity.
463+
///
464+
/// Default is `None`: the expression has no identity worth preserving across a
465+
/// serialization boundary.
466+
fn expression_id(&self) -> Option<u64> {
467+
None
468+
}
449469
}
450470

451471
#[deprecated(

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ indexmap = { workspace = true }
5555
itertools = { workspace = true, features = ["use_std"] }
5656
parking_lot = { workspace = true }
5757
petgraph = "0.8.3"
58+
rand = { workspace = true }
5859
recursive = { workspace = true, optional = true }
5960
tokio = { workspace = true }
6061
half = { workspace = true }
@@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
6465
criterion = { workspace = true }
6566
datafusion-functions = { workspace = true }
6667
insta = { workspace = true }
67-
rand = { workspace = true }
6868
rstest = { workspace = true }
6969

7070
[[bench]]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 219 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion_common::{
2727
};
2828
use datafusion_expr::ColumnarValue;
2929
use datafusion_physical_expr_common::physical_expr::DynHash;
30+
use rand::random;
3031

3132
/// State of a dynamic filter, tracking both updates and completion.
3233
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -55,7 +56,6 @@ impl FilterState {
5556
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
5657
///
5758
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
58-
#[derive(Debug)]
5959
pub struct DynamicFilterPhysicalExpr {
6060
/// The original children of this PhysicalExpr, if any.
6161
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
@@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
6565
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
6666
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
6767
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
/// Unique identifier for this dynamic filter.
69+
///
70+
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
71+
expression_id: u64,
6872
/// The source of dynamic filters.
6973
inner: Arc<RwLock<Inner>>,
7074
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -76,16 +80,40 @@ pub struct DynamicFilterPhysicalExpr {
7680
nullable: Arc<RwLock<Option<bool>>>,
7781
}
7882

79-
#[derive(Debug)]
80-
struct Inner {
83+
/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
84+
///
85+
/// **Warning:** exposed publicly solely so that proto (de)serialization in
86+
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
87+
/// or its layout as a stable API.
88+
#[derive(Debug, Clone)]
89+
pub struct Inner {
8190
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
8291
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
83-
generation: u64,
84-
expr: Arc<dyn PhysicalExpr>,
92+
pub generation: u64,
93+
pub expr: Arc<dyn PhysicalExpr>,
8594
/// Flag for quick synchronous check if filter is complete.
8695
/// This is redundant with the watch channel state, but allows us to return immediately
8796
/// from `wait_complete()` without subscribing if already complete.
88-
is_complete: bool,
97+
pub is_complete: bool,
98+
}
99+
100+
// TODO: Include expression_id in debug output.
101+
//
102+
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
103+
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
104+
// dynamic filter. This causes round trips to fail on the `expression_id`
105+
// because it is regenerated on deserialization.
106+
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
107+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108+
f.debug_struct("DynamicFilterPhysicalExpr")
109+
.field("children", &self.children)
110+
.field("remapped_children", &self.remapped_children)
111+
.field("inner", &self.inner)
112+
.field("state_watch", &self.state_watch)
113+
.field("data_type", &self.data_type)
114+
.field("nullable", &self.nullable)
115+
.finish()
116+
}
89117
}
90118

91119
impl Inner {
@@ -173,6 +201,7 @@ impl DynamicFilterPhysicalExpr {
173201
Self {
174202
children,
175203
remapped_children: None, // Initially no remapped children
204+
expression_id: Self::new_expression_id(),
176205
inner: Arc::new(RwLock::new(Inner::new(inner))),
177206
state_watch,
178207
data_type: Arc::new(RwLock::new(None)),
@@ -346,6 +375,75 @@ impl DynamicFilterPhysicalExpr {
346375

347376
write!(f, " ]")
348377
}
378+
379+
/// Generate a new expression id for this filter.
380+
fn new_expression_id() -> u64 {
381+
random::<u64>()
382+
}
383+
384+
/// Return the filter's original children (before any remapping).
385+
///
386+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
387+
/// Not a stable API.
388+
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
389+
&self.children
390+
}
391+
392+
/// Return the filter's remapped children, if any have been set via
393+
/// [`PhysicalExpr::with_new_children`].
394+
///
395+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
396+
/// Not a stable API.
397+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
398+
self.remapped_children.as_deref()
399+
}
400+
401+
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
402+
/// proto deserialization to preserve `expression_id` across a roundtrip
403+
/// rather than minting a fresh one.
404+
///
405+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
406+
/// Not a stable API.
407+
pub fn from_parts(
408+
expression_id: u64,
409+
children: Vec<Arc<dyn PhysicalExpr>>,
410+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
411+
inner: Inner,
412+
) -> Self {
413+
let state = if inner.is_complete {
414+
FilterState::Complete {
415+
generation: inner.generation,
416+
}
417+
} else {
418+
FilterState::InProgress {
419+
generation: inner.generation,
420+
}
421+
};
422+
let (state_watch, _) = watch::channel(state);
423+
424+
Self {
425+
children,
426+
remapped_children,
427+
expression_id,
428+
inner: Arc::new(RwLock::new(inner)),
429+
state_watch,
430+
data_type: Arc::new(RwLock::new(None)),
431+
nullable: Arc::new(RwLock::new(None)),
432+
}
433+
}
434+
435+
/// Return a clone of the atomically-captured `Inner` state.
436+
///
437+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
438+
/// Not a stable API.
439+
pub fn inner(&self) -> Inner {
440+
let guard = self.inner.read();
441+
Inner {
442+
generation: guard.generation,
443+
expr: Arc::clone(&guard.expr),
444+
is_complete: guard.is_complete,
445+
}
446+
}
349447
}
350448

351449
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -364,6 +462,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
364462
Ok(Arc::new(Self {
365463
children: self.children.clone(),
366464
remapped_children: Some(children),
465+
// Note that we ensure the derived expression linked to `self`
466+
// via the unique identifier.
467+
expression_id: self.expression_id,
367468
inner: Arc::clone(&self.inner),
368469
state_watch: self.state_watch.clone(),
369470
data_type: Arc::clone(&self.data_type),
@@ -444,6 +545,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
444545
// Return the current generation of the expression.
445546
self.inner.read().generation
446547
}
548+
549+
fn expression_id(&self) -> Option<u64> {
550+
Some(self.expression_id)
551+
}
447552
}
448553

449554
#[cfg(test)]
@@ -861,4 +966,112 @@ mod test {
861966
"Hash should be stable after update (identity-based)"
862967
);
863968
}
969+
970+
/// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr`
971+
/// whose observable state (original children, remapped children,
972+
/// expression id, inner generation/expr/is_complete) matches the source
973+
/// filter.
974+
#[test]
975+
fn test_from_parts_preserves_state() {
976+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
977+
let col_a = col("a", &schema).unwrap();
978+
979+
// Create a dynamic filter with children
980+
let expr = Arc::new(BinaryExpr::new(
981+
Arc::clone(&col_a),
982+
datafusion_expr::Operator::Gt,
983+
lit(10) as Arc<dyn PhysicalExpr>,
984+
));
985+
let filter = DynamicFilterPhysicalExpr::new(
986+
vec![Arc::clone(&col_a)],
987+
expr as Arc<dyn PhysicalExpr>,
988+
);
989+
990+
// Add remapped children.
991+
let reassigned_schema = Arc::new(Schema::new(vec![
992+
Field::new("b", DataType::Int32, false),
993+
Field::new("a", DataType::Int32, false),
994+
]));
995+
let reassigned = reassign_expr_columns(
996+
Arc::new(filter) as Arc<dyn PhysicalExpr>,
997+
&reassigned_schema,
998+
)
999+
.expect("reassign_expr_columns should succeed");
1000+
let reassigned = reassigned
1001+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1002+
.expect("Expected dynamic filter after reassignment");
1003+
1004+
reassigned
1005+
.update(lit(42) as Arc<dyn PhysicalExpr>)
1006+
.expect("Update should succeed");
1007+
reassigned.mark_complete();
1008+
1009+
// Capture the parts and reconstruct.
1010+
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
1011+
reassigned.expression_id,
1012+
reassigned.original_children().to_vec(),
1013+
reassigned.remapped_children().map(|r| r.to_vec()),
1014+
reassigned.inner(),
1015+
);
1016+
1017+
assert_eq!(
1018+
reassigned.original_children(),
1019+
reconstructed.original_children(),
1020+
);
1021+
assert_eq!(
1022+
reassigned.remapped_children(),
1023+
reconstructed.remapped_children(),
1024+
);
1025+
assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
1026+
let r = reassigned.inner();
1027+
let c = reconstructed.inner();
1028+
assert_eq!(r.generation, c.generation);
1029+
assert_eq!(r.is_complete, c.is_complete);
1030+
assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));
1031+
}
1032+
1033+
#[tokio::test]
1034+
async fn test_expression_id() {
1035+
let source_schema =
1036+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1037+
let col_a = col("a", &source_schema).unwrap();
1038+
1039+
// Create a source filter
1040+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1041+
vec![Arc::clone(&col_a)],
1042+
lit(true) as Arc<dyn PhysicalExpr>,
1043+
));
1044+
let source_clone = Arc::clone(&source);
1045+
1046+
// Create a derived filter by reassigning the source filter to a different schema.
1047+
let derived_schema = Arc::new(Schema::new(vec![
1048+
Field::new("x", DataType::Int32, false),
1049+
Field::new("a", DataType::Int32, false),
1050+
]));
1051+
let derived = reassign_expr_columns(
1052+
Arc::clone(&source) as Arc<dyn PhysicalExpr>,
1053+
&derived_schema,
1054+
)
1055+
.expect("reassign_expr_columns should succeed");
1056+
1057+
let derived_expression_id = derived
1058+
.expression_id()
1059+
.expect("derived filter should have an expression id");
1060+
let source_expression_id = source
1061+
.expression_id()
1062+
.expect("source filter should have an expression id");
1063+
let source_clone_expression_id = source_clone
1064+
.expression_id()
1065+
.expect("source clone should have an expression id");
1066+
1067+
assert_eq!(
1068+
source_clone_expression_id, source_expression_id,
1069+
"cloned filter should preserve its expression id",
1070+
);
1071+
1072+
assert_eq!(
1073+
derived_expression_id, source_expression_id,
1074+
"derived filters should carry forward the source expression id",
1075+
);
1076+
}
8641077
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use case::{CaseExpr, case};
4545
pub use cast::{CastExpr, cast};
4646
pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
48-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
48+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
4949
pub use in_list::{InListExpr, in_list};
5050
pub use is_not_null::{IsNotNullExpr, is_not_null};
5151
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true }
6666
object_store = { workspace = true }
6767
pbjson = { workspace = true, optional = true }
6868
prost = { workspace = true }
69-
rand = { workspace = true }
7069
serde = { version = "1.0", optional = true }
7170
serde_json = { workspace = true, optional = true }
7271

0 commit comments

Comments
 (0)