diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..37dd25bf9029 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 3 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 312d62b7def1..cfc297f2f9aa 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -516,37 +516,22 @@ static T transformValue(Transform transform, Type type, ByteBuffer private Table getOrCreateTable(String filePath, FileFormat format) throws IOException { TableIdentifier tableId = TableIdentifier.parse(identifier); - @Nullable Table t; try { - t = catalogConfig.catalog().loadTable(tableId); + return catalogConfig.catalog().loadTable(tableId); } catch (NoSuchTableException e) { try { org.apache.iceberg.Schema schema = getSchema(filePath, format); PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); - t = - tableProps == null - ? catalogConfig - .catalog() - .createTable(TableIdentifier.parse(identifier), schema, spec) - : catalogConfig - .catalog() - .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + return tableProps == null + ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); } catch (AlreadyExistsException e2) { // if table already exists, just load it - t = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); } } - ensureNameMappingPresent(t); - return t; - } - - private static void ensureNameMappingPresent(Table table) { - if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { - // Forces Name based resolution instead of position based resolution - NameMapping mapping = MappingUtil.create(table.schema()); - String mappingJson = NameMappingParser.toJson(mapping); - table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit(); - } } /** @@ -743,6 +728,15 @@ public CommitManifestFilesDoFn(IcebergCatalogConfig catalogConfig, String identi this.identifier = identifier; } + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit(); + } + } + @ProcessElement public void process( @Element KV> batch, @@ -758,6 +752,7 @@ public void process( table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); } table.refresh(); + ensureNameMappingPresent(table); if (shouldSkip(commitId, lastCommitTimestamp.read())) { return;