diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java index 97919418a69..7f30127ceaa 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java @@ -37,6 +37,7 @@ import org.apache.gravitino.utils.IsolatedClassLoader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; @@ -44,6 +45,9 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport; import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.RESTCatalog; @@ -79,6 +83,7 @@ public class IcebergCatalogWrapper implements AutoCloseable { private final IcebergConfig icebergConfig; private String catalogUri = null; private volatile TableMetadataCache metadataCache; + private volatile FileIO metadataFileIO; private final Configuration configuration; public IcebergCatalogWrapper(IcebergConfig icebergConfig) { @@ -246,6 +251,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier) { return LoadTableResponse.builder().withTableMetadata(tableMetadataOptional.get()).build(); } + checkMetadataFileBeforeLoad(tableIdentifier); LoadTableResponse loadTableResponse = CatalogHandlers.loadTable(getCatalog(), tableIdentifier); if (loadTableResponse != null) { getMetadataCache().updateTableMetadata(tableIdentifier, loadTableResponse.tableMetadata()); @@ -272,6 +278,14 @@ public Optional getTableMetadataLocation(TableIdentifier tableIdentifier } public boolean tableExists(TableIdentifier tableIdentifier) { + Optional metadataLocation = getTableMetadataLocation(tableIdentifier); + if (metadataLocation.isPresent() && shouldCheckMetadataFileExists()) { + Optional metadataFileExists = tryMetadataFileExists(metadataLocation.get()); + if (metadataFileExists.isPresent()) { + return metadataFileExists.get(); + } + } + return getCatalog().tableExists(tableIdentifier); } @@ -365,6 +379,10 @@ public void close() throws Exception { if (cache != null) { cache.close(); } + FileIO loadedMetadataFileIO = metadataFileIO; + if (loadedMetadataFileIO != null) { + loadedMetadataFileIO.close(); + } // For Iceberg REST server which use the same classloader when recreating catalog wrapper, the // Driver couldn't be reloaded after deregister() @@ -389,6 +407,57 @@ protected boolean useDifferentClassLoader() { return true; } + private void checkMetadataFileBeforeLoad(TableIdentifier tableIdentifier) { + Optional metadataLocation = getTableMetadataLocation(tableIdentifier); + if (metadataLocation.isEmpty() || !shouldCheckMetadataFileExists()) { + return; + } + + // exists() converts missing metadata files to false before Iceberg's metadata read retry loop. + Optional metadataFileExists = tryMetadataFileExists(metadataLocation.get()); + if (metadataFileExists.isPresent() && !metadataFileExists.get()) { + throw new NoSuchTableException( + "Iceberg table metadata file does not exist for table %s: %s", + tableIdentifier, metadataLocation.get()); + } + } + + private boolean shouldCheckMetadataFileExists() { + return StringUtils.isNotBlank(icebergConfig.get(IcebergConfig.IO_IMPL)); + } + + private Optional tryMetadataFileExists(String metadataLocation) { + FileIO fileIO = getMetadataFileIO(); + try { + return Optional.of(fileIO.newInputFile(metadataLocation).exists()); + } catch (IllegalArgumentException | ValidationException e) { + LOG.debug( + "Skip metadata file existence check because FileIO {} cannot handle metadata location {}", + icebergConfig.get(IcebergConfig.IO_IMPL), + metadataLocation, + e); + return Optional.empty(); + } + } + + private FileIO getMetadataFileIO() { + FileIO loadedMetadataFileIO = metadataFileIO; + if (loadedMetadataFileIO != null) { + return loadedMetadataFileIO; + } + + synchronized (initializationLock) { + if (metadataFileIO == null) { + metadataFileIO = + CatalogUtil.loadFileIO( + icebergConfig.get(IcebergConfig.IO_IMPL), + icebergConfig.getIcebergCatalogProperties(), + configuration); + } + return metadataFileIO; + } + } + private void closeJdbcDriverResources() { // Because each catalog in Gravitino has its own classloader, after a catalog is no longer used // for a long time or dropped, the instance of classloader needs to be released. In order to diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java index e34f1becb3a..77f21ceedfc 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java @@ -22,21 +22,41 @@ import java.lang.reflect.Method; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.cache.SupportsMetadataLocation; import org.apache.gravitino.iceberg.common.cache.TableMetadataCache; +import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SeekableInputStream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; public class TestIcebergCatalogWrapper { + private static final TableIdentifier MISSING_METADATA_TABLE = + TableIdentifier.of("db", "missing_metadata"); + private static final String DEFAULT_MISSING_METADATA_LOCATION = + "oss://bucket/db/missing_metadata/metadata/00000.metadata.json"; + private static final String LOCAL_MISSING_METADATA_LOCATION = + "/tmp/db/missing_metadata/metadata/00000.metadata.json"; @Test public void testCatalogShouldBeLazyLoaded() { @@ -77,6 +97,109 @@ public void testMetadataCacheShouldInitializeOnFirstAccessAndClose(@TempDir Path Assertions.assertTrue(TrackingTableMetadataCache.CLOSED.get()); } + @ParameterizedTest + @MethodSource("metadataLocations") + public void testLoadTableShouldFastFailWhenMetadataFileIsMissing(String metadataLocation) + throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(metadataLocation); + MissingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(missingMetadataConfig()))) { + NoSuchTableException exception = + Assertions.assertThrows( + NoSuchTableException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue(exception.getMessage().contains(metadataLocation)); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.EXISTS_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + @ParameterizedTest + @MethodSource("metadataLocations") + public void testTableExistsShouldFastFailWhenMetadataFileIsMissing(String metadataLocation) + throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(metadataLocation); + MissingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(missingMetadataConfig()))) { + Assertions.assertFalse(wrapper.tableExists(MISSING_METADATA_TABLE)); + + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataFileIO.EXISTS_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + @Test + public void testLoadTableShouldFallbackWhenMetadataFileIORejectsLocation() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(LOCAL_MISSING_METADATA_LOCATION); + RejectingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(rejectingMetadataConfig()))) { + RuntimeException exception = + Assertions.assertThrows( + RuntimeException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue(exception.getMessage().contains("Catalog loadTable")); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, RejectingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + @Test + public void testTableExistsShouldFallbackWhenMetadataFileIORejectsLocation() throws Exception { + MissingMetadataCatalog.reset(); + MissingMetadataCatalog.METADATA_LOCATION.set(LOCAL_MISSING_METADATA_LOCATION); + RejectingMetadataFileIO.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(rejectingMetadataConfig()))) { + Assertions.assertTrue(wrapper.tableExists(MISSING_METADATA_TABLE)); + + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(1, RejectingMetadataFileIO.NEW_INPUT_FILE_COUNT.get()); + Assertions.assertEquals(1, MissingMetadataCatalog.TABLE_EXISTS_COUNT.get()); + } + } + + @Test + public void testLoadTableShouldFailWhenMetadataFileIOCannotInitialize() throws Exception { + MissingMetadataCatalog.reset(); + + try (IcebergCatalogWrapper wrapper = + new IcebergCatalogWrapper(new IcebergConfig(invalidMetadataFileIOConfig()))) { + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> wrapper.loadTable(MISSING_METADATA_TABLE)); + + Assertions.assertTrue( + exception.getMessage().contains("Cannot initialize FileIO implementation")); + Assertions.assertEquals(1, MissingMetadataCatalog.METADATA_LOCATION_COUNT.get()); + Assertions.assertEquals(0, MissingMetadataCatalog.LOAD_TABLE_COUNT.get()); + } + } + + private static Stream metadataLocations() { + return Stream.of( + DEFAULT_MISSING_METADATA_LOCATION, + "s3://bucket/db/missing_metadata/metadata/00000.metadata.json", + "gs://bucket/db/missing_metadata/metadata/00000.metadata.json", + "abfs://container@account.dfs.core.windows.net/db/missing_metadata/metadata/00000.metadata.json", + LOCAL_MISSING_METADATA_LOCATION, + "file:/tmp/db/missing_metadata/metadata/00000.metadata.json"); + } + private static TableMetadataCache invokeGetMetadataCache(IcebergCatalogWrapper wrapper) throws Exception { Method method = IcebergCatalogWrapper.class.getDeclaredMethod("getMetadataCache"); @@ -106,6 +229,166 @@ private static Map metadataConfig(Path warehouseDir) { return config; } + private static Map missingMetadataConfig() { + Map config = new HashMap<>(); + config.put(IcebergConstants.CATALOG_BACKEND, "custom"); + config.put(IcebergConstants.CATALOG_BACKEND_IMPL, MissingMetadataCatalog.class.getName()); + config.put(IcebergConstants.URI, "custom://missing-metadata"); + config.put(IcebergConstants.WAREHOUSE, "unused"); + config.put(IcebergConstants.IO_IMPL, MissingMetadataFileIO.class.getName()); + return config; + } + + private static Map invalidMetadataFileIOConfig() { + Map config = missingMetadataConfig(); + config.put(IcebergConstants.IO_IMPL, "invalid.MetadataFileIO"); + return config; + } + + private static Map rejectingMetadataConfig() { + Map config = missingMetadataConfig(); + config.put(IcebergConstants.IO_IMPL, RejectingMetadataFileIO.class.getName()); + return config; + } + + public static class MissingMetadataCatalog implements Catalog, SupportsMetadataLocation { + private static final AtomicInteger LOAD_TABLE_COUNT = new AtomicInteger(); + private static final AtomicInteger TABLE_EXISTS_COUNT = new AtomicInteger(); + private static final AtomicInteger METADATA_LOCATION_COUNT = new AtomicInteger(); + private static final AtomicReference METADATA_LOCATION = + new AtomicReference<>(DEFAULT_MISSING_METADATA_LOCATION); + + private String name; + + static void reset() { + LOAD_TABLE_COUNT.set(0); + TABLE_EXISTS_COUNT.set(0); + METADATA_LOCATION_COUNT.set(0); + METADATA_LOCATION.set(DEFAULT_MISSING_METADATA_LOCATION); + } + + @Override + public void initialize(String name, Map properties) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public List listTables(Namespace namespace) { + return List.of(MISSING_METADATA_TABLE); + } + + @Override + public Table loadTable(TableIdentifier identifier) { + LOAD_TABLE_COUNT.incrementAndGet(); + throw new RuntimeException("Catalog loadTable was called"); + } + + @Override + public boolean tableExists(TableIdentifier identifier) { + TABLE_EXISTS_COUNT.incrementAndGet(); + return true; + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return false; + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) {} + + @Override + public String metadataLocation(TableIdentifier tableIdentifier) { + METADATA_LOCATION_COUNT.incrementAndGet(); + return METADATA_LOCATION.get(); + } + } + + public static class MissingMetadataFileIO implements FileIO { + private static final AtomicInteger NEW_INPUT_FILE_COUNT = new AtomicInteger(); + private static final AtomicInteger EXISTS_COUNT = new AtomicInteger(); + + static void reset() { + NEW_INPUT_FILE_COUNT.set(0); + EXISTS_COUNT.set(0); + } + + @Override + public InputFile newInputFile(String path) { + NEW_INPUT_FILE_COUNT.incrementAndGet(); + return new MissingMetadataInputFile(path); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } + } + + private static class MissingMetadataInputFile implements InputFile { + private final String location; + + private MissingMetadataInputFile(String location) { + this.location = location; + } + + @Override + public long getLength() { + throw new UnsupportedOperationException(); + } + + @Override + public SeekableInputStream newStream() { + throw new UnsupportedOperationException(); + } + + @Override + public String location() { + return location; + } + + @Override + public boolean exists() { + MissingMetadataFileIO.EXISTS_COUNT.incrementAndGet(); + return false; + } + } + + public static class RejectingMetadataFileIO implements FileIO { + private static final AtomicInteger NEW_INPUT_FILE_COUNT = new AtomicInteger(); + + static void reset() { + NEW_INPUT_FILE_COUNT.set(0); + } + + @Override + public InputFile newInputFile(String path) { + NEW_INPUT_FILE_COUNT.incrementAndGet(); + ValidationException.check(false, "Invalid test metadata location: %s", path); + throw new IllegalStateException("Unreachable"); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } + } + public static class TrackingTableMetadataCache implements TableMetadataCache { private static final AtomicInteger INITIALIZE_COUNT = new AtomicInteger(); private static final AtomicBoolean CLOSED = new AtomicBoolean();