Skip to content

Commit 03b390d

Browse files
timsaucerclaude
andauthored
Remove trait function as_any from datafusion-datasource (#21576)
## Which issue does this PR close? This addresses part of #21572 ## Rationale for this change This PR reduces the amount of boilerplate code that users need to write. ## What changes are included in this PR? Now that we have [trait upcasting](https://blog.rust-lang.org/2025/04/03/Rust-1.86.0/) since rust 1.86, we no longer need every implementation of these traits to have the as_any function that returns &self. This PR makes Any an supertrait and makes the appropriate casts when necessary. ## Are these changes tested? Existing unit tests ## Are there any user-facing changes? Yes, the users simply need to remove the as_any function. The upgrade guide is updated. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6cd7e83 commit 03b390d

File tree

39 files changed

+132
-320
lines changed

39 files changed

+132
-320
lines changed

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
318318
) -> Result<PhysicalPlanNode> {
319319
// Check if this is a DataSourceExec with adapter
320320
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
321-
&& let Some(config) =
322-
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
321+
&& let Some(config) = exec.data_source().downcast_ref::<FileScanConfig>()
323322
&& let Some(adapter_factory) = &config.expr_adapter_factory
324323
&& let Some(tag) = extract_adapter_tag(adapter_factory.as_ref())
325324
{
@@ -482,7 +481,7 @@ fn inject_adapter_into_plan(
482481
adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
483482
) -> Result<Arc<dyn ExecutionPlan>> {
484483
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
485-
&& let Some(config) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
484+
&& let Some(config) = exec.data_source().downcast_ref::<FileScanConfig>()
486485
{
487486
let new_config = FileScanConfigBuilder::from(config.clone())
488487
.with_expr_adapter(Some(adapter_factory))
@@ -498,8 +497,7 @@ fn verify_adapter_in_plan(plan: &Arc<dyn ExecutionPlan>, label: &str) -> bool {
498497
// Walk the plan tree to find DataSourceExec with adapter
499498
fn check_plan(plan: &dyn ExecutionPlan) -> bool {
500499
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
501-
&& let Some(config) =
502-
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
500+
&& let Some(config) = exec.data_source().downcast_ref::<FileScanConfig>()
503501
&& config.expr_adapter_factory.is_some()
504502
{
505503
return true;

datafusion-examples/examples/custom_data_source/custom_file_format.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! See `main.rs` for how to run it.
1919
20-
use std::{any::Any, sync::Arc};
20+
use std::sync::Arc;
2121

2222
use arrow::{
2323
array::{AsArray, RecordBatch, StringArray, UInt8Array},
@@ -104,10 +104,6 @@ impl TSVFileFormat {
104104

105105
#[async_trait::async_trait]
106106
impl FileFormat for TSVFileFormat {
107-
fn as_any(&self) -> &dyn Any {
108-
self
109-
}
110-
111107
fn get_ext(&self) -> String {
112108
"tsv".to_string()
113109
}
@@ -207,10 +203,6 @@ impl FileFormatFactory for TSVFileFactory {
207203
fn default(&self) -> Arc<dyn FileFormat> {
208204
todo!()
209205
}
210-
211-
fn as_any(&self) -> &dyn Any {
212-
self
213-
}
214206
}
215207

216208
impl GetExt for TSVFileFactory {

datafusion-examples/examples/sql_ops/frontend.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use datafusion::optimizer::{
3131
use datafusion::sql::planner::{ContextProvider, SqlToRel};
3232
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
3333
use datafusion::sql::sqlparser::parser::Parser;
34-
use std::any::Any;
3534
use std::sync::Arc;
3635

3736
/// This example shows how to use DataFusion's SQL planner to parse SQL text and
@@ -190,10 +189,6 @@ struct MyTableSource {
190189
}
191190

192191
impl TableSource for MyTableSource {
193-
fn as_any(&self) -> &dyn Any {
194-
self
195-
}
196-
197192
fn schema(&self) -> SchemaRef {
198193
self.schema.clone()
199194
}

datafusion/catalog/src/default_table_source.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
//! Default TableSource implementation used in DataFusion physical plans
1919
20+
use std::borrow::Cow;
2021
use std::sync::Arc;
21-
use std::{any::Any, borrow::Cow};
2222

2323
use crate::TableProvider;
2424

@@ -46,12 +46,6 @@ impl DefaultTableSource {
4646
}
4747

4848
impl TableSource for DefaultTableSource {
49-
/// Returns the table source as [`Any`] so that it can be
50-
/// downcast to a specific implementation.
51-
fn as_any(&self) -> &dyn Any {
52-
self
53-
}
54-
5549
/// Get a reference to the schema for this table
5650
fn schema(&self) -> SchemaRef {
5751
self.table_provider.schema()
@@ -97,11 +91,7 @@ pub fn provider_as_source(
9791
pub fn source_as_provider(
9892
source: &Arc<dyn TableSource>,
9993
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
100-
match source
101-
.as_ref()
102-
.as_any()
103-
.downcast_ref::<DefaultTableSource>()
104-
{
94+
match source.as_ref().downcast_ref::<DefaultTableSource>() {
10595
Some(source) => Ok(Arc::clone(&source.table_provider)),
10696
_ => internal_err!("TableSource was not DefaultTableSource"),
10797
}

datafusion/catalog/src/stream.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
//! TableProvider for stream sources, such as FIFO files
1919
20-
use std::any::Any;
2120
use std::fmt::Formatter;
2221
use std::fs::{File, OpenOptions};
2322
use std::io::BufReader;
@@ -401,10 +400,6 @@ impl DisplayAs for StreamWrite {
401400

402401
#[async_trait]
403402
impl DataSink for StreamWrite {
404-
fn as_any(&self) -> &dyn Any {
405-
self
406-
}
407-
408403
fn schema(&self) -> &SchemaRef {
409404
self.0.source.schema()
410405
}

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ mod tests {
297297
let listing_table = table_provider.downcast_ref::<ListingTable>().unwrap();
298298

299299
let format = listing_table.options().format.clone();
300-
let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
300+
let csv_format = format.downcast_ref::<CsvFormat>().unwrap();
301301
let csv_options = csv_format.options().clone();
302302
assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
303303
let listing_options = listing_table.options();
@@ -332,7 +332,7 @@ mod tests {
332332

333333
// Verify compression is used
334334
let format = listing_table.options().format.clone();
335-
let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
335+
let csv_format = format.downcast_ref::<CsvFormat>().unwrap();
336336
let csv_options = csv_format.options().clone();
337337
assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP);
338338

datafusion/core/src/physical_planner.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ pub trait ExtensionPlanner {
205205
/// _session_state: &SessionState,
206206
/// ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
207207
/// // Check if this is your custom table source
208-
/// if scan.source.as_any().is::<MyCustomTableSource>() {
208+
/// if scan.source.is::<MyCustomTableSource>() {
209209
/// // Create a custom execution plan for your table source
210210
/// let exec = MyCustomExec::new(
211211
/// scan.table_name.clone(),
@@ -732,9 +732,7 @@ impl DefaultPhysicalPlanner {
732732
op: WriteOp::Insert(insert_op),
733733
..
734734
}) => {
735-
if let Some(provider) =
736-
target.as_any().downcast_ref::<DefaultTableSource>()
737-
{
735+
if let Some(provider) = target.downcast_ref::<DefaultTableSource>() {
738736
let input_exec = children.one()?;
739737
provider
740738
.table_provider
@@ -753,9 +751,7 @@ impl DefaultPhysicalPlanner {
753751
input,
754752
..
755753
}) => {
756-
if let Some(provider) =
757-
target.as_any().downcast_ref::<DefaultTableSource>()
758-
{
754+
if let Some(provider) = target.downcast_ref::<DefaultTableSource>() {
759755
let filters = extract_dml_filters(input, table_name)?;
760756
provider
761757
.table_provider
@@ -777,9 +773,7 @@ impl DefaultPhysicalPlanner {
777773
input,
778774
..
779775
}) => {
780-
if let Some(provider) =
781-
target.as_any().downcast_ref::<DefaultTableSource>()
782-
{
776+
if let Some(provider) = target.downcast_ref::<DefaultTableSource>() {
783777
// For UPDATE, the assignments are encoded in the projection of input
784778
// We pass the filters and let the provider handle the projection
785779
let filters = extract_dml_filters(input, table_name)?;
@@ -804,9 +798,7 @@ impl DefaultPhysicalPlanner {
804798
op: WriteOp::Truncate,
805799
..
806800
}) => {
807-
if let Some(provider) =
808-
target.as_any().downcast_ref::<DefaultTableSource>()
809-
{
801+
if let Some(provider) = target.downcast_ref::<DefaultTableSource>() {
810802
provider
811803
.table_provider
812804
.truncate(session_state)
@@ -3112,7 +3104,6 @@ impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
31123104

31133105
#[cfg(test)]
31143106
mod tests {
3115-
use std::any::Any;
31163107
use std::cmp::Ordering;
31173108
use std::fmt::{self, Debug};
31183109
use std::ops::{BitAnd, Not};
@@ -4680,10 +4671,6 @@ digraph {
46804671
}
46814672

46824673
impl TableSource for MockTableSource {
4683-
fn as_any(&self) -> &dyn Any {
4684-
self
4685-
}
4686-
46874674
fn schema(&self) -> SchemaRef {
46884675
Arc::clone(&self.schema)
46894676
}
@@ -4710,7 +4697,7 @@ digraph {
47104697
scan: &TableScan,
47114698
_session_state: &SessionState,
47124699
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
4713-
if scan.source.as_any().is::<MockTableSource>() {
4700+
if scan.source.is::<MockTableSource>() {
47144701
Ok(Some(Arc::new(EmptyExec::new(Arc::clone(
47154702
scan.projected_schema.inner(),
47164703
)))))

datafusion/core/tests/optimizer/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
//! datafusion-functions crate.
2020
2121
use insta::assert_snapshot;
22-
use std::any::Any;
2322
use std::collections::HashMap;
2423
use std::sync::Arc;
2524

@@ -251,10 +250,6 @@ struct MyTableSource {
251250
}
252251

253252
impl TableSource for MyTableSource {
254-
fn as_any(&self) -> &dyn Any {
255-
self
256-
}
257-
258253
fn schema(&self) -> SchemaRef {
259254
self.schema.clone()
260255
}

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,7 @@ async fn list_files_with_session_level_cache() {
198198
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
199199
let data_source_exec = exec1.downcast_ref::<DataSourceExec>().unwrap();
200200
let data_source = data_source_exec.data_source();
201-
let parquet1 = data_source
202-
.as_any()
203-
.downcast_ref::<FileScanConfig>()
204-
.unwrap();
201+
let parquet1 = data_source.downcast_ref::<FileScanConfig>().unwrap();
205202

206203
assert_eq!(get_list_file_cache_size(&state1), 1);
207204
let fg = &parquet1.file_groups;
@@ -214,10 +211,7 @@ async fn list_files_with_session_level_cache() {
214211
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
215212
let data_source_exec = exec2.downcast_ref::<DataSourceExec>().unwrap();
216213
let data_source = data_source_exec.data_source();
217-
let parquet2 = data_source
218-
.as_any()
219-
.downcast_ref::<FileScanConfig>()
220-
.unwrap();
214+
let parquet2 = data_source.downcast_ref::<FileScanConfig>().unwrap();
221215

222216
assert_eq!(get_list_file_cache_size(&state2), 1);
223217
let fg2 = &parquet2.file_groups;
@@ -230,10 +224,7 @@ async fn list_files_with_session_level_cache() {
230224
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
231225
let data_source_exec = exec3.downcast_ref::<DataSourceExec>().unwrap();
232226
let data_source = data_source_exec.data_source();
233-
let parquet3 = data_source
234-
.as_any()
235-
.downcast_ref::<FileScanConfig>()
236-
.unwrap();
227+
let parquet3 = data_source.downcast_ref::<FileScanConfig>().unwrap();
237228

238229
assert_eq!(get_list_file_cache_size(&state1), 1);
239230
let fg = &parquet3.file_groups;

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,6 @@ fn test_memory_after_projection() -> Result<()> {
518518
.downcast_ref::<DataSourceExec>()
519519
.unwrap()
520520
.data_source()
521-
.as_any()
522521
.downcast_ref::<MemorySourceConfig>()
523522
.unwrap()
524523
.projection()

0 commit comments

Comments
 (0)