Skip to content

Commit d66824f

Browse files
Upgrade DataFusion fork to 53 (#56)
Co-authored-by: Qi Zhu <qi.zhu@polygon.io>
1 parent eae7bf4 commit d66824f

34 files changed

Lines changed: 2164 additions & 340 deletions

File tree

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -431,7 +431,7 @@ jobs:
431431
sudo apt-get update -qq
432432
sudo apt-get install -y -qq clang
433433
- name: Setup wasm-pack
434-
uses: taiki-e/install-action@cfdb446e391c69574ebc316dfb7d7849ec12b940 # v2.68.8
434+
uses: taiki-e/install-action@0e76c5c569f13f7eb21e8e5b26fe710062b57b62 # v2.65.13
435435
with:
436436
tool: wasm-pack
437437
- name: Run tests with headless mode
@@ -537,7 +537,7 @@ jobs:
537537
# command cannot be run for all the .slt files. Run it for just one that works (limit.slt)
538538
# until most of the tickets in https://github.com/apache/datafusion/issues/16248 are addressed
539539
# and this command can be run without filters.
540-
run: cargo test --test sqllogictests -- --substrait-round-trip limit.slt
540+
run: cargo test --test sqllogictests --features substrait -- --substrait-round-trip limit.slt
541541

542542
# Temporarily commenting out the Windows flow, the reason is enormously slow running build
543543
# Waiting for new Windows 2025 github runner

Cargo.lock

Lines changed: 17 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ log = { workspace = true }
4848
object_store = { workspace = true }
4949

5050
[dev-dependencies]
51+
chrono = { workspace = true }
5152
datafusion-datasource-parquet = { workspace = true }
5253

5354
# Note: add additional linter rules in lib.rs.

datafusion/catalog-listing/src/helpers.rs

Lines changed: 134 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -340,17 +340,25 @@ fn filter_partitions(
340340
Ok(None)
341341
}
342342

343+
/// Returns `Ok(None)` when the file is not inside a valid partition path
344+
/// (e.g. a stale file in the table root directory). Such files are skipped
345+
/// because hive-style partition values are never null and there is no valid
346+
/// value to assign for non-partitioned files.
343347
fn try_into_partitioned_file(
344348
object_meta: ObjectMeta,
345349
partition_cols: &[(String, DataType)],
346350
table_path: &ListingTableUrl,
347-
) -> Result<PartitionedFile> {
351+
) -> Result<Option<PartitionedFile>> {
348352
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
349353
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
350354

355+
let Some(parsed) = parsed else {
356+
// parse_partitions_for_path already logs a debug message
357+
return Ok(None);
358+
};
359+
351360
let partition_values = parsed
352361
.into_iter()
353-
.flatten()
354362
.zip(partition_cols)
355363
.map(|(parsed, (_, datatype))| {
356364
ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -360,7 +368,7 @@ fn try_into_partitioned_file(
360368
let mut pf: PartitionedFile = object_meta.into();
361369
pf.partition_values = partition_values;
362370

363-
Ok(pf)
371+
Ok(Some(pf))
364372
}
365373

366374
/// Discover the partitions on the given path and prune out files
@@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
405413
)?;
406414

407415
Ok(objects
408-
.map_ok(|object_meta| {
409-
try_into_partitioned_file(object_meta, partition_cols, table_path)
416+
.try_filter_map(|object_meta| {
417+
futures::future::ready(try_into_partitioned_file(
418+
object_meta,
419+
partition_cols,
420+
table_path,
421+
))
410422
})
411423
.try_filter_map(move |pf| {
412-
futures::future::ready(
413-
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
414-
)
424+
futures::future::ready(filter_partitions(pf, filters, &df_schema))
415425
})
416426
.boxed())
417427
}
@@ -574,6 +584,122 @@ mod tests {
574584
);
575585
}
576586

587+
#[test]
588+
fn test_try_into_partitioned_file_valid_partition() {
589+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
590+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
591+
let meta = ObjectMeta {
592+
location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
593+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
594+
size: 100,
595+
e_tag: None,
596+
version: None,
597+
};
598+
599+
let result =
600+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
601+
assert!(result.is_some());
602+
let pf = result.unwrap();
603+
assert_eq!(pf.partition_values.len(), 1);
604+
assert_eq!(
605+
pf.partition_values[0],
606+
ScalarValue::Utf8(Some("2024-01".to_string()))
607+
);
608+
}
609+
610+
#[test]
611+
fn test_try_into_partitioned_file_root_file_skipped() {
612+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
613+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
614+
let meta = ObjectMeta {
615+
location: Path::from("bucket/mytable/data.parquet"),
616+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
617+
size: 100,
618+
e_tag: None,
619+
version: None,
620+
};
621+
622+
let result =
623+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
624+
assert!(
625+
result.is_none(),
626+
"Files outside partition structure should be skipped"
627+
);
628+
}
629+
630+
#[test]
631+
fn test_try_into_partitioned_file_wrong_partition_name() {
632+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
633+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
634+
let meta = ObjectMeta {
635+
location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
636+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
637+
size: 100,
638+
e_tag: None,
639+
version: None,
640+
};
641+
642+
let result =
643+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
644+
assert!(
645+
result.is_none(),
646+
"Files with wrong partition column name should be skipped"
647+
);
648+
}
649+
650+
#[test]
651+
fn test_try_into_partitioned_file_multiple_partitions() {
652+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
653+
let partition_cols = vec![
654+
("year".to_string(), DataType::Utf8),
655+
("month".to_string(), DataType::Utf8),
656+
];
657+
let meta = ObjectMeta {
658+
location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
659+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
660+
size: 100,
661+
e_tag: None,
662+
version: None,
663+
};
664+
665+
let result =
666+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
667+
assert!(result.is_some());
668+
let pf = result.unwrap();
669+
assert_eq!(pf.partition_values.len(), 2);
670+
assert_eq!(
671+
pf.partition_values[0],
672+
ScalarValue::Utf8(Some("2024".to_string()))
673+
);
674+
assert_eq!(
675+
pf.partition_values[1],
676+
ScalarValue::Utf8(Some("01".to_string()))
677+
);
678+
}
679+
680+
#[test]
681+
fn test_try_into_partitioned_file_partial_partition_skipped() {
682+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
683+
let partition_cols = vec![
684+
("year".to_string(), DataType::Utf8),
685+
("month".to_string(), DataType::Utf8),
686+
];
687+
let meta = ObjectMeta {
688+
location: Path::from("bucket/mytable/year=2024/data.parquet"),
689+
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
690+
size: 100,
691+
e_tag: None,
692+
version: None,
693+
};
694+
695+
let result =
696+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
697+
assert!(
698+
result.is_none(),
699+
"Files with incomplete partition structure should be skipped"
700+
);
701+
}
702+
577703
#[test]
578704
fn test_expr_applicable_for_cols() {
579705
assert!(expr_applicable_for_cols(

0 commit comments

Comments
 (0)