Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/catalog-listing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ log = { workspace = true }
object_store = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
datafusion-datasource-parquet = { workspace = true }

# Note: add additional linter rules in lib.rs.
Expand Down
142 changes: 134 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,25 @@ fn filter_partitions(
Ok(None)
}

/// Returns `Ok(None)` when the file is not inside a valid partition path
/// (e.g. a stale file in the table root directory). Such files are skipped
/// because hive-style partition values are never null and there is no valid
/// value to assign for non-partitioned files.
fn try_into_partitioned_file(
object_meta: ObjectMeta,
partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
) -> Result<PartitionedFile> {
) -> Result<Option<PartitionedFile>> {
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);

let Some(parsed) = parsed else {
// parse_partitions_for_path already logs a debug message
return Ok(None);
};

let partition_values = parsed
.into_iter()
.flatten()
.zip(partition_cols)
.map(|(parsed, (_, datatype))| {
ScalarValue::try_from_string(parsed.to_string(), datatype)
Expand All @@ -359,7 +367,7 @@ fn try_into_partitioned_file(
let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;

Ok(pf)
Ok(Some(pf))
}

/// Discover the partitions on the given path and prune out files
Expand Down Expand Up @@ -404,13 +412,15 @@ pub async fn pruned_partition_list<'a>(
)?;

Ok(objects
.map_ok(|object_meta| {
try_into_partitioned_file(object_meta, partition_cols, table_path)
.try_filter_map(|object_meta| {
futures::future::ready(try_into_partitioned_file(
object_meta,
partition_cols,
table_path,
))
})
.try_filter_map(move |pf| {
futures::future::ready(
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
)
futures::future::ready(filter_partitions(pf, filters, &df_schema))
})
.boxed())
}
Expand Down Expand Up @@ -573,6 +583,122 @@ mod tests {
);
}

#[test]
fn test_try_into_partitioned_file_valid_partition() {
    // A file under a well-formed hive partition path yields Some(PartitionedFile)
    // with the parsed partition value attached.
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
        last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
        size: 100,
        e_tag: None,
        version: None,
    };

    let result =
        try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
    assert!(result.is_some());
    let pf = result.unwrap();
    assert_eq!(pf.partition_values.len(), 1);
    assert_eq!(
        pf.partition_values[0],
        ScalarValue::Utf8(Some("2024-01".to_string()))
    );
}

#[test]
fn test_try_into_partitioned_file_root_file_skipped() {
    // A file directly in the table root (no partition directory) must be
    // skipped, i.e. the function returns Ok(None).
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/data.parquet"),
        last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
        size: 100,
        e_tag: None,
        version: None,
    };

    let result =
        try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
    assert!(
        result.is_none(),
        "Files outside partition structure should be skipped"
    );
}

#[test]
fn test_try_into_partitioned_file_wrong_partition_name() {
    // A directory whose key does not match the declared partition column name
    // is not a valid partition path, so the file is skipped (Ok(None)).
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
        last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
        size: 100,
        e_tag: None,
        version: None,
    };

    let result =
        try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
    assert!(
        result.is_none(),
        "Files with wrong partition column name should be skipped"
    );
}

#[test]
fn test_try_into_partitioned_file_multiple_partitions() {
    // Two nested partition directories produce two partition values, in the
    // declared column order.
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![
        ("year".to_string(), DataType::Utf8),
        ("month".to_string(), DataType::Utf8),
    ];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
        last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
        size: 100,
        e_tag: None,
        version: None,
    };

    let result =
        try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
    assert!(result.is_some());
    let pf = result.unwrap();
    assert_eq!(pf.partition_values.len(), 2);
    assert_eq!(
        pf.partition_values[0],
        ScalarValue::Utf8(Some("2024".to_string()))
    );
    assert_eq!(
        pf.partition_values[1],
        ScalarValue::Utf8(Some("01".to_string()))
    );
}

#[test]
fn test_try_into_partitioned_file_partial_partition_skipped() {
    // Only one of the two declared partition levels is present in the path,
    // so the file does not belong to a complete partition and is skipped.
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![
        ("year".to_string(), DataType::Utf8),
        ("month".to_string(), DataType::Utf8),
    ];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/data.parquet"),
        last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
        size: 100,
        e_tag: None,
        version: None,
    };

    let result =
        try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
    assert!(
        result.is_none(),
        "Files with incomplete partition structure should be skipped"
    );
}

#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
Expand Down
Loading