Skip to content

Commit 50d09c7

Browse files
committed
Move FileIO close from RecordWriter to RecordWriterManager
1 parent 676c998 commit 50d09c7

3 files changed

Lines changed: 128 additions & 46 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131
import org.apache.iceberg.encryption.EncryptedOutputFile;
3232
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3333
import org.apache.iceberg.io.DataWriter;
34-
import org.apache.iceberg.io.FileIO;
3534
import org.apache.iceberg.io.OutputFile;
3635
import org.apache.iceberg.parquet.Parquet;
37-
import org.checkerframework.checker.nullness.qual.Nullable;
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
4038

@@ -47,7 +45,6 @@ class RecordWriter {
4745
private final Table table;
4846
private final String absoluteFilename;
4947
private final FileFormat fileFormat;
50-
private @Nullable FileIO io;
5148

5249
RecordWriter(
5350
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -74,11 +71,9 @@ class RecordWriter {
7471
}
7572
OutputFile outputFile;
7673
EncryptionKeyMetadata keyMetadata;
77-
// Keep FileIO open for the lifetime of this writer to avoid
78-
// premature shutdown of underlying client pools (e.g., S3),
79-
// which manifests as "Connection pool shut down" (Issue #36438).
80-
this.io = table.io();
81-
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
74+
// table.io() returns the catalog's shared FileIO instance.
75+
// FileIO lifecycle is managed by RecordWriterManager.close().
76+
OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
8277
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
8378
outputFile = encryptedOutputFile.encryptingOutputFile();
8479
keyMetadata = encryptedOutputFile.keyMetadata();
@@ -135,20 +130,6 @@ public void close() throws IOException {
135130
fileFormat, table.name(), absoluteFilename),
136131
e);
137132
} finally {
138-
// Always attempt to close FileIO and decrement metrics
139-
if (io != null) {
140-
try {
141-
io.close();
142-
} catch (Exception ioCloseError) {
143-
if (closeError != null) {
144-
closeError.addSuppressed(ioCloseError);
145-
} else {
146-
closeError = new IOException("Failed to close FileIO", ioCloseError);
147-
}
148-
} finally {
149-
io = null;
150-
}
151-
}
152133
activeIcebergWriters.dec();
153134
}
154135

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2121
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2222

23+
import java.io.Closeable;
2324
import java.io.IOException;
2425
import java.time.Duration;
2526
import java.time.Instant;
@@ -29,8 +30,10 @@
2930
import java.time.format.DateTimeFormatter;
3031
import java.time.temporal.ChronoUnit;
3132
import java.util.ArrayList;
33+
import java.util.HashSet;
3234
import java.util.List;
3335
import java.util.Map;
36+
import java.util.Set;
3437
import java.util.UUID;
3538
import java.util.concurrent.TimeUnit;
3639
import org.apache.beam.sdk.schemas.Schema;
@@ -58,6 +61,7 @@
5861
import org.apache.iceberg.data.Record;
5962
import org.apache.iceberg.exceptions.AlreadyExistsException;
6063
import org.apache.iceberg.exceptions.NoSuchTableException;
64+
import org.apache.iceberg.io.FileIO;
6165
import org.apache.iceberg.transforms.Transforms;
6266
import org.checkerframework.checker.nullness.qual.Nullable;
6367
import org.slf4j.Logger;
@@ -403,33 +407,50 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
403407
*/
404408
@Override
405409
public void close() throws IOException {
406-
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
407-
windowedDestinationAndState : destinations.entrySet()) {
408-
DestinationState state = windowedDestinationAndState.getValue();
410+
try {
411+
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
412+
windowedDestinationAndState : destinations.entrySet()) {
413+
DestinationState state = windowedDestinationAndState.getValue();
409414

410-
// removing writers from the state's cache will trigger the logic to collect each writer's
411-
// data file.
412-
state.writers.invalidateAll();
413-
// first check for any exceptions swallowed by the cache
414-
if (!state.exceptions.isEmpty()) {
415-
IllegalStateException exception =
416-
new IllegalStateException(
417-
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
418-
for (Exception e : state.exceptions) {
419-
exception.addSuppressed(e);
415+
// removing writers from the state's cache will trigger the logic to collect each writer's
416+
// data file.
417+
state.writers.invalidateAll();
418+
// first check for any exceptions swallowed by the cache
419+
if (!state.exceptions.isEmpty()) {
420+
IllegalStateException exception =
421+
new IllegalStateException(
422+
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
423+
for (Exception e : state.exceptions) {
424+
exception.addSuppressed(e);
425+
}
426+
throw exception;
420427
}
421-
throw exception;
422-
}
423428

424-
if (state.dataFiles.isEmpty()) {
425-
continue;
426-
}
429+
if (state.dataFiles.isEmpty()) {
430+
continue;
431+
}
427432

428-
totalSerializableDataFiles.put(
429-
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
430-
state.dataFiles.clear();
433+
totalSerializableDataFiles.put(
434+
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
435+
state.dataFiles.clear();
436+
}
437+
} finally {
438+
// Close each distinct FileIO once the loop above has completed or thrown (we are in finally).
439+
// table.io() returns the catalog's shared FileIO; the HashSet deduplicates instances
440+
// (via equals/hashCode) so we close each underlying connection pool exactly once.
441+
Set<FileIO> closedIOs = new HashSet<>();
442+
for (DestinationState state : destinations.values()) {
443+
FileIO io = state.table.io();
444+
if (io instanceof Closeable && closedIOs.add(io)) {
445+
try {
446+
((Closeable) io).close();
447+
} catch (IOException e) {
448+
LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
449+
}
450+
}
451+
}
452+
destinations.clear();
431453
}
432-
destinations.clear();
433454
checkArgument(
434455
openWriters == 0,
435456
"Expected all data writers to be closed, but found %s data writer(s) still open",

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.iceberg.catalog.Catalog;
6666
import org.apache.iceberg.catalog.Namespace;
6767
import org.apache.iceberg.catalog.TableIdentifier;
68+
import org.apache.iceberg.data.Record;
6869
import org.apache.iceberg.hadoop.HadoopCatalog;
6970
import org.apache.iceberg.io.FileIO;
7071
import org.apache.iceberg.io.InputFile;
@@ -957,7 +958,7 @@ public void testDefaultMetrics() throws IOException {
957958
}
958959

959960
@Test
960-
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
961+
public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
961962
TableIdentifier tableId =
962963
TableIdentifier.of(
963964
"default",
@@ -980,7 +981,86 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
980981
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
981982
writer.close();
982983

983-
assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
984+
// RecordWriter must NOT close FileIO — it's the catalog's shared instance.
985+
assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed);
986+
}
987+
988+
/**
989+
* Verifies that when multiple writers share the same FileIO, closing any writer does not close
990+
* the shared FileIO — that is the responsibility of RecordWriterManager.close().
991+
*/
992+
@Test
993+
public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException {
994+
// Create two tables that share the same FileIO (simulating dynamic destinations
995+
// backed by the same catalog)
996+
TableIdentifier tableId1 =
997+
TableIdentifier.of(
998+
"default",
999+
"table_batch_close_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
1000+
TableIdentifier tableId2 =
1001+
TableIdentifier.of(
1002+
"default",
1003+
"table_batch_close_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
1004+
Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
1005+
Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
1006+
1007+
// Both tables share the same CloseTrackingFileIO — mirrors how a catalog returns
1008+
// the same shared FileIO instance for all tables
1009+
CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
1010+
Table spyTable1 = Mockito.spy(table1);
1011+
Table spyTable2 = Mockito.spy(table2);
1012+
Mockito.doReturn(sharedFileIO).when(spyTable1).io();
1013+
Mockito.doReturn(sharedFileIO).when(spyTable2).io();
1014+
1015+
PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
1016+
PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
1017+
1018+
RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, "file1.parquet", pk1);
1019+
RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, "file2.parquet", pk2);
1020+
1021+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
1022+
Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
1023+
1024+
writer1.write(record);
1025+
writer2.write(record);
1026+
1027+
writer1.close();
1028+
assertFalse("FileIO must remain open between batch writer closes", sharedFileIO.closed);
1029+
1030+
writer2.close();
1031+
assertFalse("FileIO must remain open after all writers close", sharedFileIO.closed);
1032+
1033+
// Both writers produced valid data files
1034+
assertNotNull(writer1.getDataFile());
1035+
assertNotNull(writer2.getDataFile());
1036+
}
1037+
1038+
/**
1039+
* Verifies that RecordWriterManager.close() successfully flushes data files from multiple
1040+
* destinations.
1041+
*/
1042+
@Test
1043+
public void testRecordWriterManagerCloseFlushesAllDestinations() throws IOException {
1044+
String tableName1 =
1045+
"table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
1046+
String tableName2 =
1047+
"table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
1048+
WindowedValue<IcebergDestination> dest1 = getWindowedDestination(tableName1, null);
1049+
WindowedValue<IcebergDestination> dest2 = getWindowedDestination(tableName2, null);
1050+
1051+
RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);
1052+
1053+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
1054+
assertTrue(writerManager.write(dest1, row));
1055+
assertTrue(writerManager.write(dest2, row));
1056+
assertEquals(2, writerManager.openWriters);
1057+
1058+
writerManager.close();
1059+
1060+
Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
1061+
writerManager.getSerializableDataFiles();
1062+
assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
1063+
assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
9841064
}
9851065

9861066
private static final class CloseTrackingFileIO implements FileIO {

0 commit comments

Comments
 (0)