Skip to content

Commit 0766648

Browse files
move expression_id to Inner
1 parent 2e7c261 commit 0766648

2 files changed

Lines changed: 47 additions & 40 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl FilterState {
5656
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
5757
///
5858
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
59+
#[derive(Debug)]
5960
pub struct DynamicFilterPhysicalExpr {
6061
/// The original children of this PhysicalExpr, if any.
6162
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
@@ -65,10 +66,6 @@ pub struct DynamicFilterPhysicalExpr {
6566
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
6667
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
6768
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68-
/// Unique identifier for this dynamic filter.
69-
///
70-
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
71-
expression_id: u64,
7269
/// The source of dynamic filters.
7370
inner: Arc<RwLock<Inner>>,
7471
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -82,11 +79,17 @@ pub struct DynamicFilterPhysicalExpr {
8279

8380
/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
8481
///
82+
/// `expression_id` lives here because it identifies the actual filter expression `expr`.
83+
/// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are
84+
/// the same logical filter and must report the same `expression_id`.
85+
///
8586
/// **Warning:** exposed publicly solely so that proto (de)serialization in
8687
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
8788
/// or its layout as a stable API.
88-
#[derive(Debug, Clone)]
89+
#[derive(Clone)]
8990
pub struct Inner {
91+
/// A unique identifier for the expression.
92+
pub expression_id: u64,
9093
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
9194
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
9295
pub generation: u64,
@@ -97,28 +100,28 @@ pub struct Inner {
97100
pub is_complete: bool,
98101
}
99102

100-
// TODO: Include expression_id in debug output.
103+
// TODO: Include expression_id in Debug output.
101104
//
102105
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
103-
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
104-
// dynamic filter. This causes round trips to fail on the `expression_id`
105-
// because it is regenerated on deserialization.
106-
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
106+
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
107+
// dynamic filter. They auto-create one on decode with a fresh `expression_id`,
108+
// so a round-trip Debug comparison would diverge purely on the id even when
109+
// the rest of the state is preserved. Excluding it from Debug keeps those
110+
// roundtrip equality assertions meaningful until that work lands.
111+
impl std::fmt::Debug for Inner {
107112
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108-
f.debug_struct("DynamicFilterPhysicalExpr")
109-
.field("children", &self.children)
110-
.field("remapped_children", &self.remapped_children)
111-
.field("inner", &self.inner)
112-
.field("state_watch", &self.state_watch)
113-
.field("data_type", &self.data_type)
114-
.field("nullable", &self.nullable)
113+
f.debug_struct("Inner")
114+
.field("generation", &self.generation)
115+
.field("expr", &self.expr)
116+
.field("is_complete", &self.is_complete)
115117
.finish()
116118
}
117119
}
118120

119121
impl Inner {
120122
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
121123
Self {
124+
expression_id: random::<u64>(),
122125
// Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
123126
// This is not currently used anywhere but it seems useful to have this simple distinction.
124127
generation: 1,
@@ -201,7 +204,6 @@ impl DynamicFilterPhysicalExpr {
201204
Self {
202205
children,
203206
remapped_children: None, // Initially no remapped children
204-
expression_id: Self::new_expression_id(),
205207
inner: Arc::new(RwLock::new(Inner::new(inner))),
206208
state_watch,
207209
data_type: Arc::new(RwLock::new(None)),
@@ -272,6 +274,8 @@ impl DynamicFilterPhysicalExpr {
272274
let mut current = self.inner.write();
273275
let new_generation = current.generation + 1;
274276
*current = Inner {
277+
// Preserve the expression id across updates.
278+
expression_id: current.expression_id,
275279
generation: new_generation,
276280
expr: new_expr,
277281
is_complete: current.is_complete,
@@ -376,11 +380,6 @@ impl DynamicFilterPhysicalExpr {
376380
write!(f, " ]")
377381
}
378382

379-
/// Generate a new expression id for this filter.
380-
fn new_expression_id() -> u64 {
381-
random::<u64>()
382-
}
383-
384383
/// Return the filter's original children (before any remapping).
385384
///
386385
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
@@ -399,13 +398,11 @@ impl DynamicFilterPhysicalExpr {
399398
}
400399

401400
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
402-
/// proto deserialization to preserve `expression_id` across a roundtrip
403-
/// rather than minting a fresh one.
401+
/// proto deserialization.
404402
///
405403
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
406404
/// Not a stable API.
407405
pub fn from_parts(
408-
expression_id: u64,
409406
children: Vec<Arc<dyn PhysicalExpr>>,
410407
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
411408
inner: Inner,
@@ -424,7 +421,6 @@ impl DynamicFilterPhysicalExpr {
424421
Self {
425422
children,
426423
remapped_children,
427-
expression_id,
428424
inner: Arc::new(RwLock::new(inner)),
429425
state_watch,
430426
data_type: Arc::new(RwLock::new(None)),
@@ -437,12 +433,7 @@ impl DynamicFilterPhysicalExpr {
437433
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
438434
/// Not a stable API.
439435
pub fn inner(&self) -> Inner {
440-
let guard = self.inner.read();
441-
Inner {
442-
generation: guard.generation,
443-
expr: Arc::clone(&guard.expr),
444-
is_complete: guard.is_complete,
445-
}
436+
self.inner.read().clone()
446437
}
447438
}
448439

@@ -462,9 +453,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
462453
Ok(Arc::new(Self {
463454
children: self.children.clone(),
464455
remapped_children: Some(children),
465-
// Note that we ensure the derived expression linked to `self`
466-
// via the unique identifier.
467-
expression_id: self.expression_id,
456+
// Note: expression_id is preserved
468457
inner: Arc::clone(&self.inner),
469458
state_watch: self.state_watch.clone(),
470459
data_type: Arc::clone(&self.data_type),
@@ -547,7 +536,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
547536
}
548537

549538
fn expression_id(&self) -> Option<u64> {
550-
Some(self.expression_id)
539+
Some(self.inner.read().expression_id)
551540
}
552541
}
553542

@@ -1006,9 +995,8 @@ mod test {
1006995
.expect("Update should succeed");
1007996
reassigned.mark_complete();
1008997

1009-
// Capture the parts and reconstruct.
998+
// Capture the parts and reconstruct. `expression_id` rides in `inner`.
1010999
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
1011-
reassigned.expression_id,
10121000
reassigned.original_children().to_vec(),
10131001
reassigned.remapped_children().map(|r| r.to_vec()),
10141002
reassigned.inner(),
@@ -1073,5 +1061,24 @@ mod test {
10731061
derived_expression_id, source_expression_id,
10741062
"derived filters should carry forward the source expression id",
10751063
);
1064+
1065+
// `update()` rewrites the entire `Inner` struct in place; pin down
1066+
// that the rewrite preserves `expression_id`.
1067+
source
1068+
.update(lit(99) as Arc<dyn PhysicalExpr>)
1069+
.expect("update should succeed");
1070+
assert_eq!(
1071+
source.expression_id().unwrap(),
1072+
source_expression_id,
1073+
"update() must not change expression_id",
1074+
);
1075+
1076+
// `mark_complete()` also touches `Inner`; same invariant.
1077+
source.mark_complete();
1078+
assert_eq!(
1079+
source.expression_id().unwrap(),
1080+
source_expression_id,
1081+
"mark_complete() must not change expression_id",
1082+
);
10761083
}
10771084
}

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,10 @@ pub fn parse_physical_expr_with_converter(
550550

551551
let base_filter: Arc<dyn PhysicalExpr> =
552552
Arc::new(DynamicFilterPhysicalExpr::from_parts(
553-
expression_id,
554553
children,
555554
remapped_children,
556555
DynamicFilterInner {
556+
expression_id,
557557
generation: dynamic_filter.generation,
558558
expr: inner_expr,
559559
is_complete: dynamic_filter.is_complete,

0 commit comments

Comments
 (0)