From 50d09c7acd8af7b1a412aad308aa17f1e5889d28 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Fri, 6 Mar 2026 04:42:50 +0100 Subject: [PATCH 1/3] Move FileIO close from RecordWriter to RecordWriterManager --- .../beam/sdk/io/iceberg/RecordWriter.java | 25 +----- .../sdk/io/iceberg/RecordWriterManager.java | 65 +++++++++----- .../io/iceberg/RecordWriterManagerTest.java | 84 ++++++++++++++++++- 3 files changed, 128 insertions(+), 46 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index d233b0ac05b5..08aeb35971f0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -31,10 +31,8 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +45,6 @@ class RecordWriter { private final Table table; private final String absoluteFilename; private final FileFormat fileFormat; - private @Nullable FileIO io; RecordWriter( Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) @@ -74,11 +71,9 @@ class RecordWriter { } OutputFile outputFile; EncryptionKeyMetadata keyMetadata; - // Keep FileIO open for the lifetime of this writer to avoid - // premature shutdown of underlying client pools (e.g., S3), - // which manifests as "Connection pool shut down" (Issue #36438). - this.io = table.io(); - OutputFile tmpFile = io.newOutputFile(absoluteFilename); + // table.io() returns the catalog's shared FileIO instance. 
+ // FileIO lifecycle is managed by RecordWriterManager.close(). + OutputFile tmpFile = table.io().newOutputFile(absoluteFilename); EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile); outputFile = encryptedOutputFile.encryptingOutputFile(); keyMetadata = encryptedOutputFile.keyMetadata(); @@ -135,20 +130,6 @@ public void close() throws IOException { fileFormat, table.name(), absoluteFilename), e); } finally { - // Always attempt to close FileIO and decrement metrics - if (io != null) { - try { - io.close(); - } catch (Exception ioCloseError) { - if (closeError != null) { - closeError.addSuppressed(ioCloseError); - } else { - closeError = new IOException("Failed to close FileIO", ioCloseError); - } - } finally { - io = null; - } - } activeIcebergWriters.dec(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index da62fb658846..1551936a1e66 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -29,8 +30,10 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.schemas.Schema; @@ -58,6 +61,7 @@ import org.apache.iceberg.data.Record; import 
org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.transforms.Transforms; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -403,33 +407,50 @@ public boolean write(WindowedValue icebergDestination, Row r */ @Override public void close() throws IOException { - for (Map.Entry, DestinationState> - windowedDestinationAndState : destinations.entrySet()) { - DestinationState state = windowedDestinationAndState.getValue(); + try { + for (Map.Entry, DestinationState> + windowedDestinationAndState : destinations.entrySet()) { + DestinationState state = windowedDestinationAndState.getValue(); - // removing writers from the state's cache will trigger the logic to collect each writer's - // data file. - state.writers.invalidateAll(); - // first check for any exceptions swallowed by the cache - if (!state.exceptions.isEmpty()) { - IllegalStateException exception = - new IllegalStateException( - String.format("Encountered %s failed writer(s).", state.exceptions.size())); - for (Exception e : state.exceptions) { - exception.addSuppressed(e); + // removing writers from the state's cache will trigger the logic to collect each writer's + // data file. 
+ state.writers.invalidateAll(); + // first check for any exceptions swallowed by the cache + if (!state.exceptions.isEmpty()) { + IllegalStateException exception = + new IllegalStateException( + String.format("Encountered %s failed writer(s).", state.exceptions.size())); + for (Exception e : state.exceptions) { + exception.addSuppressed(e); + } + throw exception; } - throw exception; - } - if (state.dataFiles.isEmpty()) { - continue; - } + if (state.dataFiles.isEmpty()) { + continue; + } - totalSerializableDataFiles.put( - windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles)); - state.dataFiles.clear(); + totalSerializableDataFiles.put( + windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles)); + state.dataFiles.clear(); + } + } finally { + // Close unique FileIO instances now that all writers are done. + // table.io() returns the catalog's shared FileIO; we deduplicate by identity + // so we close each underlying connection pool exactly once. + Set closedIOs = new HashSet<>(); + for (DestinationState state : destinations.values()) { + FileIO io = state.table.io(); + if (io instanceof Closeable && closedIOs.add(io)) { + try { + ((Closeable) io).close(); + } catch (IOException e) { + LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e); + } + } + } + destinations.clear(); } - destinations.clear(); checkArgument( openWriters == 0, "Expected all data writers to be closed, but found %s data writer(s) still open", diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 375d90737117..c334b80bb7bf 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -65,6 +65,7 @@ import org.apache.iceberg.catalog.Catalog; 
import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; @@ -957,7 +958,7 @@ public void testDefaultMetrics() throws IOException { } @Test - public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException { + public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException { TableIdentifier tableId = TableIdentifier.of( "default", @@ -980,7 +981,86 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException { writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); writer.close(); - assertTrue("FileIO should be closed after writer close", trackingFileIO.closed); + // RecordWriter must NOT close FileIO — it's the catalog's shared instance. + assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed); + } + + /** + * Verifies that when multiple writers share the same FileIO, closing any writer does not close + * the shared FileIO — that is the responsibility of RecordWriterManager.close(). 
+ */ + @Test + public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException { + // Create two tables that share the same FileIO (simulating dynamic destinations + // backed by the same catalog) + TableIdentifier tableId1 = + TableIdentifier.of( + "default", + "table_batch_close_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6)); + TableIdentifier tableId2 = + TableIdentifier.of( + "default", + "table_batch_close_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6)); + Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA); + Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA); + + // Both tables share the same CloseTrackingFileIO — mirrors how a catalog returns + // the same shared FileIO instance for all tables + CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io()); + Table spyTable1 = Mockito.spy(table1); + Table spyTable2 = Mockito.spy(table2); + Mockito.doReturn(sharedFileIO).when(spyTable1).io(); + Mockito.doReturn(sharedFileIO).when(spyTable2).io(); + + PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema()); + PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema()); + + RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, "file1.parquet", pk1); + RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, "file2.parquet", pk2); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row); + + writer1.write(record); + writer2.write(record); + + writer1.close(); + assertFalse("FileIO must remain open between batch writer closes", sharedFileIO.closed); + + writer2.close(); + assertFalse("FileIO must remain open after all writers close", sharedFileIO.closed); + + // Both writers produced valid data files + assertNotNull(writer1.getDataFile()); + assertNotNull(writer2.getDataFile()); + } + + /** + * Verifies that 
RecordWriterManager.close() successfully flushes data files from multiple + * destinations. + */ + @Test + public void testRecordWriterManagerCloseFlushesAllDestinations() throws IOException { + String tableName1 = + "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6); + String tableName2 = + "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6); + WindowedValue dest1 = getWindowedDestination(tableName1, null); + WindowedValue dest2 = getWindowedDestination(tableName2, null); + + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + assertTrue(writerManager.write(dest1, row)); + assertTrue(writerManager.write(dest2, row)); + assertEquals(2, writerManager.openWriters); + + writerManager.close(); + + Map, List> dataFiles = + writerManager.getSerializableDataFiles(); + assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1)); + assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2)); } private static final class CloseTrackingFileIO implements FileIO { From 5235f9b333788e16388b97cdad305f2dfa961971 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Fri, 6 Mar 2026 05:28:32 +0100 Subject: [PATCH 2/3] Call FileIO.close() directly instead of casting to Closeable --- .../apache/beam/sdk/io/iceberg/RecordWriterManager.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 1551936a1e66..4d0412f46539 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -20,7 +20,6 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -441,10 +440,10 @@ public void close() throws IOException { Set closedIOs = new HashSet<>(); for (DestinationState state : destinations.values()) { FileIO io = state.table.io(); - if (io instanceof Closeable && closedIOs.add(io)) { + if (io != null && closedIOs.add(io)) { try { - ((Closeable) io).close(); - } catch (IOException e) { + io.close(); + } catch (Exception e) { LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e); } } From c5282eda9eae62d38cad32c19a3823c0ddf92506 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Fri, 6 Mar 2026 18:21:00 +0100 Subject: [PATCH 3/3] clarify FileIO ownership comments and verify close --- .../beam/sdk/io/iceberg/RecordWriter.java | 2 +- .../sdk/io/iceberg/RecordWriterManager.java | 2 +- .../io/iceberg/RecordWriterManagerTest.java | 32 +++++++++++++++---- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 08aeb35971f0..82251c00e72e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -71,7 +71,7 @@ class RecordWriter { } OutputFile outputFile; EncryptionKeyMetadata keyMetadata; - // table.io() returns the catalog's shared FileIO instance. + // table.io() may return a shared FileIO instance. // FileIO lifecycle is managed by RecordWriterManager.close(). 
OutputFile tmpFile = table.io().newOutputFile(absoluteFilename); EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 4d0412f46539..eb79513df4f9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -435,7 +435,7 @@ public void close() throws IOException { } } finally { // Close unique FileIO instances now that all writers are done. - // table.io() returns the catalog's shared FileIO; we deduplicate by identity + // table.io() may return a shared FileIO; we deduplicate by identity // so we close each underlying connection pool exactly once. Set closedIOs = new HashSet<>(); for (DestinationState state : destinations.values()) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index c334b80bb7bf..2672ac70c088 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -981,7 +981,7 @@ public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException { writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); writer.close(); - // RecordWriter must NOT close FileIO — it's the catalog's shared instance. + // RecordWriter must NOT close FileIO — it may be a shared instance. 
assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed); } @@ -1004,8 +1004,8 @@ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOExcepti Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA); Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA); - // Both tables share the same CloseTrackingFileIO — mirrors how a catalog returns - // the same shared FileIO instance for all tables + // Both tables share the same CloseTrackingFileIO — mirrors how some catalogs + // return a shared FileIO instance across tables CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io()); Table spyTable1 = Mockito.spy(table1); Table spyTable2 = Mockito.spy(table2); @@ -1036,19 +1036,36 @@ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOExcepti } /** - * Verifies that RecordWriterManager.close() successfully flushes data files from multiple - * destinations. + * Verifies that RecordWriterManager.close() flushes data files from multiple destinations and + * closes the shared FileIO. 
*/ @Test - public void testRecordWriterManagerCloseFlushesAllDestinations() throws IOException { + public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException { String tableName1 = "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6); String tableName2 = "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6); + TableIdentifier tableId1 = TableIdentifier.of("default", tableName1); + TableIdentifier tableId2 = TableIdentifier.of("default", tableName2); + + Table realTable1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA); + Table realTable2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA); + + CloseTrackingFileIO sharedTrackingIO = new CloseTrackingFileIO(realTable1.io()); + Table spyTable1 = Mockito.spy(realTable1); + Table spyTable2 = Mockito.spy(realTable2); + Mockito.doReturn(sharedTrackingIO).when(spyTable1).io(); + Mockito.doReturn(sharedTrackingIO).when(spyTable2).io(); + + Catalog spyCatalog = Mockito.spy(catalog); + Mockito.doReturn(spyTable1).when(spyCatalog).loadTable(tableId1); + Mockito.doReturn(spyTable2).when(spyCatalog).loadTable(tableId2); + WindowedValue dest1 = getWindowedDestination(tableName1, null); WindowedValue dest2 = getWindowedDestination(tableName2, null); - RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + RecordWriterManager writerManager = + new RecordWriterManager(spyCatalog, "test_file_name", 1000, 3); Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); assertTrue(writerManager.write(dest1, row)); @@ -1061,6 +1078,7 @@ public void testRecordWriterManagerCloseFlushesAllDestinations() throws IOExcept writerManager.getSerializableDataFiles(); assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1)); assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2)); + assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed); } private static 
final class CloseTrackingFileIO implements FileIO {