2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 4
 }

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }

@@ -190,17 +190,15 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
       int specId = entry.getKey();
       List<DataFile> files = entry.getValue();
       PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
-      ManifestWriter<DataFile> writer;
-      try (FileIO io = table.io()) {
-        writer = createManifestWriter(table.location(), uuid, spec, io);
-        for (DataFile file : files) {
-          writer.add(file);
-          committedDataFileByteSize.update(file.fileSizeInBytes());
-          committedDataFileRecordCount.update(file.recordCount());
-        }
-        writer.close();
-        update.appendManifest(writer.toManifestFile());
-      }
+      FileIO io = table.io();
+      ManifestWriter<DataFile> writer = createManifestWriter(table.location(), uuid, spec, io);
+      for (DataFile file : files) {
+        writer.add(file);
+        committedDataFileByteSize.update(file.fileSizeInBytes());
+        committedDataFileRecordCount.update(file.recordCount());
+      }
+      writer.close();
+      update.appendManifest(writer.toManifestFile());
     }
     update.commit();
   }
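
Why this hunk matters: try-with-resources closes whatever it binds, even on
success, and the old code bound the result of table.io(), which (as the removed
comment in RecordWriterManager below puts it) may be a FileIO shared with other
writers on the same table. The following is a plain-Java toy of that pitfall,
not Beam or Iceberg code; SharedIO is an illustrative stand-in for FileIO.

// Toy illustration: try-with-resources always closes the resource it binds,
// so binding a *borrowed* object hands its lifetime to this block.
final class TryWithResourcesPitfall {
  static final class SharedIO implements AutoCloseable {
    boolean closed;

    void ping() {
      if (closed) throw new IllegalStateException("Connection pool shut down");
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  public static void main(String[] args) {
    SharedIO shared = new SharedIO(); // owned by the "table", merely borrowed below
    try (SharedIO io = shared) {      // compiles cleanly, but...
      io.ping();                      // ...the first use works fine
    }                                 // ...then close() runs on the shared object
    shared.ping();                    // throws: the borrowed resource is dead
  }
}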

@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -45,7 +46,6 @@
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.types.Type;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
@@ -54,7 +54,8 @@
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogConfig.class);
-  private transient @MonotonicNonNull Catalog cachedCatalog;
+  private static final ConcurrentHashMap<IcebergCatalogConfig, Catalog> CATALOG_CACHE =
+      new ConcurrentHashMap<>();

   @Pure
   @Nullable
@@ -75,27 +76,28 @@ public static Builder builder() {

   public abstract Builder toBuilder();

-  public org.apache.iceberg.catalog.Catalog catalog() {
-    if (cachedCatalog == null) {
-      String catalogName = getCatalogName();
-      if (catalogName == null) {
-        catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
-      }
-      Map<String, String> catalogProps = getCatalogProperties();
-      if (catalogProps == null) {
-        catalogProps = Maps.newHashMap();
-      }
-      Map<String, String> confProps = getConfigProperties();
-      if (confProps == null) {
-        confProps = Maps.newHashMap();
-      }
-      Configuration config = new Configuration();
-      for (Map.Entry<String, String> prop : confProps.entrySet()) {
-        config.set(prop.getKey(), prop.getValue());
-      }
-      cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
-    }
-    return cachedCatalog;
-  }
+  public Catalog catalog() {
+    return CATALOG_CACHE.computeIfAbsent(this, IcebergCatalogConfig::buildCatalog);
+  }
+
+  private static Catalog buildCatalog(IcebergCatalogConfig catalogConfig) {
+    String catalogName = catalogConfig.getCatalogName();
+    if (catalogName == null) {
+      catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
+    }
+    Map<String, String> catalogProps = catalogConfig.getCatalogProperties();
+    if (catalogProps == null) {
+      catalogProps = Maps.newHashMap();
+    }
+    Map<String, String> confProps = catalogConfig.getConfigProperties();
+    if (confProps == null) {
+      confProps = Maps.newHashMap();
+    }
+    Configuration config = new Configuration();
+    for (Map.Entry<String, String> prop : confProps.entrySet()) {
+      config.set(prop.getKey(), prop.getValue());
+    }
+    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+  }

   private void checkSupportsNamespaces() {
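
The shape of this change: the old per-instance transient field rebuilt a
Catalog after every deserialization and tied its lifetime to one object, while
the new static map shares one Catalog per distinct configuration across the
whole worker JVM. Because @AutoValue generates value-based equals() and
hashCode() for IcebergCatalogConfig, any deserialized copy of an equal config
lands on the same cache entry. Below is a self-contained sketch of the pattern;
Config and Client are illustrative stand-ins for IcebergCatalogConfig and
Catalog, not Beam types.

import java.util.concurrent.ConcurrentHashMap;

final class ProcessWideCache {
  record Config(String name, String warehouse) {} // value equality for free
  record Client(Config config) {}                 // stand-in for a heavyweight Catalog

  private static final ConcurrentHashMap<Config, Client> CACHE = new ConcurrentHashMap<>();

  static Client clientFor(Config config) {
    // Atomic per key: concurrent callers holding equal configs get the same
    // Client, and the factory runs at most once per key.
    return CACHE.computeIfAbsent(config, Client::new);
  }

  public static void main(String[] args) {
    Config a = new Config("demo", "s3://bucket/warehouse");
    Config b = new Config("demo", "s3://bucket/warehouse"); // equal, distinct instance
    System.out.println(clientFor(a) == clientFor(b));       // true: one shared client
  }
}

Note the trade-off this pattern accepts: nothing evicts entries, so each
distinct configuration's client lives for the worker JVM's lifetime, which is
exactly what lets resources derived from it (such as a FileIO) outlive any
single bundle.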

@@ -29,10 +29,8 @@
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.schemas.Schema;
@@ -60,7 +58,6 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.transforms.Transforms;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -434,20 +431,6 @@ public void close() throws IOException {
         state.dataFiles.clear();
       }
     } finally {
-      // Close unique FileIO instances now that all writers are done.
-      // table.io() may return a shared FileIO; we deduplicate by identity
-      // so we close each underlying connection pool exactly once.
-      Set<FileIO> closedIOs = new HashSet<>();
-      for (DestinationState state : destinations.values()) {
-        FileIO io = state.table.io();
-        if (io != null && closedIOs.add(io)) {
-          try {
-            io.close();
-          } catch (Exception e) {
-            LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
-          }
-        }
-      }
       destinations.clear();
     }
     checkArgument(
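
The deleted block was careful in the wrong direction: it deduplicated FileIO
instances by identity and closed each "exactly once", but with the Catalog now
cached process-wide, its FileIO must serve the next bundle, so closing it even
once breaks any reused DoFn. A dependency-free toy model of that lifecycle,
with Pool standing in for the shared FileIO:

// Toy model (no Beam or Iceberg dependency): one shared pool, two "bundles".
final class BundleLifecycleDemo {
  static final class Pool {
    private boolean open = true;

    void write() {
      if (!open) throw new IllegalStateException("Connection pool shut down");
    }

    void shutdown() {
      open = false;
    }
  }

  // closePoolOnClose=true models the removed close() logic.
  static void runBundle(Pool shared, boolean closePoolOnClose) {
    shared.write();                          // this bundle's writes succeed
    if (closePoolOnClose) shared.shutdown(); // old behavior: shut the shared pool
  }

  public static void main(String[] args) {
    Pool shared = new Pool();
    runBundle(shared, true); // bundle 1 shuts the shared pool on the way out
    runBundle(shared, true); // bundle 2 throws "Connection pool shut down"
    // With closePoolOnClose=false (the new behavior), both bundles succeed.
  }
}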

@@ -209,10 +209,7 @@ public void close() throws IOException {
       fileScanTasks.clear();
       fileScanTasks = null;
     }
-    if (io != null) {
-      io.close();
-      io = null;
-    }
+    io = null;
   }

   @Override
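
Same ownership rule here, stated positively: the reader's close() now drops its
reference to the FileIO rather than closing it, leaving shutdown to whichever
component owns the instance (after this PR, the cached catalog). A generic
sketch of that contract; every name below is illustrative rather than Beam's:

import java.io.Closeable;
import java.io.IOException;

final class BorrowingReader implements Closeable {
  interface Channel extends Closeable {}

  private Channel owned;    // opened by this reader: close it here
  private Channel borrowed; // supplied by the owner: never close it here

  BorrowingReader(Channel owned, Channel borrowed) {
    this.owned = owned;
    this.borrowed = borrowed;
  }

  @Override
  public void close() throws IOException {
    if (owned != null) {
      owned.close();
      owned = null; // idempotent: safe if close() is called twice
    }
    borrowed = null; // release for GC; its owner decides when to close it
  }
}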

@@ -1040,7 +1040,7 @@ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOExcepti
    * closes the shared FileIO.
    */
   @Test
-  public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
+  public void testRecordWriterManagerDoesNotCloseSharedFileIO() throws IOException {
     String tableName1 =
         "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
     String tableName2 =
@@ -1078,7 +1078,8 @@ public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOExcep
         writerManager.getSerializableDataFiles();
     assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
     assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
-    assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
+    assertFalse(
+        "Shared FileIO should NOT be closed by RecordWriterManager", sharedTrackingIO.closed);
   }

   private static final class CloseTrackingFileIO implements FileIO {
@@ -1201,4 +1202,44 @@ public void testGetOrCreateTable_refreshLogic() {
     // Verify that refresh() WAS called exactly once because the entry was stale.
     verify(mockTable, times(1)).refresh();
   }
+
+  /**
+   * Verifies that the shared FileIO survives across multiple bundles. This is the core regression
+   * test: if RecordWriterManager.close() closed the FileIO, the second bundle would fail with
+   * "Connection pool shut down".
+   */
+  @Test
+  public void testFileIOSurvivesAcrossBundles() throws IOException {
+    String tableName =
+        "table_survive_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId = TableIdentifier.of("default", tableName);
+
+    Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
+    Table spyTable = Mockito.spy(realTable);
+    Mockito.doReturn(sharedIO).when(spyTable).io();
+
+    Catalog spyCatalog = Mockito.spy(catalog);
+    Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
+
+    WindowedValue<IcebergDestination> dest = getWindowedDestination(tableName, null);
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    // Bundle 1: write and close
+    RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_b1", 1000, 3);
+    assertTrue(bundle1.write(dest, row));
+    bundle1.close();
+    assertFalse("FileIO must survive after bundle 1 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 1 should produce data files", bundle1.getSerializableDataFiles().containsKey(dest));
+
+    // Bundle 2: write and close using the same catalog (simulates DoFn reuse)
+    RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_b2", 1000, 3);
+    assertTrue(bundle2.write(dest, row));
+    bundle2.close();
+    assertFalse("FileIO must survive after bundle 2 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 2 should produce data files", bundle2.getSerializableDataFiles().containsKey(dest));
+  }
 }