diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 4aca5e989771..312d62b7def1 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -94,6 +94,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; @@ -102,6 +103,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.OrcMetrics; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetUtil; @@ -514,22 +516,37 @@ static T transformValue(Transform transform, Type type, ByteBuffer private Table getOrCreateTable(String filePath, FileFormat format) throws IOException { TableIdentifier tableId = TableIdentifier.parse(identifier); + @Nullable Table t; try { - return catalogConfig.catalog().loadTable(tableId); + t = catalogConfig.catalog().loadTable(tableId); } catch (NoSuchTableException e) { try { org.apache.iceberg.Schema schema = getSchema(filePath, format); PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); - return tableProps == null - ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) - : catalogConfig - .catalog() - .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + t = + tableProps == null + ? catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); } catch (AlreadyExistsException e2) { // if table already exists, just load it - return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + t = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); } } + ensureNameMappingPresent(t); + return t; + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit(); + } } /** diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java index 85740a33bd29..00eeeb04af02 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -64,6 +64,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; @@ -72,6 +73,7 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; @@ -219,6 +221,12 @@ public void testAddFilesWithPartitionPath(boolean isPartitioned) throws Exceptio // check partition metadata is preserved assertEquals(writtenDf1.partition(), addedDf1.partition()); assertEquals(writtenDf2.partition(), addedDf2.partition()); + + // check that mapping util was added + assertEquals( + MappingUtil.create(icebergSchema).asMappedFields(), + NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING)) + .asMappedFields()); } @Test @@ -319,6 +327,11 @@ && checkStateNotNull(errorRow.getString(1)) // check partition metadata is preserved assertEquals(expectedPartition1, addedDf1.partition()); assertEquals(expectedPartition2, addedDf2.partition()); + + assertEquals( + MappingUtil.create(icebergSchema).asMappedFields(), + NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING)) + .asMappedFields()); } @Test