Skip to content

Commit 50aa138

Browse files
[branch-54] Support transparent ExecutionPlan downcasts (#22565)
## Which issue does this PR close? - Refs #22557. - Companion PR to #22559, targeting `branch-54`. ## Rationale for this change DataFusion 54 changed `ExecutionPlan` downcasting to use the `Any` supertrait directly. That removes `ExecutionPlan::as_any`, which had also served as a customization point for wrapper nodes: wrappers could identify as themselves internally while exposing the wrapped plan type to normal downcast-based inspection. This PR adds an explicit `ExecutionPlan::downcast_delegate()` hook for wrapper nodes that want their public `ExecutionPlan` downcast identity to be delegated to another plan. The proposed behavior intentionally preserves the old `as_any` override semantics: when a node opts into downcast delegation, intermediate delegating wrappers are invisible to `dyn ExecutionPlan::is::<T>()` and `downcast_ref::<T>()`. ## What changes are included in this PR? - Adds `ExecutionPlan::downcast_delegate()` with a default implementation returning `None`. - Updates `dyn ExecutionPlan::is::<T>()` and `downcast_ref::<T>()` to delegate to `downcast_delegate()` when present, otherwise use the current concrete plan type. - Documents that `downcast_delegate()` is only for type introspection and is independent from `children()` / plan traversal. - Adds tests for direct and nested downcast-delegating wrappers, including that intermediate delegating wrappers remain invisible to normal downcast-based inspection. ## Are these changes tested? Yes. - `cargo test -p datafusion-physical-plan execution_plan_downcast` - `cargo test -p datafusion-physical-plan --lib` - `cargo fmt --all -- --check` - `git diff --check` ## Are there any user-facing changes? Yes. This adds a new public `ExecutionPlan` trait method with a default implementation, and it changes `ExecutionPlan` downcast helpers to honor wrappers that explicitly opt into delegating public downcast identity.
1 parent 1321d60 commit 50aa138

1 file changed

Lines changed: 108 additions & 2 deletions

File tree

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
115115
}
116116
}
117117

118+
/// Returns the plan that provides this plan's public
119+
/// [`ExecutionPlan`] downcast identity.
120+
///
121+
/// This hook is for wrapper nodes that delegate their public downcast
122+
/// identity to another plan while adding cross-cutting behavior such as
123+
/// instrumentation. The default implementation returns `None`, meaning this
124+
/// plan's concrete type is used for type introspection.
125+
///
126+
/// Most `ExecutionPlan` implementations should use the default `None`;
127+
/// override this only for wrapper plans that intentionally delegate their
128+
/// public downcast identity to another plan.
129+
///
130+
/// The `is` and `downcast_ref` helpers follow the returned delegate instead
131+
/// of checking the current concrete type, making intermediate delegating
132+
/// wrappers invisible to normal downcast-based inspection.
133+
///
134+
/// Implementations that opt in should return the delegate plan, not `self`.
135+
///
136+
/// This is independent from [`Self::children`] and should not be used for
137+
/// plan traversal or optimizer rewrites.
138+
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
139+
None
140+
}
141+
118142
/// Get the schema for this execution plan
119143
fn schema(&self) -> SchemaRef {
120144
Arc::clone(self.properties().schema())
@@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
718742
impl dyn ExecutionPlan {
719743
/// Returns `true` if the plan is of type `T`.
720744
///
745+
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
746+
/// to it.
747+
///
721748
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
722749
/// called on `Arc<dyn ExecutionPlan>` via auto-deref.
723750
pub fn is<T: ExecutionPlan>(&self) -> bool {
724-
(self as &dyn Any).is::<T>()
751+
match self.downcast_delegate() {
752+
Some(delegate) => delegate.is::<T>(),
753+
None => (self as &dyn Any).is::<T>(),
754+
}
725755
}
726756

727757
/// Attempts to downcast this plan to a concrete type `T`, returning `None`
728758
/// if the plan is not of that type.
729759
///
760+
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
761+
/// to it.
762+
///
730763
/// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
731764
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
732765
/// downcast the `Arc` itself.
733766
pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
734-
(self as &dyn Any).downcast_ref()
767+
match self.downcast_delegate() {
768+
Some(delegate) => delegate.downcast_ref::<T>(),
769+
None => (self as &dyn Any).downcast_ref(),
770+
}
735771
}
736772
}
737773

@@ -1642,6 +1678,58 @@ mod tests {
16421678
}
16431679
}
16441680

1681+
#[derive(Debug)]
1682+
struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);
1683+
1684+
impl DisplayAs for DowncastDelegatingExec {
1685+
fn fmt_as(
1686+
&self,
1687+
_t: DisplayFormatType,
1688+
_f: &mut std::fmt::Formatter,
1689+
) -> std::fmt::Result {
1690+
unimplemented!()
1691+
}
1692+
}
1693+
1694+
impl ExecutionPlan for DowncastDelegatingExec {
1695+
fn name(&self) -> &'static str {
1696+
Self::static_name()
1697+
}
1698+
1699+
fn properties(&self) -> &Arc<PlanProperties> {
1700+
unimplemented!()
1701+
}
1702+
1703+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1704+
vec![]
1705+
}
1706+
1707+
fn with_new_children(
1708+
self: Arc<Self>,
1709+
_: Vec<Arc<dyn ExecutionPlan>>,
1710+
) -> Result<Arc<dyn ExecutionPlan>> {
1711+
unimplemented!()
1712+
}
1713+
1714+
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
1715+
Some(self.0.as_ref())
1716+
}
1717+
1718+
fn execute(
1719+
&self,
1720+
_partition: usize,
1721+
_context: Arc<TaskContext>,
1722+
) -> Result<SendableRecordBatchStream> {
1723+
unimplemented!()
1724+
}
1725+
1726+
fn partition_statistics(
1727+
&self,
1728+
_partition: Option<usize>,
1729+
) -> Result<Arc<Statistics>> {
1730+
unimplemented!()
1731+
}
1732+
}
16451733
#[test]
16461734
fn test_execution_plan_name() {
16471735
let schema1 = Arc::new(Schema::empty());
@@ -1654,6 +1742,24 @@ mod tests {
16541742
assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
16551743
}
16561744

1745+
#[test]
1746+
fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
1747+
let schema = Arc::new(Schema::empty());
1748+
let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
1749+
let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
1750+
let nested: Arc<dyn ExecutionPlan> =
1751+
Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));
1752+
1753+
for plan in [wrapped.as_ref(), nested.as_ref()] {
1754+
assert!(!plan.is::<DowncastDelegatingExec>());
1755+
assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
1756+
assert!(plan.is::<EmptyExec>());
1757+
assert!(plan.downcast_ref::<EmptyExec>().is_some());
1758+
assert!(!plan.is::<RenamedEmptyExec>());
1759+
assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
1760+
}
1761+
}
1762+
16571763
/// A compilation test to ensure that the `ExecutionPlan::name()` method can
16581764
/// be called from a trait object.
16591765
/// Related ticket: https://github.com/apache/datafusion/pull/11047

0 commit comments

Comments
 (0)