Skip to content

Commit 0cdd1b4

Browse files
authored
Use static shared catalog and remove premature FileIO close (#38149)
* Fix IcebergIO connection pool crash by moving FileIO lifecycle to @Teardown
* trigger build
* Move catalog init to @Setup; make table cache per-DoFn
* Fix spotless and import
* Use static shared catalog
* trigger integration tests
* trigger integration test
1 parent b4e1d57 commit 0cdd1b4

7 files changed

Lines changed: 78 additions & 57 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 3
3+
"modification": 4
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 1
3+
"modification": 2
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -190,17 +190,15 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
190190
int specId = entry.getKey();
191191
List<DataFile> files = entry.getValue();
192192
PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
193-
ManifestWriter<DataFile> writer;
194-
try (FileIO io = table.io()) {
195-
writer = createManifestWriter(table.location(), uuid, spec, io);
196-
for (DataFile file : files) {
197-
writer.add(file);
198-
committedDataFileByteSize.update(file.fileSizeInBytes());
199-
committedDataFileRecordCount.update(file.recordCount());
200-
}
201-
writer.close();
202-
update.appendManifest(writer.toManifestFile());
193+
FileIO io = table.io();
194+
ManifestWriter<DataFile> writer = createManifestWriter(table.location(), uuid, spec, io);
195+
for (DataFile file : files) {
196+
writer.add(file);
197+
committedDataFileByteSize.update(file.fileSizeInBytes());
198+
committedDataFileRecordCount.update(file.recordCount());
203199
}
200+
writer.close();
201+
update.appendManifest(writer.toManifestFile());
204202
}
205203
update.commit();
206204
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.stream.Collectors;
2728
import org.apache.beam.sdk.schemas.Schema;
2829
import org.apache.beam.sdk.util.ReleaseInfo;
@@ -45,7 +46,6 @@
4546
import org.apache.iceberg.exceptions.AlreadyExistsException;
4647
import org.apache.iceberg.exceptions.NoSuchTableException;
4748
import org.apache.iceberg.types.Type;
48-
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
4949
import org.checkerframework.checker.nullness.qual.Nullable;
5050
import org.checkerframework.dataflow.qual.Pure;
5151
import org.slf4j.Logger;
@@ -54,7 +54,8 @@
5454
@AutoValue
5555
public abstract class IcebergCatalogConfig implements Serializable {
5656
private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogConfig.class);
57-
private transient @MonotonicNonNull Catalog cachedCatalog;
57+
private static final ConcurrentHashMap<IcebergCatalogConfig, Catalog> CATALOG_CACHE =
58+
new ConcurrentHashMap<>();
5859

5960
@Pure
6061
@Nullable
@@ -75,27 +76,28 @@ public static Builder builder() {
7576

7677
public abstract Builder toBuilder();
7778

78-
public org.apache.iceberg.catalog.Catalog catalog() {
79-
if (cachedCatalog == null) {
80-
String catalogName = getCatalogName();
81-
if (catalogName == null) {
82-
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
83-
}
84-
Map<String, String> catalogProps = getCatalogProperties();
85-
if (catalogProps == null) {
86-
catalogProps = Maps.newHashMap();
87-
}
88-
Map<String, String> confProps = getConfigProperties();
89-
if (confProps == null) {
90-
confProps = Maps.newHashMap();
91-
}
92-
Configuration config = new Configuration();
93-
for (Map.Entry<String, String> prop : confProps.entrySet()) {
94-
config.set(prop.getKey(), prop.getValue());
95-
}
96-
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
79+
public Catalog catalog() {
80+
return CATALOG_CACHE.computeIfAbsent(this, IcebergCatalogConfig::buildCatalog);
81+
}
82+
83+
private static Catalog buildCatalog(IcebergCatalogConfig catalogConfig) {
84+
String catalogName = catalogConfig.getCatalogName();
85+
if (catalogName == null) {
86+
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
87+
}
88+
Map<String, String> catalogProps = catalogConfig.getCatalogProperties();
89+
if (catalogProps == null) {
90+
catalogProps = Maps.newHashMap();
91+
}
92+
Map<String, String> confProps = catalogConfig.getConfigProperties();
93+
if (confProps == null) {
94+
confProps = Maps.newHashMap();
95+
}
96+
Configuration config = new Configuration();
97+
for (Map.Entry<String, String> prop : confProps.entrySet()) {
98+
config.set(prop.getKey(), prop.getValue());
9799
}
98-
return cachedCatalog;
100+
return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
99101
}
100102

101103
private void checkSupportsNamespaces() {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@
2929
import java.time.format.DateTimeFormatter;
3030
import java.time.temporal.ChronoUnit;
3131
import java.util.ArrayList;
32-
import java.util.HashSet;
3332
import java.util.List;
3433
import java.util.Map;
35-
import java.util.Set;
3634
import java.util.UUID;
3735
import java.util.concurrent.TimeUnit;
3836
import org.apache.beam.sdk.schemas.Schema;
@@ -60,7 +58,6 @@
6058
import org.apache.iceberg.data.Record;
6159
import org.apache.iceberg.exceptions.AlreadyExistsException;
6260
import org.apache.iceberg.exceptions.NoSuchTableException;
63-
import org.apache.iceberg.io.FileIO;
6461
import org.apache.iceberg.transforms.Transforms;
6562
import org.checkerframework.checker.nullness.qual.Nullable;
6663
import org.slf4j.Logger;
@@ -434,20 +431,6 @@ public void close() throws IOException {
434431
state.dataFiles.clear();
435432
}
436433
} finally {
437-
// Close unique FileIO instances now that all writers are done.
438-
// table.io() may return a shared FileIO; we deduplicate by identity
439-
// so we close each underlying connection pool exactly once.
440-
Set<FileIO> closedIOs = new HashSet<>();
441-
for (DestinationState state : destinations.values()) {
442-
FileIO io = state.table.io();
443-
if (io != null && closedIOs.add(io)) {
444-
try {
445-
io.close();
446-
} catch (Exception e) {
447-
LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
448-
}
449-
}
450-
}
451434
destinations.clear();
452435
}
453436
checkArgument(

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,7 @@ public void close() throws IOException {
209209
fileScanTasks.clear();
210210
fileScanTasks = null;
211211
}
212-
if (io != null) {
213-
io.close();
214-
io = null;
215-
}
212+
io = null;
216213
}
217214

218215
@Override

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,7 +1040,7 @@ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOExcepti
10401040
* closes the shared FileIO.
10411041
*/
10421042
@Test
1043-
public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
1043+
public void testRecordWriterManagerDoesNotCloseSharedFileIO() throws IOException {
10441044
String tableName1 =
10451045
"table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
10461046
String tableName2 =
@@ -1078,7 +1078,8 @@ public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOExcep
10781078
writerManager.getSerializableDataFiles();
10791079
assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
10801080
assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
1081-
assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
1081+
assertFalse(
1082+
"Shared FileIO should NOT be closed by RecordWriterManager", sharedTrackingIO.closed);
10821083
}
10831084

10841085
private static final class CloseTrackingFileIO implements FileIO {
@@ -1201,4 +1202,44 @@ public void testGetOrCreateTable_refreshLogic() {
12011202
// Verify that refresh() WAS called exactly once because the entry was stale.
12021203
verify(mockTable, times(1)).refresh();
12031204
}
1205+
1206+
/**
1207+
* Verifies that the shared FileIO survives across multiple bundles. This is the core regression
1208+
* test: if RecordWriterManager.close() closed the FileIO, the second bundle would fail with
1209+
* "Connection pool shut down".
1210+
*/
1211+
@Test
1212+
public void testFileIOSurvivesAcrossBundles() throws IOException {
1213+
String tableName =
1214+
"table_survive_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
1215+
TableIdentifier tableId = TableIdentifier.of("default", tableName);
1216+
1217+
Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
1218+
1219+
CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
1220+
Table spyTable = Mockito.spy(realTable);
1221+
Mockito.doReturn(sharedIO).when(spyTable).io();
1222+
1223+
Catalog spyCatalog = Mockito.spy(catalog);
1224+
Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
1225+
1226+
WindowedValue<IcebergDestination> dest = getWindowedDestination(tableName, null);
1227+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
1228+
1229+
// Bundle 1: write and close
1230+
RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_b1", 1000, 3);
1231+
assertTrue(bundle1.write(dest, row));
1232+
bundle1.close();
1233+
assertFalse("FileIO must survive after bundle 1 close", sharedIO.closed);
1234+
assertTrue(
1235+
"Bundle 1 should produce data files", bundle1.getSerializableDataFiles().containsKey(dest));
1236+
1237+
// Bundle 2: write and close using the same catalog (simulates DoFn reuse)
1238+
RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_b2", 1000, 3);
1239+
assertTrue(bundle2.write(dest, row));
1240+
bundle2.close();
1241+
assertFalse("FileIO must survive after bundle 2 close", sharedIO.closed);
1242+
assertTrue(
1243+
"Bundle 2 should produce data files", bundle2.getSerializableDataFiles().containsKey(dest));
1244+
}
12041245
}

0 commit comments

Comments (0)