Skip to content

Commit c25ea93

Browse files
authored
Merge branch 'main' into fix/aliased_expr_explain
2 parents 2580bc5 + 5d508d3 commit c25ea93

18 files changed

Lines changed: 2124 additions & 82 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ log = { workspace = true }
4848
object_store = { workspace = true }
4949

5050
[dev-dependencies]
51+
chrono = { workspace = true }
5152
datafusion-datasource-parquet = { workspace = true }
5253

5354
# Note: add additional linter rules in lib.rs.

datafusion/catalog-listing/src/helpers.rs

Lines changed: 142 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -340,17 +340,25 @@ fn filter_partitions(
340340
Ok(None)
341341
}
342342

343+
/// Returns `Ok(None)` when the file is not inside a valid partition path
344+
/// (e.g. a stale file in the table root directory). Such files are skipped
345+
/// because hive-style partition values are never null and there is no valid
346+
/// value to assign for non-partitioned files.
343347
fn try_into_partitioned_file(
344348
object_meta: ObjectMeta,
345349
partition_cols: &[(String, DataType)],
346350
table_path: &ListingTableUrl,
347-
) -> Result<PartitionedFile> {
351+
) -> Result<Option<PartitionedFile>> {
348352
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
349353
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
350354

355+
let Some(parsed) = parsed else {
356+
// parse_partitions_for_path already logs a debug message
357+
return Ok(None);
358+
};
359+
351360
let partition_values = parsed
352361
.into_iter()
353-
.flatten()
354362
.zip(partition_cols)
355363
.map(|(parsed, (_, datatype))| {
356364
ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -360,7 +368,7 @@ fn try_into_partitioned_file(
360368
let mut pf: PartitionedFile = object_meta.into();
361369
pf.partition_values = partition_values;
362370

363-
Ok(pf)
371+
Ok(Some(pf))
364372
}
365373

366374
/// Discover the partitions on the given path and prune out files
@@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
405413
)?;
406414

407415
Ok(objects
408-
.map_ok(|object_meta| {
409-
try_into_partitioned_file(object_meta, partition_cols, table_path)
416+
.try_filter_map(|object_meta| {
417+
futures::future::ready(try_into_partitioned_file(
418+
object_meta,
419+
partition_cols,
420+
table_path,
421+
))
410422
})
411423
.try_filter_map(move |pf| {
412-
futures::future::ready(
413-
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
414-
)
424+
futures::future::ready(filter_partitions(pf, filters, &df_schema))
415425
})
416426
.boxed())
417427
}
@@ -574,6 +584,130 @@ mod tests {
574584
);
575585
}
576586

587+
#[test]
588+
fn test_try_into_partitioned_file_valid_partition() {
589+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
590+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
591+
let meta = ObjectMeta {
592+
location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
593+
last_modified: chrono::Utc::now(),
594+
size: 100,
595+
e_tag: None,
596+
version: None,
597+
};
598+
599+
let result =
600+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
601+
assert!(result.is_some());
602+
let pf = result.unwrap();
603+
assert_eq!(pf.partition_values.len(), 1);
604+
assert_eq!(
605+
pf.partition_values[0],
606+
ScalarValue::Utf8(Some("2024-01".to_string()))
607+
);
608+
}
609+
610+
#[test]
611+
fn test_try_into_partitioned_file_root_file_skipped() {
612+
// File in root directory (not inside any partition path) should be
613+
// skipped — this is the case where a stale file exists from before
614+
// hive partitioning was added.
615+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
616+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
617+
let meta = ObjectMeta {
618+
location: Path::from("bucket/mytable/data.parquet"),
619+
last_modified: chrono::Utc::now(),
620+
size: 100,
621+
e_tag: None,
622+
version: None,
623+
};
624+
625+
let result =
626+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
627+
assert!(
628+
result.is_none(),
629+
"Files outside partition structure should be skipped"
630+
);
631+
}
632+
633+
#[test]
634+
fn test_try_into_partitioned_file_wrong_partition_name() {
635+
// File in a directory that doesn't match the expected partition column
636+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
637+
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
638+
let meta = ObjectMeta {
639+
location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
640+
last_modified: chrono::Utc::now(),
641+
size: 100,
642+
e_tag: None,
643+
version: None,
644+
};
645+
646+
let result =
647+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
648+
assert!(
649+
result.is_none(),
650+
"Files with wrong partition column name should be skipped"
651+
);
652+
}
653+
654+
#[test]
655+
fn test_try_into_partitioned_file_multiple_partitions() {
656+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
657+
let partition_cols = vec![
658+
("year".to_string(), DataType::Utf8),
659+
("month".to_string(), DataType::Utf8),
660+
];
661+
let meta = ObjectMeta {
662+
location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
663+
last_modified: chrono::Utc::now(),
664+
size: 100,
665+
e_tag: None,
666+
version: None,
667+
};
668+
669+
let result =
670+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
671+
assert!(result.is_some());
672+
let pf = result.unwrap();
673+
assert_eq!(pf.partition_values.len(), 2);
674+
assert_eq!(
675+
pf.partition_values[0],
676+
ScalarValue::Utf8(Some("2024".to_string()))
677+
);
678+
assert_eq!(
679+
pf.partition_values[1],
680+
ScalarValue::Utf8(Some("01".to_string()))
681+
);
682+
}
683+
684+
#[test]
685+
fn test_try_into_partitioned_file_partial_partition_skipped() {
686+
// File has first partition but not second — should be skipped
687+
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
688+
let partition_cols = vec![
689+
("year".to_string(), DataType::Utf8),
690+
("month".to_string(), DataType::Utf8),
691+
];
692+
let meta = ObjectMeta {
693+
location: Path::from("bucket/mytable/year=2024/data.parquet"),
694+
last_modified: chrono::Utc::now(),
695+
size: 100,
696+
e_tag: None,
697+
version: None,
698+
};
699+
700+
let result =
701+
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
702+
// File has year=2024 but no month= directory — parse_partitions_for_path
703+
// returns None because the path component "data.parquet" doesn't match
704+
// the expected "month=..." pattern.
705+
assert!(
706+
result.is_none(),
707+
"Files with incomplete partition structure should be skipped"
708+
);
709+
}
710+
577711
#[test]
578712
fn test_expr_applicable_for_cols() {
579713
assert!(expr_applicable_for_cols(

datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,11 +17,13 @@
1717

1818
mod bytes;
1919
mod dict;
20+
mod groups;
2021
mod native;
2122

2223
pub use bytes::BytesDistinctCountAccumulator;
2324
pub use bytes::BytesViewDistinctCountAccumulator;
2425
pub use dict::DictionaryCountAccumulator;
26+
pub use groups::PrimitiveDistinctCountGroupsAccumulator;
2527
pub use native::Bitmap65536DistinctCountAccumulator;
2628
pub use native::Bitmap65536DistinctCountAccumulatorI16;
2729
pub use native::BoolArray256DistinctCountAccumulator;

0 commit comments

Comments
 (0)