diff --git a/Cargo.lock b/Cargo.lock
index 94554ce569c66..b60c510bb0bad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1891,6 +1891,7 @@ version = "52.4.0"
 dependencies = [
  "arrow",
  "async-trait",
+ "chrono",
  "datafusion-catalog",
  "datafusion-common",
  "datafusion-datasource",
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index be1374b371485..61b55397137df 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -48,6 +48,7 @@ log = { workspace = true }
 object_store = { workspace = true }
 
 [dev-dependencies]
+chrono = { workspace = true }
 datafusion-datasource-parquet = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
index ea016015cebd3..db0b30f89538d 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -339,17 +339,25 @@ fn filter_partitions(
     Ok(None)
 }
 
+/// Returns `Ok(None)` when the file is not inside a valid partition path
+/// (e.g. a stale file in the table root directory). Such files are skipped
+/// because hive-style partition values are never null and there is no valid
+/// value to assign for non-partitioned files.
 fn try_into_partitioned_file(
     object_meta: ObjectMeta,
     partition_cols: &[(String, DataType)],
     table_path: &ListingTableUrl,
-) -> Result<PartitionedFile> {
+) -> Result<Option<PartitionedFile>> {
     let cols = partition_cols.iter().map(|(name, _)| name.as_str());
     let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
+    let Some(parsed) = parsed else {
+        // parse_partitions_for_path already logs a debug message
+        return Ok(None);
+    };
+
     let partition_values = parsed
         .into_iter()
-        .flatten()
         .zip(partition_cols)
         .map(|(parsed, (_, datatype))| {
             ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -359,7 +367,7 @@ fn try_into_partitioned_file(
 
     let mut pf: PartitionedFile = object_meta.into();
     pf.partition_values = partition_values;
-    Ok(pf)
+    Ok(Some(pf))
 }
 
 /// Discover the partitions on the given path and prune out files
@@ -404,13 +412,15 @@ pub async fn pruned_partition_list<'a>(
     )?;
 
     Ok(objects
-        .map_ok(|object_meta| {
-            try_into_partitioned_file(object_meta, partition_cols, table_path)
+        .try_filter_map(|object_meta| {
+            futures::future::ready(try_into_partitioned_file(
+                object_meta,
+                partition_cols,
+                table_path,
+            ))
         })
         .try_filter_map(move |pf| {
-            futures::future::ready(
-                pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
-            )
+            futures::future::ready(filter_partitions(pf, filters, &df_schema))
         })
         .boxed())
 }
@@ -573,6 +583,122 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_try_into_partitioned_file_valid_partition() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(result.is_some());
+        let pf = result.unwrap();
+        assert_eq!(pf.partition_values.len(), 1);
+        assert_eq!(
+            pf.partition_values[0],
+            ScalarValue::Utf8(Some("2024-01".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_root_file_skipped() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/data.parquet"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(
+            result.is_none(),
+            "Files outside partition structure should be skipped"
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_wrong_partition_name() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(
+            result.is_none(),
+            "Files with wrong partition column name should be skipped"
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_multiple_partitions() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![
+            ("year".to_string(), DataType::Utf8),
+            ("month".to_string(), DataType::Utf8),
+        ];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(result.is_some());
+        let pf = result.unwrap();
+        assert_eq!(pf.partition_values.len(), 2);
+        assert_eq!(
+            pf.partition_values[0],
+            ScalarValue::Utf8(Some("2024".to_string()))
+        );
+        assert_eq!(
+            pf.partition_values[1],
+            ScalarValue::Utf8(Some("01".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_partial_partition_skipped() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![
+            ("year".to_string(), DataType::Utf8),
+            ("month".to_string(), DataType::Utf8),
+        ];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year=2024/data.parquet"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(
+            result.is_none(),
+            "Files with incomplete partition structure should be skipped"
+        );
+    }
+
     #[test]
     fn test_expr_applicable_for_cols() {
         assert!(expr_applicable_for_cols(