Skip to content

Commit 21a2dbc

Browse files
authored
Upgrade DataFusion to 54 (#8044)
## Summary This PR includes an upgrade of our DataFusion dependency/integration to the upcoming 54 release. It aims to make the minimal amount of changes, and implementing the new `Morselizer` API will be part of a future PR (I have an old PR that was based on an earlier PoC, I'll try and pull stuff from there when the time comes). - Upstream release issue: apache/datafusion#21080 Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent d2d2e29 commit 21a2dbc

12 files changed

Lines changed: 1010 additions & 675 deletions

File tree

Cargo.lock

Lines changed: 925 additions & 546 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,17 @@ anyhow = "1.0.97"
9999
arbitrary = "1.3.2"
100100
arc-swap = "1.9"
101101
arcref = "0.2.0"
102-
arrow-arith = "58"
103-
arrow-array = "58"
104-
arrow-buffer = "58"
105-
arrow-cast = "58"
106-
arrow-data = "58"
107-
arrow-ipc = "58"
108-
arrow-ord = "58"
109-
arrow-row = "58"
110-
arrow-schema = "58"
111-
arrow-select = "58"
112-
arrow-string = "58"
102+
arrow-arith = "58.3"
103+
arrow-array = "58.3"
104+
arrow-buffer = "58.3"
105+
arrow-cast = "58.3"
106+
arrow-data = "58.3"
107+
arrow-ipc = "58.3"
108+
arrow-ord = "58.3"
109+
arrow-row = "58.3"
110+
arrow-schema = "58.3"
111+
arrow-select = "58.3"
112+
arrow-string = "58.3"
113113
async-fs = "2.2.0"
114114
async-lock = "3.4"
115115
async-stream = "0.3.6"
@@ -137,20 +137,20 @@ cudarc = { version = "0.19.0", features = [
137137
] }
138138
custom-labels = "0.4.4"
139139
dashmap = "6.1.0"
140-
datafusion = { version = "53", default-features = false, features = ["sql"] }
141-
datafusion-catalog = { version = "53" }
142-
datafusion-common = { version = "53" }
143-
datafusion-common-runtime = { version = "53" }
144-
datafusion-datasource = { version = "53", default-features = false }
145-
datafusion-execution = { version = "53" }
146-
datafusion-expr = { version = "53" }
147-
datafusion-functions = { version = "53" }
148-
datafusion-physical-expr = { version = "53" }
149-
datafusion-physical-expr-adapter = { version = "53" }
150-
datafusion-physical-expr-common = { version = "53" }
151-
datafusion-physical-plan = { version = "53" }
152-
datafusion-pruning = { version = "53" }
153-
datafusion-sqllogictest = { version = "53" }
140+
datafusion = { version = "54", default-features = false, features = ["sql"] }
141+
datafusion-catalog = { version = "54" }
142+
datafusion-common = { version = "54" }
143+
datafusion-common-runtime = { version = "54" }
144+
datafusion-datasource = { version = "54", default-features = false }
145+
datafusion-execution = { version = "54" }
146+
datafusion-expr = { version = "54" }
147+
datafusion-functions = { version = "54" }
148+
datafusion-physical-expr = { version = "54" }
149+
datafusion-physical-expr-adapter = { version = "54" }
150+
datafusion-physical-expr-common = { version = "54" }
151+
datafusion-physical-plan = { version = "54" }
152+
datafusion-pruning = { version = "54" }
153+
datafusion-sqllogictest = { version = "54" }
154154
dirs = "6.0.0"
155155
divan = { package = "codspeed-divan-compat", version = "4.0.4" }
156156
enum-iterator = "2.0.0"
@@ -164,7 +164,7 @@ get_dir = "0.5.0"
164164
glob = "0.3.2"
165165
goldenfile = "1"
166166
half = { version = "2.7.1", features = ["std", "num-traits"] }
167-
hashbrown = "0.17.0"
167+
hashbrown = "0.17.1"
168168
humansize = "2.1.3"
169169
indicatif = "0.18.0"
170170
insta = "1.43"
@@ -190,17 +190,17 @@ noodles-bgzf = "0.47.0"
190190
noodles-vcf = { version = "0.88.0", features = ["async"] }
191191
num-traits = "0.2.19"
192192
num_enum = { version = "0.7.3", default-features = false }
193-
object_store = { version = "0.13.1", default-features = false }
193+
object_store = { version = "0.13.2", default-features = false }
194194
once_cell = "1.21"
195195
oneshot = { version = "0.2.0", features = ["async"] }
196196
onpair = { version = "0.0.4" }
197197
opentelemetry = "0.32.0"
198198
opentelemetry-otlp = "0.32.0"
199199
opentelemetry_sdk = "0.32.0"
200200
parking_lot = { version = "0.12.3", features = ["nightly"] }
201-
parquet = "58"
202-
parquet-variant = "58"
203-
parquet-variant-compute = "58"
201+
parquet = "58.3"
202+
parquet-variant = "58.3"
203+
parquet-variant-compute = "58.3"
204204
paste = "1.0.15"
205205
pco = "1.0.1"
206206
pin-project-lite = "0.2.15"
@@ -253,7 +253,7 @@ tempfile = "3"
253253
termtree = { version = "1.0" }
254254
test-with = "0.16"
255255
thiserror = "2.0.3"
256-
tokio = { version = "1.48" }
256+
tokio = { version = "1.52" }
257257
tokio-stream = "0.1.17"
258258
tokio-util = "0.7.17"
259259
vortex-array-macros = { version = "0.1.0", path = "./vortex-array-macros" }
@@ -264,7 +264,7 @@ tracing = { version = "0.1.41", default-features = false }
264264
tracing-perfetto = "0.1.5"
265265
tracing-subscriber = "0.3"
266266
url = "2.5.7"
267-
uuid = { version = "1.21", features = ["js"] }
267+
uuid = { version = "1.23", features = ["js"] }
268268
wasm-bindgen-futures = "0.4.54"
269269
xshell = "0.2.6"
270270
zigzag = "0.1.0"

benchmarks/datafusion-bench/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use datafusion::datasource::provider::DefaultTableFactory;
1414
use datafusion::execution::SessionStateBuilder;
1515
use datafusion::execution::cache::DefaultListFilesCache;
1616
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
17-
use datafusion::execution::cache::cache_unit::DefaultFileStatisticsCache;
17+
use datafusion::execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
1818
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
1919
use datafusion::prelude::SessionConfig;
2020
use datafusion::prelude::SessionContext;
@@ -37,7 +37,7 @@ pub fn get_session_context() -> SessionContext {
3737
let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
3838
let list_file_cache = Arc::new(DefaultListFilesCache::default());
3939
let cache_config = CacheManagerConfig::default()
40-
.with_files_statistics_cache(Some(file_static_cache))
40+
.with_file_statistics_cache(Some(file_static_cache))
4141
.with_list_files_cache(Some(list_file_cache));
4242
rt_builder = rt_builder.with_cache_manager(cache_config);
4343

benchmarks/datafusion-bench/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,14 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
278278
None => config.infer_schema(&session.state()).await?,
279279
};
280280

281-
let listing_table = Arc::new(ListingTable::try_new(config)?);
281+
let listing_table = Arc::new(
282+
ListingTable::try_new(config)?.with_cache(
283+
session
284+
.runtime_env()
285+
.cache_manager
286+
.get_file_statistic_cache(),
287+
),
288+
);
282289

283290
session.register_table(table.name, listing_table)?;
284291
}

vortex-datafusion/src/convert/exprs.rs

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ impl DefaultExpressionConvertor {
127127
let mut result = self.convert(source_expr.as_ref())?;
128128
for expr in field_names {
129129
let field_name = expr
130-
.as_any()
131130
.downcast_ref::<df_expr::Literal>()
132131
.ok_or_else(|| exec_datafusion_err!("get_field field name must be a literal"))?
133132
.value()
@@ -195,19 +194,19 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
195194
fn convert(&self, df: &dyn PhysicalExpr) -> DFResult<Expression> {
196195
// TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
197196
// for that node, up to any `and` or `or` node.
198-
if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
197+
if let Some(binary_expr) = df.downcast_ref::<df_expr::BinaryExpr>() {
199198
let left = self.convert(binary_expr.left().as_ref())?;
200199
let right = self.convert(binary_expr.right().as_ref())?;
201200
let operator = try_operator_from_df(binary_expr.op())?;
202201

203202
return Ok(Binary.new_expr(operator, [left, right]));
204203
}
205204

206-
if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
205+
if let Some(col_expr) = df.downcast_ref::<df_expr::Column>() {
207206
return Ok(get_item(col_expr.name().to_owned(), root()));
208207
}
209208

210-
if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
209+
if let Some(like) = df.downcast_ref::<df_expr::LikeExpr>() {
211210
let child = self.convert(like.expr().as_ref())?;
212211
let pattern = self.convert(like.pattern().as_ref())?;
213212
return Ok(Like.new_expr(
@@ -219,42 +218,34 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
219218
));
220219
}
221220

222-
if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
221+
if let Some(literal) = df.downcast_ref::<df_expr::Literal>() {
223222
let value = Scalar::from_df(literal.value());
224223
return Ok(lit(value));
225224
}
226225

227-
if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
228-
let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
226+
if let Some(cast_expr) = df.downcast_ref::<df_expr::CastExpr>() {
227+
let cast_dtype = DType::from_arrow(cast_expr.target_field().as_ref());
229228
let child = self.convert(cast_expr.expr().as_ref())?;
230229
return Ok(cast(child, cast_dtype));
231230
}
232231

233-
if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
234-
let target = cast_col_expr.target_field();
235-
236-
let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
237-
let child = self.convert(cast_col_expr.expr().as_ref())?;
238-
return Ok(cast(child, target_dtype));
239-
}
240-
241-
if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
232+
if let Some(is_null_expr) = df.downcast_ref::<df_expr::IsNullExpr>() {
242233
let arg = self.convert(is_null_expr.arg().as_ref())?;
243234
return Ok(is_null(arg));
244235
}
245236

246-
if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
237+
if let Some(is_not_null_expr) = df.downcast_ref::<df_expr::IsNotNullExpr>() {
247238
let arg = self.convert(is_not_null_expr.arg().as_ref())?;
248239
return Ok(is_not_null(arg));
249240
}
250241

251-
if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
242+
if let Some(in_list) = df.downcast_ref::<df_expr::InListExpr>() {
252243
let value = self.convert(in_list.expr().as_ref())?;
253244
let list_elements: Vec<_> = in_list
254245
.list()
255246
.iter()
256247
.map(|e| {
257-
if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
248+
if let Some(lit) = e.downcast_ref::<df_expr::Literal>() {
258249
Ok(Scalar::from_df(lit.value()))
259250
} else {
260251
Err(exec_datafusion_err!("Failed to cast sub-expression"))
@@ -272,11 +263,11 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
272263
return Ok(if in_list.negated() { not(expr) } else { expr });
273264
}
274265

275-
if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
266+
if let Some(scalar_fn) = df.downcast_ref::<ScalarFunctionExpr>() {
276267
return self.try_convert_scalar_function(scalar_fn);
277268
}
278269

279-
if let Some(case_expr) = df.as_any().downcast_ref::<df_expr::CaseExpr>() {
270+
if let Some(case_expr) = df.downcast_ref::<df_expr::CaseExpr>() {
280271
return self.try_convert_case_expr(case_expr);
281272
}
282273

@@ -297,7 +288,7 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
297288
for projection_expr in source_projection.iter() {
298289
let r = projection_expr.expr.apply(|node| {
299290
// We only pull column children of scalar functions that we can't push into the scan.
300-
if let Some(scalar_fn_expr) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
291+
if let Some(scalar_fn_expr) = node.downcast_ref::<ScalarFunctionExpr>()
301292
&& !can_scalar_fn_be_pushed_down(scalar_fn_expr)
302293
{
303294
scan_projection.extend(
@@ -312,7 +303,7 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
312303

313304
// DataFusion assumes different decimal types can be coerced.
314305
// Vortex expects a perfect match so we don't push it down.
315-
if let Some(binary_expr) = node.as_any().downcast_ref::<df_expr::BinaryExpr>()
306+
if let Some(binary_expr) = node.downcast_ref::<df_expr::BinaryExpr>()
316307
&& binary_expr.op().is_numerical_operators()
317308
&& (is_decimal(&binary_expr.left().data_type(input_schema)?)
318309
&& is_decimal(&binary_expr.right().data_type(input_schema)?))
@@ -406,14 +397,13 @@ fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
406397
}
407398
}
408399

409-
fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
400+
fn can_be_pushed_down_impl(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
410401
// We currently do not support pushdown of dynamic expressions in DF.
411402
// See issue: https://github.com/vortex-data/vortex/issues/4034
412-
if is_dynamic_physical_expr(df_expr) {
403+
if is_dynamic_physical_expr(expr) {
413404
return false;
414405
}
415406

416-
let expr = df_expr.as_any();
417407
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
418408
can_binary_be_pushed_down(binary, schema)
419409
} else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
@@ -429,9 +419,6 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
429419
} else if let Some(cast_expr) = expr.downcast_ref::<df_expr::CastExpr>() {
430420
// CastExpr child must be an expression type that convert() can handle
431421
is_convertible_expr(cast_expr.expr())
432-
} else if let Some(cast_col_expr) = expr.downcast_ref::<df_expr::CastColumnExpr>() {
433-
// CastColumnExpr child must be an expression type that convert() can handle
434-
is_convertible_expr(cast_col_expr.expr())
435422
} else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
436423
can_be_pushed_down_impl(is_null.arg(), schema)
437424
} else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
@@ -447,17 +434,15 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
447434
} else if let Some(case_expr) = expr.downcast_ref::<df_expr::CaseExpr>() {
448435
can_case_be_pushed_down(case_expr, schema)
449436
} else {
450-
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
437+
tracing::debug!(%expr, "DataFusion expression can't be pushed down");
451438
false
452439
}
453440
}
454441

455442
/// Checks if an expression type is one that convert() can handle.
456443
/// This is less restrictive than can_be_pushed_down since it only checks
457444
/// expression types, not data type support.
458-
fn is_convertible_expr(df_expr: &Arc<dyn PhysicalExpr>) -> bool {
459-
let expr = df_expr.as_any();
460-
445+
fn is_convertible_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
461446
// Expression types that convert() handles
462447
expr.downcast_ref::<df_expr::BinaryExpr>().is_some()
463448
|| expr.downcast_ref::<df_expr::Column>().is_some()
@@ -466,9 +451,6 @@ fn is_convertible_expr(df_expr: &Arc<dyn PhysicalExpr>) -> bool {
466451
|| expr
467452
.downcast_ref::<df_expr::CastExpr>()
468453
.is_some_and(|e| is_convertible_expr(e.expr()))
469-
|| expr
470-
.downcast_ref::<df_expr::CastColumnExpr>()
471-
.is_some_and(|e| is_convertible_expr(e.expr()))
472454
|| expr.downcast_ref::<df_expr::IsNullExpr>().is_some()
473455
|| expr.downcast_ref::<df_expr::IsNotNullExpr>().is_some()
474456
|| expr.downcast_ref::<df_expr::InListExpr>().is_some()
@@ -568,6 +550,8 @@ mod tests {
568550
use arrow_schema::Field;
569551
use arrow_schema::Schema;
570552
use arrow_schema::TimeUnit as ArrowTimeUnit;
553+
use datafusion::arrow::array::AsArray;
554+
use datafusion::arrow::datatypes::Int32Type;
571555
use datafusion_common::ScalarValue;
572556
use datafusion_expr::Operator as DFOperator;
573557
use datafusion_physical_expr::PhysicalExpr;
@@ -988,12 +972,7 @@ mod tests {
988972
let vortex_as_arrow = vortex_result.into_primitive().as_slice::<i32>().to_vec();
989973

990974
// Convert DataFusion result to Vec for comparison
991-
let df_as_arrow: Vec<i32> = df_array
992-
.as_any()
993-
.downcast_ref::<Int32Array>()
994-
.unwrap()
995-
.values()
996-
.to_vec();
975+
let df_as_arrow: Vec<i32> = df_array.as_primitive::<Int32Type>().values().to_vec();
997976

998977
// Compare results
999978
// Expected: [0, 0, 50, 100, 100] for values [1, 5, 10, 15, 20]

vortex-datafusion/src/persistent/format.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::any::Any;
54
use std::fmt::Debug;
65
use std::fmt::Formatter;
76
use std::sync::Arc;
@@ -299,10 +298,6 @@ impl FileFormatFactory for VortexFormatFactory {
299298
fn default(&self) -> Arc<dyn FileFormat> {
300299
Arc::new(VortexFormat::new(self.session.clone()))
301300
}
302-
303-
fn as_any(&self) -> &dyn Any {
304-
self
305-
}
306301
}
307302

308303
impl VortexFormat {
@@ -330,10 +325,6 @@ impl VortexFormat {
330325

331326
#[async_trait]
332327
impl FileFormat for VortexFormat {
333-
fn as_any(&self) -> &dyn Any {
334-
self
335-
}
336-
337328
fn compression_type(&self) -> Option<FileCompressionType> {
338329
None
339330
}
@@ -594,7 +585,6 @@ impl FileFormat for VortexFormat {
594585
) -> DFResult<Arc<dyn ExecutionPlan>> {
595586
let mut source = file_scan_config
596587
.file_source()
597-
.as_any()
598588
.downcast_ref::<VortexSource>()
599589
.cloned()
600590
.ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

0 commit comments

Comments
 (0)