From c22552692f9ce4c886bb699733468d57eacf7509 Mon Sep 17 00:00:00 2001 From: atovk Date: Mon, 11 May 2026 21:07:57 +0800 Subject: [PATCH 1/2] [#11039] fix(iceberg): Fast fail missing OSS table metadata A stale Iceberg catalog entry can point at an OSS metadata file that no longer exists. Loading that table through the Aliyun OSS path may spend a long time in Iceberg's metadata read retry loop while Gravitino table operations still hold tree locks. Use SupportsMetadataLocation to check OSS metadata with the configured Iceberg FileIO before loadTable and tableExists enter the full catalog load path. Missing OSS metadata now returns the normal no-such-table result without calling catalog.loadTable. Tests: - JAVA_HOME=/Users/nullwo/.gradle/jdks/amazon_com_inc_-17-aarch64-os_x/amazon-corretto-17.jdk/Contents/Home ./gradlew --no-daemon --max-workers=1 :iceberg:iceberg-common:test -PskipITs - JAVA_HOME=/Users/nullwo/.gradle/jdks/amazon_com_inc_-17-aarch64-os_x/amazon-corretto-17.jdk/Contents/Home ./gradlew --no-daemon --max-workers=1 :iceberg:iceberg-rest-server:test -PskipITs --- .../common/ops/IcebergCatalogWrapper.java | 56 ++++++ .../common/ops/TestIcebergCatalogWrapper.java | 160 ++++++++++++++++++ 2 files changed, 216 insertions(+) diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java index 97919418a69..2201e6ce093 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java @@ -37,6 +37,7 @@ import org.apache.gravitino.utils.IsolatedClassLoader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; @@ -44,6 +45,8 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport; import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.RESTCatalog; @@ -71,6 +74,7 @@ public class IcebergCatalogWrapper implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapper.class); + private static final String OSS_LOCATION_PREFIX = "oss://"; private final Object initializationLock = new Object(); private volatile Catalog catalog; @@ -79,6 +83,7 @@ public class IcebergCatalogWrapper implements AutoCloseable { private final IcebergConfig icebergConfig; private String catalogUri = null; private volatile TableMetadataCache metadataCache; + private volatile FileIO metadataFileIO; private final Configuration configuration; public IcebergCatalogWrapper(IcebergConfig icebergConfig) { @@ -246,6 +251,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier) { return LoadTableResponse.builder().withTableMetadata(tableMetadataOptional.get()).build(); } + checkMetadataFileBeforeLoad(tableIdentifier); LoadTableResponse loadTableResponse = CatalogHandlers.loadTable(getCatalog(), tableIdentifier); if (loadTableResponse != null) { getMetadataCache().updateTableMetadata(tableIdentifier, loadTableResponse.tableMetadata()); @@ -272,6 +278,11 @@ public Optional getTableMetadataLocation(TableIdentifier tableIdentifier } public boolean tableExists(TableIdentifier tableIdentifier) { + Optional metadataLocation = getTableMetadataLocation(tableIdentifier); + if (metadataLocation.isPresent() && shouldCheckMetadataFileExists(metadataLocation.get())) { + return metadataFileExists(metadataLocation.get()); + } + return getCatalog().tableExists(tableIdentifier); } @@ -365,6 +376,10 @@ public void close() throws Exception { if (cache != null) { cache.close(); } + FileIO loadedMetadataFileIO = metadataFileIO; + if (loadedMetadataFileIO != null) { + loadedMetadataFileIO.close(); + } // For Iceberg REST server which use the same classloader when recreating catalog wrapper, the // Driver couldn't be reloaded after deregister() @@ -389,6 +404,47 @@ protected boolean useDifferentClassLoader() { return true; } + private void checkMetadataFileBeforeLoad(TableIdentifier tableIdentifier) { + Optional metadataLocation = getTableMetadataLocation(tableIdentifier); + if (metadataLocation.isEmpty() || !shouldCheckMetadataFileExists(metadataLocation.get())) { + return; + } + + // Aliyun OSS exists() converts missing keys to false before Iceberg's metadata read retry loop. + if (!metadataFileExists(metadataLocation.get())) { + throw new NoSuchTableException( + "Iceberg table metadata file does not exist for table %s: %s", + tableIdentifier, metadataLocation.get()); + } + } + + private boolean shouldCheckMetadataFileExists(String metadataLocation) { + return StringUtils.startsWithIgnoreCase(metadataLocation, OSS_LOCATION_PREFIX) + && StringUtils.isNotBlank(icebergConfig.get(IcebergConfig.IO_IMPL)); + } + + private boolean metadataFileExists(String metadataLocation) { + return getMetadataFileIO().newInputFile(metadataLocation).exists(); + } + + private FileIO getMetadataFileIO() { + FileIO loadedMetadataFileIO = metadataFileIO; + if (loadedMetadataFileIO != null) { + return loadedMetadataFileIO; + } + + synchronized (initializationLock) { + if (metadataFileIO == null) { + metadataFileIO = + CatalogUtil.loadFileIO( + icebergConfig.get(IcebergConfig.IO_IMPL), + icebergConfig.getIcebergCatalogProperties(), + configuration); + } + return metadataFileIO; + } + } + private void closeJdbcDriverResources() { // Because each catalog in Gravitino has its own classloader, after a catalog is no longer used // for a long time or dropped, the instance of classloader needs to be released. In order to diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java index e34f1becb3a..6f3e4b2c1f6 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java @@ -22,6 +22,7 @@ import java.lang.reflect.Method; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,13 +31,25 @@ import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.cache.SupportsMetadataLocation; import org.apache.gravitino.iceberg.common.cache.TableMetadataCache; +import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SeekableInputStream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; public class TestIcebergCatalogWrapper { + private static final TableIdentifier MISSING_METADATA_TABLE = + TableIdentifier.of("db", "missing_metadata"); + private static final String MISSING_METADATA_LOCATION = + "oss://bucket/db/missing_metadata/metadata/00000.metadata.json"; @Test public void testCatalogShouldBeLazyLoaded() { @@ -77,6 +90,41 @@ public void testMetadataCacheShouldInitializeOnFirstAccessAndClose(@TempDir Path Assertions.assertTrue(TrackingTableMetadataCache.CLOSED.get()); } + @Test + public void testLoadTableShouldFastFailWhenOssMetadataFileIsMissing() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(missingMetadataConfig()))) { + NoSuchTableException exception = + Assertions.assertThrows( + NoSuchTableException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue(exception.getMessage().contains(MISSING_METADATA_LOCATION)); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.EXISTS_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + @Test + public void testTableExistsShouldFastFailWhenOssMetadataFileIsMissing() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(missingMetadataConfig()))) { + Assertions.assertFalse(wrapper.tableExists(MISSING_METADATA_TABLE)); + + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.EXISTS_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + private static TableMetadataCache invokeGetMetadataCache(IcebergCatalogWrapper wrapper) throws Exception { Method method = IcebergCatalogWrapper.class.getDeclaredMethod("getMetadataCache"); @@ -106,6 +154,118 @@ private static Map metadataConfig(Path warehouseDir) { return config; } + private static Map missingMetadataConfig() { + Map config = new HashMap<>(); + config.put(IcebergConstants.CATALOG_BACKEND, "custom"); + config.put(IcebergConstants.CATALOG_BACKEND_IMPL, MissingMetadataCatalog.class.getName()); + config.put(IcebergConstants.URI, "custom://missing-metadata"); + config.put(IcebergConstants.WAREHOUSE, "unused"); + config.put(IcebergConstants.IO_IMPL, MissingMetadataFileIO.class.getName()); + return config; + } + + public static class MissingMetadataCatalog implements Catalog, SupportsMetadataLocation { + private static final AtomicInteger LOAD_TABLE_COUNT = new AtomicInteger(); + private static final AtomicInteger METADATA_LOCATION_COUNT = new AtomicInteger(); + + private String name; + + static void reset() { + LOAD_TABLE_COUNT.set(0); + METADATA_LOCATION_COUNT.set(0); + } + + @Override + public void initialize(String name, Map properties) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public List listTables(Namespace namespace) { + return List.of(MISSING_METADATA_TABLE); + } + + @Override + public Table loadTable(TableIdentifier identifier) { + LOAD_TABLE_COUNT.incrementAndGet(); + throw new RuntimeException("Catalog loadTable should not be called"); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return false; + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) {} + + @Override + public String metadataLocation(TableIdentifier tableIdentifier) { + METADATA_LOCATION_COUNT.incrementAndGet(); + return MISSING_METADATA_LOCATION; + } + } + + public static class MissingMetadataFileIO implements FileIO { + private static final AtomicInteger NEW_INPUT_FILE_COUNT = new AtomicInteger(); + private static final AtomicInteger EXISTS_COUNT = new AtomicInteger(); + + static void reset() { + NEW_INPUT_FILE_COUNT.set(0); + EXISTS_COUNT.set(0); + } + + @Override + public InputFile newInputFile(String path) { + NEW_INPUT_FILE_COUNT.incrementAndGet(); + return new MissingMetadataInputFile(path); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } + } + + private static class MissingMetadataInputFile implements InputFile { + private final String location; + + private MissingMetadataInputFile(String location) { + this.location = location; + } + + @Override + public long getLength() { + throw new UnsupportedOperationException(); + } + + @Override + public SeekableInputStream newStream() { + throw new UnsupportedOperationException(); + } + + @Override + public String location() { + return location; + } + + @Override + public boolean exists() { + MissingMetadataFileIO.EXISTS_COUNT.incrementAndGet(); + return false; + } + } + public static class TrackingTableMetadataCache implements TableMetadataCache { private static final AtomicInteger INITIALIZE_COUNT = new AtomicInteger(); private static final AtomicBoolean CLOSED = new AtomicBoolean(); From b752d8d23a51c1d44fa869cf1ef84b319be523b4 Mon Sep 17 00:00:00 2001 From: atovk Date: Tue, 12 May 2026 20:27:41 +0800 Subject: [PATCH 2/2] Handle stale Iceberg metadata across storage backends Metadata fast-fail was scoped to OSS, but missing metadata is storage-agnostic once a catalog exposes the latest metadata location. The wrapper now uses the configured FileIO.exists path for any metadata location and falls back only when the FileIO rejects that location, preserving REST cases where S3FileIO is configured while the memory catalog stores local metadata paths. Constraint: Gravitino needs a direct fast-fail before an upstream Iceberg fix is released and adopted Rejected: Keep the oss:// prefix check | misses S3, GCS, ADLS, and local metadata with the same stale-pointer failure mode Rejected: Swallow FileIO initialization failures | hides invalid io-impl configuration Confidence: high Scope-risk: narrow Directive: Keep fallback limited to location incompatibility; do not catch FileIO initialization or configuration failures Tested: ./gradlew --no-daemon --max-workers=1 :iceberg:iceberg-common:test --tests org.apache.gravitino.iceberg.common.ops.TestIcebergCatalogWrapper -PskipITs Tested: ./gradlew --no-daemon --max-workers=1 :iceberg:iceberg-common:test :iceberg:iceberg-rest-server:test -PskipITs Not-tested: Full ./gradlew test Related: apache/iceberg#16299 --- .../common/ops/IcebergCatalogWrapper.java | 35 +++-- .../common/ops/TestIcebergCatalogWrapper.java | 139 +++++++++++++++++- 2 files changed, 155 insertions(+), 19 deletions(-) diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java index 2201e6ce093..7f30127ceaa 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java @@ -46,6 +46,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport; import org.apache.iceberg.rest.CatalogHandlers; @@ -74,7 +75,6 @@ public class IcebergCatalogWrapper implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapper.class); - private static final String OSS_LOCATION_PREFIX = "oss://"; private final Object initializationLock = new Object(); private volatile Catalog catalog; @@ -279,8 +279,11 @@ public Optional getTableMetadataLocation(TableIdentifier tableIdentifier public boolean tableExists(TableIdentifier tableIdentifier) { Optional metadataLocation = getTableMetadataLocation(tableIdentifier); - if (metadataLocation.isPresent() && shouldCheckMetadataFileExists(metadataLocation.get())) { - return metadataFileExists(metadataLocation.get()); + if (metadataLocation.isPresent() && shouldCheckMetadataFileExists()) { + Optional metadataFileExists = tryMetadataFileExists(metadataLocation.get()); + if (metadataFileExists.isPresent()) { + return metadataFileExists.get(); + } } return getCatalog().tableExists(tableIdentifier); @@ -406,25 +409,35 @@ protected boolean useDifferentClassLoader() { private void checkMetadataFileBeforeLoad(TableIdentifier tableIdentifier) { Optional metadataLocation = getTableMetadataLocation(tableIdentifier); - if (metadataLocation.isEmpty() || !shouldCheckMetadataFileExists(metadataLocation.get())) { + if (metadataLocation.isEmpty() || !shouldCheckMetadataFileExists()) { return; } - // Aliyun OSS exists() converts missing keys to false before Iceberg's metadata read retry loop. - if (!metadataFileExists(metadataLocation.get())) { + // exists() converts missing metadata files to false before Iceberg's metadata read retry loop. + Optional metadataFileExists = tryMetadataFileExists(metadataLocation.get()); + if (metadataFileExists.isPresent() && !metadataFileExists.get()) { throw new NoSuchTableException( "Iceberg table metadata file does not exist for table %s: %s", tableIdentifier, metadataLocation.get()); } } - private boolean shouldCheckMetadataFileExists(String metadataLocation) { - return StringUtils.startsWithIgnoreCase(metadataLocation, OSS_LOCATION_PREFIX) - && StringUtils.isNotBlank(icebergConfig.get(IcebergConfig.IO_IMPL)); + private boolean shouldCheckMetadataFileExists() { + return StringUtils.isNotBlank(icebergConfig.get(IcebergConfig.IO_IMPL)); } - private boolean metadataFileExists(String metadataLocation) { - return getMetadataFileIO().newInputFile(metadataLocation).exists(); + private Optional tryMetadataFileExists(String metadataLocation) { + FileIO fileIO = getMetadataFileIO(); + try { + return Optional.of(fileIO.newInputFile(metadataLocation).exists()); + } catch (IllegalArgumentException | ValidationException e) { + LOG.debug( + "Skip metadata file existence check because FileIO {} cannot handle metadata location {}", + icebergConfig.get(IcebergConfig.IO_IMPL), + metadataLocation, + e); + return Optional.empty(); + } } private FileIO getMetadataFileIO() { diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java index 6f3e4b2c1f6..77f21ceedfc 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java @@ -27,6 +27,8 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.cache.SupportsMetadataLocation; @@ -37,6 +39,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -44,12 +47,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; public class TestIcebergCatalogWrapper { private static final TableIdentifier MISSING_METADATA_TABLE = TableIdentifier.of("db", "missing_metadata"); - private static final String MISSING_METADATA_LOCATION = + private static final String DEFAULT_MISSING_METADATA_LOCATION = "oss://bucket/db/missing_metadata/metadata/00000.metadata.json"; + private static final String LOCAL_MISSING_METADATA_LOCATION = + "/tmp/db/missing_metadata/metadata/00000.metadata.json"; @Test public void testCatalogShouldBeLazyLoaded() { @@ -90,9 +97,12 @@ public void testMetadataCacheShouldInitializeOnFirstAccessAndClose(@TempDir Path Assertions.assertTrue(TrackingTableMetadataCache.CLOSED.get()); } - @Test - public void testLoadTableShouldFastFailWhenOssMetadataFileIsMissing() throws Exception { + @ParameterizedTest + @MethodSource("metadataLocations") + public void testLoadTableShouldFastFailWhenMetadataFileIsMissing(String metadataLocation) + throws Exception { MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(metadataLocation); MissingMetadataFileIO.reset(); try (IcebergCatalogWrapper wrapper = @@ -101,7 +111,7 @@ public void testLoadTableShouldFastFailWhenOssMetadataFileIsMissing() throws Exc Assertions.assertThrows( NoSuchTableException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); - Assertions.assertTrue(exception.getMessage().contains(MISSING_METADATA_LOCATION)); + Assertions.assertTrue(exception.getMessage().contains(metadataLocation)); Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); Assertions.assertEquals(1, MissingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); Assertions.assertEquals(1, MissingMetadataFileIO.EXISTS_COUNT.get()); @@ -109,9 +119,12 @@ public void testLoadTableShouldFastFailWhenOssMetadataFileIsMissing() throws Exc } } - @Test - public void testTableExistsShouldFastFailWhenOssMetadataFileIsMissing() throws Exception { + @ParameterizedTest + @MethodSource("metadataLocations") + public void testTableExistsShouldFastFailWhenMetadataFileIsMissing(String metadataLocation) + throws Exception { MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(metadataLocation); MissingMetadataFileIO.reset(); try (IcebergCatalogWrapper wrapper = @@ -125,6 +138,68 @@ public void testTableExistsShouldFastFailWhenOssMetadataFileIsMissing() throws E } } + @Test + public void testLoadTableShouldFallbackWhenMetadataFileIORejectsLocation() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(LOCAL_MISSING_METADATA_LOCATION); + RejectingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(rejectingMetadataConfig()))) { + RuntimeException exception = + Assertions.assertThrows( + RuntimeException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue(exception.getMessage().contains("Catalog loadTable")); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, RejectingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + @Test + public void testTableExistsShouldFallbackWhenMetadataFileIORejectsLocation() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(LOCAL_MISSING_METADATA_LOCATION); + RejectingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(rejectingMetadataConfig()))) { + Assertions.assertTrue(wrapper.tableExists(MISSING_METADATA_TABLE)); + + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, RejectingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataCatalog.TABLE_EXISTS_COUNT.get()); + } + } + + @Test + public void testLoadTableShouldFailWhenMetadataFileIOCannotInitialize() throws Exception { + MissingMetadataCatalog.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(invalidMetadataFileIOConfig()))) { + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue( + exception.getMessage().contains("Cannot initialize FileIO implementation")); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + private static Stream metadataLocations() { + return Stream.of( + DEFAULT_MISSING_METADATA_LOCATION, + "s3://bucket/db/missing_metadata/metadata/00000.metadata.json", + "gs://bucket/db/missing_metadata/metadata/00000.metadata.json", + "abfs://container@account.dfs.core.windows.net/db/missing_metadata/metadata/00000.metadata.json", + LOCAL_MISSING_METADATA_LOCATION, + "file:/tmp/db/missing_metadata/metadata/00000.metadata.json"); + } + private static TableMetadataCache invokeGetMetadataCache(IcebergCatalogWrapper wrapper) throws Exception { Method method = IcebergCatalogWrapper.class.getDeclaredMethod("getMetadataCache"); @@ -164,15 +239,32 @@ private static Map missingMetadataConfig() { return config; } + private static Map invalidMetadataFileIOConfig() { + Map config = missingMetadataConfig(); + config.put(IcebergConstants.IO_IMPL, "invalid.MetadataFileIO"); + return config; + } + + private static Map rejectingMetadataConfig() { + Map config = missingMetadataConfig(); + config.put(IcebergConstants.IO_IMPL, RejectingMetadataFileIO.class.getName()); + return config; + } + public static class MissingMetadataCatalog implements Catalog, SupportsMetadataLocation { private static final AtomicInteger LOAD_TABLE_COUNT = new AtomicInteger(); + private static final AtomicInteger TABLE_EXISTS_COUNT = new AtomicInteger(); private static final AtomicInteger METADATA_LOCATION_COUNT = new AtomicInteger(); + private static final AtomicReference METADATA_LOCATION = + new AtomicReference<>(DEFAULT_MISSING_METADATA_LOCATION); private String name; static void reset() { LOAD_TABLE_COUNT.set(0); + TABLE_EXISTS_COUNT.set(0); METADATA_LOCATION_COUNT.set(0); + METADATA_LOCATION.set(DEFAULT_MISSING_METADATA_LOCATION); } @Override @@ -193,7 +285,13 @@ public List listTables(Namespace namespace) { @Override public Table loadTable(TableIdentifier identifier) { LOAD_TABLE_COUNT.incrementAndGet(); - throw new RuntimeException("Catalog loadTable should not be called"); + throw new RuntimeException("Catalog loadTable was called"); + } + + @Override + public boolean tableExists(TableIdentifier identifier) { + TABLE_EXISTS_COUNT.incrementAndGet(); + return true; } @Override @@ -207,7 +305,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {} @Override public String metadataLocation(TableIdentifier tableIdentifier) { METADATA_LOCATION_COUNT.incrementAndGet(); - return MISSING_METADATA_LOCATION; + return METADATA_LOCATION.get(); } } @@ -266,6 +364,31 @@ public boolean exists() { } } + public static class RejectingMetadataFileIO implements FileIO { + private static final AtomicInteger NEW_INPUT_FILE_COUNT = new AtomicInteger(); + + static void reset() { + NEW_INPUT_FILE_COUNT.set(0); + } + + @Override + public InputFile newInputFile(String path) { + NEW_INPUT_FILE_COUNT.incrementAndGet(); + ValidationException.check(false, "Invalid test metadata location: %s", path); + throw new IllegalStateException("Unreachable"); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } + } + public static class TrackingTableMetadataCache implements TableMetadataCache { private static final AtomicInteger INITIALIZE_COUNT = new AtomicInteger(); private static final AtomicBoolean CLOSED = new AtomicBoolean();