Skip to content

Commit 5c8a154

Browse files
timsaucerclaude
andcommitted
Use ExecutionPlan::downcast_ref and is() instead of as &dyn Any patterns
Replaces remaining `(x.as_ref() as &dyn Any).downcast_ref::<T>()` and `(x as &dyn std::any::Any).downcast_ref::<T>()` patterns with the new inherent methods on `dyn ExecutionPlan`. Also cleans up redundant `.as_ref()` calls before `.downcast_ref()` / `.is()` on Arc values, and simplifies `.downcast_ref::<T>().is_some()` to `.is::<T>()` where applicable. Removes now-unused `std::any::Any` imports. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 0a2881f commit 5c8a154

29 files changed

Lines changed: 127 additions & 218 deletions

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
//! DeltaScan
3333
//! ```
3434
35-
use std::any::Any;
3635
use std::fmt::Debug;
3736
use std::sync::Arc;
3837

@@ -157,10 +156,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
157156
}
158157

159158
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
160-
if (node.as_ref() as &dyn Any)
161-
.downcast_ref::<ParentExec>()
162-
.is_some()
163-
{
159+
if node.is::<ParentExec>() {
164160
buf.extend_from_slice("ParentExec".as_bytes());
165161
Ok(())
166162
} else {
@@ -239,10 +235,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
239235
}
240236

241237
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
242-
if (node.as_ref() as &dyn Any)
243-
.downcast_ref::<ChildExec>()
244-
.is_some()
245-
{
238+
if node.is::<ChildExec>() {
246239
buf.extend_from_slice("ChildExec".as_bytes());
247240
Ok(())
248241
} else {

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3624,7 +3624,7 @@ mod tests {
36243624
.build()?;
36253625

36263626
let execution_plan = plan(&logical_plan).await?;
3627-
let final_hash_agg = (execution_plan.as_ref() as &dyn Any)
3627+
let final_hash_agg = execution_plan
36283628
.downcast_ref::<AggregateExec>()
36293629
.expect("hash aggregate");
36303630
assert_eq!(
@@ -3651,7 +3651,7 @@ mod tests {
36513651
.build()?;
36523652

36533653
let execution_plan = plan(&logical_plan).await?;
3654-
let final_hash_agg = (execution_plan.as_ref() as &dyn Any)
3654+
let final_hash_agg = execution_plan
36553655
.downcast_ref::<AggregateExec>()
36563656
.expect("hash aggregate");
36573657
assert_eq!(

datafusion/core/src/test_util/parquet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,7 @@ impl TestParquetFile {
196196
/// Recursively searches for DataSourceExec and returns the metrics
197197
/// on the first one it finds
198198
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
199-
if let Some(data_source_exec) =
200-
(plan.as_ref() as &dyn std::any::Any).downcast_ref::<DataSourceExec>()
199+
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
201200
&& data_source_exec
202201
.downcast_to_file_source::<ParquetSource>()
203202
.is_some()

datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,9 +547,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {
547547
type Node = Arc<dyn ExecutionPlan>;
548548

549549
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
550-
if let Some(exec) =
551-
(node.as_ref() as &dyn std::any::Any).downcast_ref::<AggregateExec>()
552-
{
550+
if let Some(exec) = node.downcast_ref::<AggregateExec>() {
553551
if self.expected_sort {
554552
assert!(matches!(
555553
exec.input_order_mode(),

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::fs;
2019
use std::sync::Arc;
2120

@@ -197,9 +196,7 @@ async fn list_files_with_session_level_cache() {
197196
//Session 1 first time list files
198197
assert_eq!(get_list_file_cache_size(&state1), 0);
199198
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
200-
let data_source_exec = (exec1.as_ref() as &dyn Any)
201-
.downcast_ref::<DataSourceExec>()
202-
.unwrap();
199+
let data_source_exec = exec1.downcast_ref::<DataSourceExec>().unwrap();
203200
let data_source = data_source_exec.data_source();
204201
let parquet1 = data_source
205202
.as_any()
@@ -215,9 +212,7 @@ async fn list_files_with_session_level_cache() {
215212
//check session 1 cache result not show in session 2
216213
assert_eq!(get_list_file_cache_size(&state2), 0);
217214
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
218-
let data_source_exec = (exec2.as_ref() as &dyn Any)
219-
.downcast_ref::<DataSourceExec>()
220-
.unwrap();
215+
let data_source_exec = exec2.downcast_ref::<DataSourceExec>().unwrap();
221216
let data_source = data_source_exec.data_source();
222217
let parquet2 = data_source
223218
.as_any()
@@ -233,9 +228,7 @@ async fn list_files_with_session_level_cache() {
233228
//check session 1 cache result not show in session 2
234229
assert_eq!(get_list_file_cache_size(&state1), 1);
235230
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
236-
let data_source_exec = (exec3.as_ref() as &dyn Any)
237-
.downcast_ref::<DataSourceExec>()
238-
.unwrap();
231+
let data_source_exec = exec3.downcast_ref::<DataSourceExec>().unwrap();
239232
let data_source = data_source_exec.data_source();
240233
let parquet3 = data_source
241234
.as_any()

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::fmt::Debug;
2019
use std::ops::Deref;
2120
use std::sync::Arc;
@@ -3892,7 +3891,8 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
38923891
let result = replace_order_preserving_variants(dist_context)?;
38933892

38943893
// Verify the plan was transformed to CoalescePartitionsExec
3895-
(result.plan.as_ref() as &dyn Any)
3894+
result
3895+
.plan
38963896
.downcast_ref::<CoalescePartitionsExec>()
38973897
.expect("Expected CoalescePartitionsExec");
38983898

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::sync::{Arc, LazyLock};
2019

2120
use arrow::{
@@ -4324,7 +4323,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
43244323
.unwrap();
43254324

43264325
// Get the HashJoinExec to check the dynamic filter
4327-
let hash_join = (plan.as_ref() as &dyn Any)
4326+
let hash_join = plan
43284327
.downcast_ref::<HashJoinExec>()
43294328
.expect("Plan should be HashJoinExec");
43304329

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ async fn test_join_with_swap() {
232232
.unwrap();
233233

234234
let swapping_projection = optimized_join
235-
.as_ref()
236235
.downcast_ref::<ProjectionExec>()
237236
.expect("A proj is required to swap columns back to their original order");
238237

@@ -246,7 +245,6 @@ async fn test_join_with_swap() {
246245

247246
let swapped_join = swapping_projection
248247
.input()
249-
.as_ref()
250248
.downcast_ref::<HashJoinExec>()
251249
.expect("The type of the plan should not be changed");
252250

@@ -295,7 +293,6 @@ async fn test_left_join_no_swap() {
295293
.unwrap();
296294

297295
let swapped_join = optimized_join
298-
.as_ref()
299296
.downcast_ref::<HashJoinExec>()
300297
.expect("The type of the plan should not be changed");
301298

@@ -345,12 +342,9 @@ async fn test_join_with_swap_semi() {
345342
.optimize(Arc::new(join), &ConfigOptions::new())
346343
.unwrap();
347344

348-
let swapped_join = optimized_join
349-
.as_ref()
350-
.downcast_ref::<HashJoinExec>()
351-
.expect(
352-
"A proj is not required to swap columns back to their original order",
353-
);
345+
let swapped_join = optimized_join.downcast_ref::<HashJoinExec>().expect(
346+
"A proj is not required to swap columns back to their original order",
347+
);
354348

355349
assert_eq!(swapped_join.schema().fields().len(), 1);
356350
assert_eq!(
@@ -401,12 +395,9 @@ async fn test_join_with_swap_mark() {
401395
.optimize(Arc::new(join), &ConfigOptions::new())
402396
.unwrap();
403397

404-
let swapped_join = optimized_join
405-
.as_ref()
406-
.downcast_ref::<HashJoinExec>()
407-
.expect(
408-
"A proj is not required to swap columns back to their original order",
409-
);
398+
let swapped_join = optimized_join.downcast_ref::<HashJoinExec>().expect(
399+
"A proj is not required to swap columns back to their original order",
400+
);
410401

411402
assert_eq!(swapped_join.schema().fields().len(), 2);
412403
assert_eq!(
@@ -534,7 +525,6 @@ async fn test_join_no_swap() {
534525
.unwrap();
535526

536527
let swapped_join = optimized_join
537-
.as_ref()
538528
.downcast_ref::<HashJoinExec>()
539529
.expect("The type of the plan should not be changed");
540530

@@ -583,7 +573,6 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
583573
.unwrap();
584574

585575
let swapping_projection = optimized_join
586-
.as_ref()
587576
.downcast_ref::<ProjectionExec>()
588577
.expect("A proj is required to swap columns back to their original order");
589578

@@ -597,7 +586,6 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
597586

598587
let swapped_join = swapping_projection
599588
.input()
600-
.as_ref()
601589
.downcast_ref::<NestedLoopJoinExec>()
602590
.expect("The type of the plan should not be changed");
603591

@@ -664,7 +652,6 @@ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
664652
.unwrap();
665653

666654
let swapped_join = optimized_join
667-
.as_ref()
668655
.downcast_ref::<NestedLoopJoinExec>()
669656
.expect("The type of the plan should not be changed");
670657

@@ -759,7 +746,6 @@ async fn test_hash_join_swap_on_joins_with_projections(
759746
.swap_inputs(PartitionMode::Partitioned)
760747
.expect("swap_hash_join must support joins with projections");
761748
let swapped_join = swapped
762-
.as_ref()
763749
.downcast_ref::<HashJoinExec>()
764750
.expect(
765751
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
@@ -928,18 +914,15 @@ fn check_join_partition_mode(
928914

929915
if !is_swapped {
930916
let swapped_join = optimized_join
931-
.as_ref()
932917
.downcast_ref::<HashJoinExec>()
933918
.expect("The type of the plan should not be changed");
934919
assert_eq!(*swapped_join.partition_mode(), expected_mode);
935920
} else {
936921
let swapping_projection = optimized_join
937-
.as_ref()
938922
.downcast_ref::<ProjectionExec>()
939923
.expect("A proj is required to swap columns back to their original order");
940924
let swapped_join = swapping_projection
941925
.input()
942-
.as_ref()
943926
.downcast_ref::<HashJoinExec>()
944927
.expect("The type of the plan should not be changed");
945928

@@ -1589,10 +1572,9 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
15891572
JoinSelection::new().optimize(Arc::clone(&join), &ConfigOptions::new())?;
15901573

15911574
// If swap did happen
1592-
let projection_added = optimized_join_plan.as_ref().is::<ProjectionExec>();
1575+
let projection_added = optimized_join_plan.is::<ProjectionExec>();
15931576
let plan = if projection_added {
15941577
let proj = optimized_join_plan
1595-
.as_ref()
15961578
.downcast_ref::<ProjectionExec>()
15971579
.expect("A proj is required to swap columns back to their original order");
15981580
Arc::<dyn ExecutionPlan>::clone(proj.input())
@@ -1606,7 +1588,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
16061588
join_type,
16071589
mode,
16081590
..
1609-
}) = plan.as_ref().downcast_ref::<HashJoinExec>()
1591+
}) = plan.downcast_ref::<HashJoinExec>()
16101592
{
16111593
let left_changed = Arc::ptr_eq(left, &right_exec);
16121594
let right_changed = Arc::ptr_eq(right, &left_exec);

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::sync::Arc;
2019

2120
use arrow::compute::SortOptions;
@@ -514,7 +513,8 @@ fn test_memory_after_projection() -> Result<()> {
514513
);
515514

516515
assert_eq!(
517-
(after_optimize.clone().as_ref() as &dyn Any)
516+
after_optimize
517+
.clone()
518518
.downcast_ref::<DataSourceExec>()
519519
.unwrap()
520520
.data_source()
@@ -597,9 +597,7 @@ fn test_streaming_table_after_projection() -> Result<()> {
597597
let after_optimize =
598598
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
599599

600-
let result = (after_optimize.as_ref() as &dyn Any)
601-
.downcast_ref::<StreamingTableExec>()
602-
.unwrap();
600+
let result = after_optimize.downcast_ref::<StreamingTableExec>().unwrap();
603601
assert_eq!(
604602
result.partition_schema(),
605603
&Arc::new(Schema::new(vec![
@@ -789,7 +787,7 @@ fn test_output_req_after_projection() -> Result<()> {
789787
.into(),
790788
);
791789
assert_eq!(
792-
(after_optimize.as_ref() as &dyn Any)
790+
after_optimize
793791
.downcast_ref::<OutputRequirementExec>()
794792
.unwrap()
795793
.required_input_ordering()[0]
@@ -801,7 +799,7 @@ fn test_output_req_after_projection() -> Result<()> {
801799
Arc::new(Column::new("new_a", 1)),
802800
Arc::new(Column::new("b", 2)),
803801
];
804-
if let Distribution::HashPartitioned(vec) = (after_optimize.as_ref() as &dyn Any)
802+
if let Distribution::HashPartitioned(vec) = after_optimize
805803
.downcast_ref::<OutputRequirementExec>()
806804
.unwrap()
807805
.required_input_distribution()[0]
@@ -1031,7 +1029,7 @@ fn test_join_after_projection() -> Result<()> {
10311029

10321030
assert_eq!(
10331031
expected_filter_col_ind,
1034-
(after_optimize.as_ref() as &dyn Any)
1032+
after_optimize
10351033
.downcast_ref::<SymmetricHashJoinExec>()
10361034
.unwrap()
10371035
.filter()
@@ -1394,7 +1392,7 @@ fn test_repartition_after_projection() -> Result<()> {
13941392
);
13951393

13961394
assert_eq!(
1397-
(after_optimize.as_ref() as &dyn Any)
1395+
after_optimize
13981396
.downcast_ref::<RepartitionExec>()
13991397
.unwrap()
14001398
.partitioning()

datafusion/core/tests/user_defined/insert_operation.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ async fn insert_operation_is_passed_correctly_to_table_provider() {
5858
async fn assert_insert_op(ctx: &SessionContext, sql: &str, insert_op: InsertOp) {
5959
let df = ctx.sql(sql).await.unwrap();
6060
let plan = df.create_physical_plan().await.unwrap();
61-
let exec = (plan.as_ref() as &dyn Any)
62-
.downcast_ref::<TestInsertExec>()
63-
.unwrap();
61+
let exec = plan.downcast_ref::<TestInsertExec>().unwrap();
6462
assert_eq!(exec.op, insert_op);
6563
}
6664

0 commit comments

Comments
 (0)