Skip to content

Commit fb8be01

Browse files
committed
Skip files outside partition structure in hive-partitioned listing tables
When a hive-partitioned listing table contains files directly in the table root directory (that is, not inside any `partition_col=value/` path), those files have no partition values. Previously they were included with an empty `partition_values` list, which caused "Unable to get field named" errors when a query referenced a partition column. Now `try_into_partitioned_file` returns `Ok(None)` for files that do not match the partition structure, and the caller skips them.
1 parent 16e0a5c commit fb8be01

2 files changed

Lines changed: 138 additions & 8 deletions

File tree

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ log = { workspace = true }
4848
object_store = { workspace = true }
4949

5050
[dev-dependencies]
51+
chrono = { workspace = true }
5152
datafusion-datasource-parquet = { workspace = true }
5253

5354
# Note: add additional linter rules in lib.rs.

datafusion/catalog-listing/src/helpers.rs

Lines changed: 137 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,17 +339,28 @@ fn filter_partitions(
339339
Ok(None)
340340
}
341341

342+
/// Returns `Ok(None)` when the file is not inside a valid partition path
343+
/// (e.g. a stale file in the table root directory). Such files are skipped
344+
/// because hive-style partition values are never null and there is no valid
345+
/// value to assign for non-partitioned files.
342346
fn try_into_partitioned_file(
343347
object_meta: ObjectMeta,
344348
partition_cols: &[(String, DataType)],
345349
table_path: &ListingTableUrl,
346-
) -> Result<PartitionedFile> {
350+
) -> Result<Option<PartitionedFile>> {
347351
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
348352
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
349353

354+
let Some(parsed) = parsed else {
355+
debug!(
356+
"Skipping file outside partition structure: {}",
357+
object_meta.location
358+
);
359+
return Ok(None);
360+
};
361+
350362
let partition_values = parsed
351363
.into_iter()
352-
.flatten()
353364
.zip(partition_cols)
354365
.map(|(parsed, (_, datatype))| {
355366
ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -359,7 +370,7 @@ fn try_into_partitioned_file(
359370
let mut pf: PartitionedFile = object_meta.into();
360371
pf.partition_values = partition_values;
361372

362-
Ok(pf)
373+
Ok(Some(pf))
363374
}
364375

365376
/// Discover the partitions on the given path and prune out files
@@ -404,13 +415,15 @@ pub async fn pruned_partition_list<'a>(
404415
)?;
405416

406417
Ok(objects
407-
.map_ok(|object_meta| {
408-
try_into_partitioned_file(object_meta, partition_cols, table_path)
418+
.try_filter_map(|object_meta| {
419+
futures::future::ready(try_into_partitioned_file(
420+
object_meta,
421+
partition_cols,
422+
table_path,
423+
))
409424
})
410425
.try_filter_map(move |pf| {
411-
futures::future::ready(
412-
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
413-
)
426+
futures::future::ready(filter_partitions(pf, filters, &df_schema))
414427
})
415428
.boxed())
416429
}
@@ -573,6 +586,122 @@ mod tests {
573586
);
574587
}
575588

589+
/// A file stored under a `year_month=<value>` directory of the table root
/// must be kept, and the directory's value must become its only partition
/// value.
#[test]
fn test_try_into_partitioned_file_valid_partition() {
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    // `expect` asserts that the file was kept and unwraps it in one step, so a
    // regression reports a meaningful message instead of a bare `unwrap` panic.
    let pf = try_into_partitioned_file(meta, &partition_cols, &table_path)
        .unwrap()
        .expect("file inside a valid partition path should not be skipped");

    // Comparing the whole vector checks both the length and the value.
    assert_eq!(
        pf.partition_values,
        vec![ScalarValue::Utf8(Some("2024-01".to_string()))]
    );
}
611+
612+
/// A data file placed directly in the table root has no `year_month=...`
/// path segment, so the conversion is expected to yield `None`.
#[test]
fn test_try_into_partitioned_file_root_file_skipped() {
    let object = ObjectMeta {
        location: Path::from("bucket/mytable/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };
    let cols = [(String::from("year_month"), DataType::Utf8)];
    let root = ListingTableUrl::parse("file:///bucket/mytable").unwrap();

    let outcome = try_into_partitioned_file(object, &cols, &root).unwrap();
    assert!(
        outcome.is_none(),
        "Files outside partition structure should be skipped"
    );
}
631+
632+
/// The file sits under a `key=value` directory, but the key (`wrong_col`)
/// is not the declared partition column (`year_month`); the conversion is
/// expected to yield `None`.
#[test]
fn test_try_into_partitioned_file_wrong_partition_name() {
    let object = ObjectMeta {
        location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };
    let cols = [(String::from("year_month"), DataType::Utf8)];
    let root = ListingTableUrl::parse("file:///bucket/mytable").unwrap();

    let outcome = try_into_partitioned_file(object, &cols, &root).unwrap();
    assert!(
        outcome.is_none(),
        "Files with wrong partition column name should be skipped"
    );
}
651+
652+
/// With two declared partition columns (`year`, `month`) and a file nested
/// under `year=2024/month=01/`, the file must be kept and its partition
/// values must follow the declared column order.
#[test]
fn test_try_into_partitioned_file_multiple_partitions() {
    let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let partition_cols = vec![
        ("year".to_string(), DataType::Utf8),
        ("month".to_string(), DataType::Utf8),
    ];
    let meta = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    // `expect` asserts that the file was kept and unwraps it in one step, so a
    // regression reports a meaningful message instead of a bare `unwrap` panic.
    let pf = try_into_partitioned_file(meta, &partition_cols, &table_path)
        .unwrap()
        .expect("file inside a complete partition path should not be skipped");

    // Comparing the whole vector checks the length, the values and the order.
    assert_eq!(
        pf.partition_values,
        vec![
            ScalarValue::Utf8(Some("2024".to_string())),
            ScalarValue::Utf8(Some("01".to_string())),
        ]
    );
}
681+
682+
/// Two partition columns are declared (`year`, `month`) but the file path
/// only carries the `year=2024` level; the conversion is expected to yield
/// `None`.
#[test]
fn test_try_into_partitioned_file_partial_partition_skipped() {
    let object = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };
    let cols = [
        (String::from("year"), DataType::Utf8),
        (String::from("month"), DataType::Utf8),
    ];
    let root = ListingTableUrl::parse("file:///bucket/mytable").unwrap();

    let outcome = try_into_partitioned_file(object, &cols, &root).unwrap();
    assert!(
        outcome.is_none(),
        "Files with incomplete partition structure should be skipped"
    );
}
704+
576705
#[test]
577706
fn test_expr_applicable_for_cols() {
578707
assert!(expr_applicable_for_cols(

0 commit comments

Comments
 (0)