Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
import org.apache.gravitino.utils.IsolatedClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.RESTCatalog;
Expand Down Expand Up @@ -79,6 +83,7 @@ public class IcebergCatalogWrapper implements AutoCloseable {
private final IcebergConfig icebergConfig;
private String catalogUri = null;
private volatile TableMetadataCache metadataCache;
private volatile FileIO metadataFileIO;
private final Configuration configuration;

public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
Expand Down Expand Up @@ -246,6 +251,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier) {
return LoadTableResponse.builder().withTableMetadata(tableMetadataOptional.get()).build();
}

checkMetadataFileBeforeLoad(tableIdentifier);
LoadTableResponse loadTableResponse = CatalogHandlers.loadTable(getCatalog(), tableIdentifier);
if (loadTableResponse != null) {
getMetadataCache().updateTableMetadata(tableIdentifier, loadTableResponse.tableMetadata());
Expand All @@ -272,6 +278,14 @@ public Optional<String> getTableMetadataLocation(TableIdentifier tableIdentifier
}

public boolean tableExists(TableIdentifier tableIdentifier) {
Optional<String> metadataLocation = getTableMetadataLocation(tableIdentifier);
if (metadataLocation.isPresent() && shouldCheckMetadataFileExists()) {
Optional<Boolean> metadataFileExists = tryMetadataFileExists(metadataLocation.get());
if (metadataFileExists.isPresent()) {
return metadataFileExists.get();
}
}

return getCatalog().tableExists(tableIdentifier);
}

Expand Down Expand Up @@ -365,6 +379,10 @@ public void close() throws Exception {
if (cache != null) {
cache.close();
}
FileIO loadedMetadataFileIO = metadataFileIO;
if (loadedMetadataFileIO != null) {
loadedMetadataFileIO.close();
}

// For Iceberg REST server which use the same classloader when recreating catalog wrapper, the
// Driver couldn't be reloaded after deregister()
Expand All @@ -389,6 +407,57 @@ protected boolean useDifferentClassLoader() {
return true;
}

private void checkMetadataFileBeforeLoad(TableIdentifier tableIdentifier) {
Optional<String> metadataLocation = getTableMetadataLocation(tableIdentifier);
if (metadataLocation.isEmpty() || !shouldCheckMetadataFileExists()) {
return;
}

// exists() converts missing metadata files to false before Iceberg's metadata read retry loop.
Optional<Boolean> metadataFileExists = tryMetadataFileExists(metadataLocation.get());
if (metadataFileExists.isPresent() && !metadataFileExists.get()) {
throw new NoSuchTableException(
"Iceberg table metadata file does not exist for table %s: %s",
tableIdentifier, metadataLocation.get());
}
}

private boolean shouldCheckMetadataFileExists() {
return StringUtils.isNotBlank(icebergConfig.get(IcebergConfig.IO_IMPL));
}

private Optional<Boolean> tryMetadataFileExists(String metadataLocation) {
FileIO fileIO = getMetadataFileIO();
try {
return Optional.of(fileIO.newInputFile(metadataLocation).exists());
} catch (IllegalArgumentException | ValidationException e) {
LOG.debug(
"Skip metadata file existence check because FileIO {} cannot handle metadata location {}",
icebergConfig.get(IcebergConfig.IO_IMPL),
metadataLocation,
e);
return Optional.empty();
}
}

private FileIO getMetadataFileIO() {
FileIO loadedMetadataFileIO = metadataFileIO;
if (loadedMetadataFileIO != null) {
return loadedMetadataFileIO;
}

synchronized (initializationLock) {
if (metadataFileIO == null) {
metadataFileIO =
CatalogUtil.loadFileIO(
icebergConfig.get(IcebergConfig.IO_IMPL),
icebergConfig.getIcebergCatalogProperties(),
configuration);
}
return metadataFileIO;
}
}

private void closeJdbcDriverResources() {
// Because each catalog in Gravitino has its own classloader, after a catalog is no longer used
// for a long time or dropped, the instance of classloader needs to be released. In order to
Expand Down
Loading