Skip to content

Commit 345cbb4

Browse files
mbutrovich and blackmwk
authored and committed
fix: build_fallback_field_id_map produces incorrect column indices for schemas with nested types (#2307)
## Which issue does this PR close? - Closes #2306. - Downstream issue: apache/datafusion-comet#3860 ## What changes are included in this PR? `build_fallback_field_id_map` iterated over Parquet leaf columns instead of top-level fields when building the field ID to column index mapping for migrated files (no embedded field IDs). When nested types (struct, list, map) precede a primitive column, they expand into multiple leaves, causing the mapping to diverge from `add_fallback_field_ids_to_arrow_schema` which correctly assigns ordinal IDs to top-level Arrow fields. This made predicates on columns after nested types resolve to a leaf inside the group, crashing with "Leaf column `id` in predicates isn't a root column in Parquet schema". The fix iterates `root_schema().get_fields()` directly, assigning ordinal IDs only to top-level fields. For non-primitive fields (struct/list/map), it uses `get_column_root_idx` to advance past their leaf columns. This mirrors iceberg-java's `ParquetSchemaUtil.addFallbackIds()`, which iterates `fileSchema.getFields()` assigning ordinal IDs to top-level fields. Also renames "Leave column" to "Leaf column" in error messages. ## Are these changes tested? - An integration test (`test_predicate_on_migrated_file_with_nested_types`) writes a Parquet file without field IDs containing struct, list, and map columns before an `id` column, then reads with a predicate on `id`. This reproduces the exact crash before the fix. Test data is constructed with `serde_arrow` for readability. - [Apache DataFusion Comet](https://github.com/apache/datafusion-comet) used the repro test in [apache/datafusion-comet#3860](apache/datafusion-comet#3860) and it passes with this change: apache/datafusion-comet#3872 --------- Co-authored-by: blackmwk <liurenjie1024@outlook.com> (cherry picked from commit 5ea6f4c)
1 parent 97db3b4 commit 345cbb4

3 files changed

Lines changed: 277 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 46 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ rand = { workspace = true }
8989
regex = { workspace = true }
9090
tempfile = { workspace = true }
9191
minijinja = { workspace = true }
92+
serde_arrow = { version = "0.14", features = ["arrow-58"] }
9293

9394
[package.metadata.cargo-machete]
9495
# These dependencies are added to ensure minimal dependency version

crates/iceberg/src/arrow/reader.rs

Lines changed: 230 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,7 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
11001100
return Err(Error::new(
11011101
ErrorKind::DataInvalid,
11021102
format!(
1103-
"Leave column in schema should be primitive type but got {field_type:?}"
1103+
"Leaf column in schema should be primitive type but got {field_type:?}"
11041104
),
11051105
));
11061106
}
@@ -1111,14 +1111,36 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMa
11111111
}
11121112

11131113
/// Build a fallback field ID map for Parquet files without embedded field IDs.
1114-
/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
1114+
///
1115+
/// Returns the number of primitive (leaf) columns in a Parquet type, recursing into groups.
1116+
fn leaf_count(ty: &parquet::schema::types::Type) -> usize {
1117+
if ty.is_primitive() {
1118+
1
1119+
} else {
1120+
ty.get_fields().iter().map(|f| leaf_count(f)).sum()
1121+
}
1122+
}
1123+
1124+
/// Builds a mapping from fallback field IDs to leaf column indices for Parquet files
1125+
/// without embedded field IDs. Returns entries only for primitive top-level fields.
1126+
///
1127+
/// Must use top-level field positions (not leaf column positions) to stay consistent
1128+
/// with `add_fallback_field_ids_to_arrow_schema`, which assigns ordinal IDs to
1129+
/// top-level Arrow fields. Using leaf positions instead would produce wrong indices
1130+
/// when nested types (struct/list/map) expand into multiple leaf columns.
1131+
///
1132+
/// Mirrors iceberg-java's ParquetSchemaUtil.addFallbackIds() which iterates
1133+
/// fileSchema.getFields() assigning ordinal IDs to top-level fields.
11151134
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
11161135
let mut column_map = HashMap::new();
1136+
let mut leaf_idx = 0;
11171137

1118-
// 1-indexed to match iceberg-java's convention
1119-
for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1120-
let field_id = (idx + 1) as i32;
1121-
column_map.insert(field_id, idx);
1138+
for (top_pos, field) in parquet_schema.root_schema().get_fields().iter().enumerate() {
1139+
let field_id = (top_pos + 1) as i32;
1140+
if field.is_primitive() {
1141+
column_map.insert(field_id, leaf_idx);
1142+
}
1143+
leaf_idx += leaf_count(field);
11221144
}
11231145

11241146
column_map
@@ -1409,7 +1431,7 @@ impl PredicateConverter<'_> {
14091431
return Err(Error::new(
14101432
ErrorKind::DataInvalid,
14111433
format!(
1412-
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
1434+
"Leaf column `{}` in predicates isn't a root column in Parquet schema.",
14131435
reference.field().name
14141436
),
14151437
));
@@ -1423,7 +1445,7 @@ impl PredicateConverter<'_> {
14231445
.ok_or(Error::new(
14241446
ErrorKind::DataInvalid,
14251447
format!(
1426-
"Leave column `{}` in predicates cannot be found in the required column indices.",
1448+
"Leaf column `{}` in predicates cannot be found in the required column indices.",
14271449
reference.field().name
14281450
),
14291451
))?;
@@ -4667,4 +4689,204 @@ message schema {
46674689
assert_eq!(result[1], expected_1);
46684690
assert_eq!(result[2], expected_2);
46694691
}
4692+
4693+
/// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
4694+
/// predicate on a column after nested types in a migrated file (no field IDs).
4695+
/// Schema has struct, list, and map columns before the predicate target (`id`),
4696+
/// exercising the fallback field ID mapping across all nested type variants.
4697+
#[tokio::test]
4698+
async fn test_predicate_on_migrated_file_with_nested_types() {
4699+
use serde::{Deserialize, Serialize};
4700+
use serde_arrow::schema::{SchemaLike, TracingOptions};
4701+
4702+
#[derive(Serialize, Deserialize)]
4703+
struct Person {
4704+
name: String,
4705+
age: i32,
4706+
}
4707+
4708+
#[derive(Serialize, Deserialize)]
4709+
struct Row {
4710+
person: Person,
4711+
people: Vec<Person>,
4712+
props: std::collections::BTreeMap<String, String>,
4713+
id: i32,
4714+
}
4715+
4716+
let rows = vec![
4717+
Row {
4718+
person: Person {
4719+
name: "Alice".into(),
4720+
age: 30,
4721+
},
4722+
people: vec![Person {
4723+
name: "Alice".into(),
4724+
age: 30,
4725+
}],
4726+
props: [("k1".into(), "v1".into())].into(),
4727+
id: 1,
4728+
},
4729+
Row {
4730+
person: Person {
4731+
name: "Bob".into(),
4732+
age: 25,
4733+
},
4734+
people: vec![Person {
4735+
name: "Bob".into(),
4736+
age: 25,
4737+
}],
4738+
props: [("k2".into(), "v2".into())].into(),
4739+
id: 2,
4740+
},
4741+
Row {
4742+
person: Person {
4743+
name: "Carol".into(),
4744+
age: 40,
4745+
},
4746+
people: vec![Person {
4747+
name: "Carol".into(),
4748+
age: 40,
4749+
}],
4750+
props: [("k3".into(), "v3".into())].into(),
4751+
id: 3,
4752+
},
4753+
];
4754+
4755+
let tracing_options = TracingOptions::default()
4756+
.map_as_struct(false)
4757+
.strings_as_large_utf8(false)
4758+
.sequence_as_large_list(false);
4759+
let fields = Vec::<arrow_schema::FieldRef>::from_type::<Row>(tracing_options).unwrap();
4760+
let arrow_schema = Arc::new(ArrowSchema::new(fields.clone()));
4761+
let batch = serde_arrow::to_record_batch(&fields, &rows).unwrap();
4762+
4763+
// Fallback field IDs: person=1, people=2, props=3, id=4
4764+
let iceberg_schema = Arc::new(
4765+
Schema::builder()
4766+
.with_schema_id(1)
4767+
.with_fields(vec![
4768+
NestedField::required(
4769+
1,
4770+
"person",
4771+
Type::Struct(crate::spec::StructType::new(vec![
4772+
NestedField::required(
4773+
5,
4774+
"name",
4775+
Type::Primitive(PrimitiveType::String),
4776+
)
4777+
.into(),
4778+
NestedField::required(6, "age", Type::Primitive(PrimitiveType::Int))
4779+
.into(),
4780+
])),
4781+
)
4782+
.into(),
4783+
NestedField::required(
4784+
2,
4785+
"people",
4786+
Type::List(crate::spec::ListType {
4787+
element_field: NestedField::required(
4788+
7,
4789+
"element",
4790+
Type::Struct(crate::spec::StructType::new(vec![
4791+
NestedField::required(
4792+
8,
4793+
"name",
4794+
Type::Primitive(PrimitiveType::String),
4795+
)
4796+
.into(),
4797+
NestedField::required(
4798+
9,
4799+
"age",
4800+
Type::Primitive(PrimitiveType::Int),
4801+
)
4802+
.into(),
4803+
])),
4804+
)
4805+
.into(),
4806+
}),
4807+
)
4808+
.into(),
4809+
NestedField::required(
4810+
3,
4811+
"props",
4812+
Type::Map(crate::spec::MapType {
4813+
key_field: NestedField::required(
4814+
10,
4815+
"key",
4816+
Type::Primitive(PrimitiveType::String),
4817+
)
4818+
.into(),
4819+
value_field: NestedField::required(
4820+
11,
4821+
"value",
4822+
Type::Primitive(PrimitiveType::String),
4823+
)
4824+
.into(),
4825+
}),
4826+
)
4827+
.into(),
4828+
NestedField::required(4, "id", Type::Primitive(PrimitiveType::Int)).into(),
4829+
])
4830+
.build()
4831+
.unwrap(),
4832+
);
4833+
4834+
let tmp_dir = TempDir::new().unwrap();
4835+
let table_location = tmp_dir.path().to_str().unwrap().to_string();
4836+
let file_path = format!("{table_location}/1.parquet");
4837+
4838+
let props = WriterProperties::builder()
4839+
.set_compression(Compression::SNAPPY)
4840+
.build();
4841+
let file = File::create(&file_path).unwrap();
4842+
let mut writer = ArrowWriter::try_new(file, arrow_schema, Some(props)).unwrap();
4843+
writer.write(&batch).expect("Writing batch");
4844+
writer.close().unwrap();
4845+
4846+
let predicate = Reference::new("id").greater_than(Datum::int(1));
4847+
4848+
let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
4849+
.with_row_group_filtering_enabled(true)
4850+
.with_row_selection_enabled(true)
4851+
.build();
4852+
4853+
let tasks = Box::pin(futures::stream::iter(
4854+
vec![Ok(FileScanTask {
4855+
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
4856+
start: 0,
4857+
length: 0,
4858+
record_count: None,
4859+
data_file_path: file_path,
4860+
data_file_format: DataFileFormat::Parquet,
4861+
schema: iceberg_schema.clone(),
4862+
project_field_ids: vec![4],
4863+
predicate: Some(predicate.bind(iceberg_schema, true).unwrap()),
4864+
deletes: vec![],
4865+
partition: None,
4866+
partition_spec: None,
4867+
name_mapping: None,
4868+
case_sensitive: false,
4869+
})]
4870+
.into_iter(),
4871+
)) as FileScanTaskStream;
4872+
4873+
let result = reader
4874+
.read(tasks)
4875+
.unwrap()
4876+
.try_collect::<Vec<RecordBatch>>()
4877+
.await
4878+
.unwrap();
4879+
4880+
let ids: Vec<i32> = result
4881+
.iter()
4882+
.flat_map(|b| {
4883+
b.column(0)
4884+
.as_primitive::<arrow_array::types::Int32Type>()
4885+
.values()
4886+
.iter()
4887+
.copied()
4888+
})
4889+
.collect();
4890+
assert_eq!(ids, vec![2, 3]);
4891+
}
46704892
}

0 commit comments

Comments (0)