diff --git a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java index ffcdc0ad6ae..9ab9df5588c 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java @@ -192,13 +192,27 @@ public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadSchema(ident)); if (!schema.imported()) { - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(ident.namespace().levels()), - LockType.WRITE, - () -> { - importSchema(ident); - return null; - }); + try { + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + importSchema(ident); + return null; + }); + } catch (EntityAlreadyExistsException e) { + // HA race: another Gravitino node concurrently imported this schema. Reload from the + // entity store to verify the entity stored by the winning node is consistent. + LOG.info( + "Schema {} was concurrently imported by another node; reloading from store.", ident); + EntityCombinedSchema reloaded = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadSchema(ident)); + if (!reloaded.imported()) { + throw new UnsupportedOperationException( + "Schema managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " + + "To resolve: Remove all catalogs managing this schema, then recreate one catalog to ensure single-catalog management."); + } + } } return schema; @@ -438,10 +452,7 @@ private void importSchema(NameIdentifier identifier) { try { store.put(schemaEntity, true); } catch (EntityAlreadyExistsException e) { - LOG.error("Failed to import schema {} with id {} to the store.", identifier, uid, e); - throw new UnsupportedOperationException( - "Schema managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " - + "To resolve: Remove all catalogs managing this schema, then recreate one catalog to ensure single-catalog management."); + throw e; } catch (Exception e) { LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); throw new RuntimeException("Fail to import schema entity to the store.", e); diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index 57499b6f6d0..244edec88de 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -123,8 +123,22 @@ public Table loadTable(NameIdentifier ident) throws NoSuchTableException { schemaDispatcher.loadSchema(schemaIdent); // Import the table. - entityCombinedTable = - TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> importTable(ident)); + try { + entityCombinedTable = + TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> importTable(ident)); + } catch (EntityAlreadyExistsException e) { + // HA race: another Gravitino node concurrently imported this table. Reload from the + // entity store to pick up the entity stored by the winning node. + LOG.info( + "Table {} was concurrently imported by another node; reloading from store.", ident); + entityCombinedTable = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadTable(ident)); + if (!entityCombinedTable.imported()) { + throw new UnsupportedOperationException( + "Table managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " + + "To resolve: Remove all catalogs managing this table, then recreate one catalog to ensure single-catalog management."); + } + } } // Update the column entities in Gravitino store if the columns are different from the ones @@ -477,10 +491,7 @@ private EntityCombinedTable importTable(NameIdentifier identifier) { try { store.put(tableEntity, true); } catch (EntityAlreadyExistsException e) { - LOG.error("Failed to import table {} with id {} to the store.", identifier, uid, e); - throw new UnsupportedOperationException( - "Table managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " - + "To resolve: Remove all catalogs managing this table, then recreate one catalog to ensure single-catalog management."); + throw e; } catch (Exception e) { LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); throw new RuntimeException("Fail to import the table entity to the store.", e); diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java index 084eb6c1051..455e428c347 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java @@ -40,6 +40,7 @@ import org.apache.gravitino.Config; import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -205,6 +206,78 @@ public void testCreateAndLoadSchema() throws IOException { Assertions.assertEquals("test", loadedSchema3.auditInfo().creator()); } + @Test + public void testConcurrentImportSchemaReusesExistingEntity() throws IOException { + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrent"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + dispatcher.createSchema(schemaIdent, "comment", props); + SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + SchemaEntity concurrentSchemaEntity = + SchemaEntity.builder() + .withId(importedSchemaEntity.id()) + .withName(schemaIdent.name()) + .withNamespace(schemaIdent.namespace()) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate HA race: first two gets return not-found (so both the pre-import check and the + // internalLoadSchema inside importSchema proceed to store.put), then put throws + // EntityAlreadyExistsException, and the dispatcher-level retry sees the entity on the third + // get. + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(concurrentSchemaEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.SCHEMA), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + Schema loadedSchema = Assertions.assertDoesNotThrow(() -> dispatcher.loadSchema(schemaIdent)); + Assertions.assertEquals(schemaIdent.name(), loadedSchema.name()); + Assertions.assertEquals("comment", loadedSchema.comment()); + } + + @Test + public void testConcurrentImportSchemaFailsOnMismatchedIdentifier() throws IOException { + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrentMismatch"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + dispatcher.createSchema(schemaIdent, "comment", props); + SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + SchemaEntity mismatchedSchemaEntity = + SchemaEntity.builder() + .withId(importedSchemaEntity.id() + 1) + .withName(schemaIdent.name()) + .withNamespace(schemaIdent.namespace()) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate genuine multi-catalog conflict: put fails, and the dispatcher-level retry finds + // an entity with a mismatched ID (operateOnEntity returns null → imported=false → error + // thrown). + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(mismatchedSchemaEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.SCHEMA), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, () -> dispatcher.loadSchema(schemaIdent)); + Assertions.assertTrue(exception.getMessage().contains("Schema managed by multiple catalogs")); + } + @Test public void testCreateAndAlterSchema() throws IOException { NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schema21"); diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java index d0016776c9c..9c44e46c7ef 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java @@ -44,9 +44,11 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.gravitino.Config; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -256,6 +258,130 @@ public void testCreateAndLoadTable() throws IOException { Assertions.assertEquals("test", loadedTable4.auditInfo().creator()); } + @Test + public void testConcurrentImportTableReusesExistingEntity() throws IOException { + Namespace tableNs = Namespace.of(metalake, catalog, "schema52"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props); + + NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrent"); + Column[] columns = + new Column[] { + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() + }; + + Table table = + tableOperationDispatcher.createTable( + tableIdent, columns, "comment", props, new Transform[0]); + TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + TableEntity concurrentTableEntity = + TableEntity.builder() + .withId(importedTableEntity.id()) + .withName(tableIdent.name()) + .withNamespace(tableIdent.namespace()) + .withColumns( + IntStream.range(0, table.columns().length) + .mapToObj( + i -> + ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit)) + .collect(Collectors.toList())) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate HA race: first two gets return not-found (so both the pre-import check and the + // internalLoadTable inside importTable proceed to store.put), then put throws + // EntityAlreadyExistsException, and the dispatcher-level retry sees the entity on the third + // get. + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(concurrentTableEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.TABLE), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + Table loadedTable = + Assertions.assertDoesNotThrow(() -> tableOperationDispatcher.loadTable(tableIdent)); + Assertions.assertEquals(tableIdent.name(), loadedTable.name()); + Assertions.assertEquals("comment", loadedTable.comment()); + } + + @Test + public void testConcurrentImportTableFailsOnMismatchedIdentifier() throws IOException { + Namespace tableNs = Namespace.of(metalake, catalog, "schemaConcurrentMismatch"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props); + + NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrentMismatch"); + Column[] columns = + new Column[] { + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() + }; + + Table table = + tableOperationDispatcher.createTable( + tableIdent, columns, "comment", props, new Transform[0]); + TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + TableEntity mismatchedTableEntity = + TableEntity.builder() + .withId(importedTableEntity.id() + 1) + .withName(tableIdent.name()) + .withNamespace(tableIdent.namespace()) + .withColumns( + IntStream.range(0, table.columns().length) + .mapToObj( + i -> + ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit)) + .collect(Collectors.toList())) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate genuine multi-catalog conflict: put fails, and the dispatcher-level retry finds + // an entity with a mismatched ID (operateOnEntity returns null → imported=false → error + // thrown). + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(mismatchedTableEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.TABLE), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> tableOperationDispatcher.loadTable(tableIdent)); + Assertions.assertTrue(exception.getMessage().contains("Table managed by multiple catalogs")); + } + @Test public void testCreateAndAlterTable() throws IOException { Namespace tableNs = Namespace.of(metalake, catalog, "schema61"); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java index 1bd75e9b59b..9f081571f55 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java @@ -45,9 +45,13 @@ import org.apache.iceberg.rest.responses.LoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IcebergTableHookDispatcher implements IcebergTableOperationDispatcher { + private static final Logger LOG = LoggerFactory.getLogger(IcebergTableHookDispatcher.class); + private final IcebergTableOperationDispatcher dispatcher; private String metalake; @@ -92,23 +96,10 @@ public LoadTableResponse updateTable( public void dropTable( IcebergRequestContext context, TableIdentifier tableIdentifier, boolean purgeRequested) { dispatcher.dropTable(context, tableIdentifier, purgeRequested); - EntityStore store = GravitinoEnv.getInstance().entityStore(); - try { - if (store != null) { - // Delete the entity for the dropped table. - store.delete( - IcebergIdentifierUtils.toGravitinoTableIdentifier( - metalake, - context.catalogName(), - tableIdentifier, - HierarchicalSchemaUtil.schemaSeparator()), - Entity.EntityType.TABLE); - } - } catch (NoSuchEntityException ignore) { - // Ignore if the table entity does not exist. - } catch (IOException ioe) { - throw new RuntimeException("io exception when deleting table entity", ioe); - } + // Reconcile against Iceberg backend state — without a distributed TreeLock, + // another node may recreate the same table between the drop above and the + // EntityStore delete, leaving a stale Gravitino entity if we blindly delete. + bestEffortReconcileTableEntity(context, tableIdentifier); } @Override @@ -165,6 +156,12 @@ public void renameTable(IcebergRequestContext context, RenameTableRequest rename } catch (IOException ioe) { throw new RuntimeException("io exception when renaming table entity", ioe); } + + // IRC rename can race with another node's drop/create on either name. + // Reconcile both ends against the Iceberg backend so we don't leave a + // stale entity on the source or miss importing a re-created destination. + bestEffortReconcileTableEntity(context, renameTableRequest.source()); + bestEffortReconcileTableEntity(context, renameTableRequest.destination()); } @Override @@ -205,21 +202,73 @@ private void importTableAndSetOwner( // Import is intentionally NOT wrapped in try-catch: if it fails the table exists in Iceberg // but not in Gravitino, and silently swallowing that would mislead callers into thinking the // entity is registered. Surface the failure so the caller can react. + importTableEntity(context.catalogName(), namespace, tableName); + IcebergOwnershipUtils.setTableOwner( + metalake, + context.catalogName(), + namespace, + tableName, + context.userName(), + GravitinoEnv.getInstance().ownerDispatcher()); + } + + private void importTableEntity(String catalogName, Namespace namespace, String tableName) { TableDispatcher tableDispatcher = GravitinoEnv.getInstance().tableDispatcher(); if (tableDispatcher != null) { tableDispatcher.loadTable( IcebergIdentifierUtils.toGravitinoTableIdentifier( metalake, - context.catalogName(), + catalogName, TableIdentifier.of(namespace, tableName), HierarchicalSchemaUtil.schemaSeparator())); } - IcebergOwnershipUtils.setTableOwner( - metalake, - context.catalogName(), - namespace, - tableName, - context.userName(), - GravitinoEnv.getInstance().ownerDispatcher()); + } + + private void reconcileTableEntity( + IcebergRequestContext context, TableIdentifier tableIdentifier) { + // IRC requests can be served by different Gravitino nodes. Without a distributed TreeLock, + // another node may drop or recreate the same Iceberg table between the backend operation and + // this hook's EntityStore mutation. Reconcile the local Gravitino entity with the Iceberg + // backend state to avoid leaving stale/orphan table metadata in multi-node deployments. + if (dispatcher.tableExists(context, tableIdentifier)) { + importTableEntity(context.catalogName(), tableIdentifier.namespace(), tableIdentifier.name()); + return; + } + + deleteTableEntity(context.catalogName(), tableIdentifier); + + if (dispatcher.tableExists(context, tableIdentifier)) { + importTableEntity(context.catalogName(), tableIdentifier.namespace(), tableIdentifier.name()); + } + } + + private void bestEffortReconcileTableEntity( + IcebergRequestContext context, TableIdentifier tableIdentifier) { + try { + reconcileTableEntity(context, tableIdentifier); + } catch (RuntimeException e) { + LOG.warn( + "Failed to reconcile Gravitino table entity after the Iceberg backend operation " + + "succeeded. catalog={}, table={}", + context.catalogName(), + tableIdentifier, + e); + } + } + + private void deleteTableEntity(String catalogName, TableIdentifier tableIdentifier) { + EntityStore store = GravitinoEnv.getInstance().entityStore(); + try { + if (store != null) { + store.delete( + IcebergIdentifierUtils.toGravitinoTableIdentifier( + metalake, catalogName, tableIdentifier, HierarchicalSchemaUtil.schemaSeparator()), + Entity.EntityType.TABLE); + } + } catch (NoSuchEntityException ignore) { + // Ignore if the table entity does not exist. + } catch (IOException ioe) { + throw new RuntimeException("io exception when deleting table entity", ioe); + } } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java index 22d22724cd8..a6e35fef500 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java @@ -97,43 +97,10 @@ public LoadViewResponse replaceView( @Override public void dropView(IcebergRequestContext context, TableIdentifier viewIdentifier) { dispatcher.dropView(context, viewIdentifier); - - // Remove view from Gravitino entity store - EntityStore store = GravitinoEnv.getInstance().entityStore(); - try { - if (store != null) { - store.delete( - IcebergIdentifierUtils.toGravitinoTableIdentifier( - metalake, - context.catalogName(), - viewIdentifier, - HierarchicalSchemaUtil.schemaSeparator()), - Entity.EntityType.VIEW); - LOG.info( - "Successfully removed view from Gravitino entity store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name()); - } - } catch (NoSuchEntityException ignore) { - // Ignore if the view entity does not exist in the store - LOG.debug( - "View entity does not exist in store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name()); - } catch (IOException ioe) { - LOG.error( - "Failed to delete view entity from store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name(), - ioe); - throw new RuntimeException("Failed to delete view entity from store", ioe); - } + // Reconcile against Iceberg backend state — without a distributed TreeLock, + // another node may recreate the same view between the drop above and the + // EntityStore delete, leaving a stale Gravitino entity if we blindly delete. + bestEffortReconcileViewEntity(context, viewIdentifier); } @Override @@ -191,6 +158,12 @@ public void renameView(IcebergRequestContext context, RenameTableRequest renameV LOG.error("Failed to rename view entity in store from {} to {}", sourceIdent, destIdent, ioe); throw new RuntimeException("Failed to rename view entity in store", ioe); } + + // IRC rename can race with another node's drop/create on either name. + // Reconcile both ends against the Iceberg backend so we don't leave a + // stale entity on the source or miss importing a re-created destination. + bestEffortReconcileViewEntity(context, renameViewRequest.source()); + bestEffortReconcileViewEntity(context, renameViewRequest.destination()); } /** @@ -235,4 +208,70 @@ private void importView(String catalogName, Namespace namespace, String viewName } } } + + private void reconcileViewEntity(IcebergRequestContext context, TableIdentifier viewIdentifier) { + // IRC requests can be served by different Gravitino nodes. Without a distributed TreeLock, + // another node may drop or recreate the same Iceberg view between the backend operation and + // this hook's EntityStore mutation. Reconcile the local Gravitino entity with the Iceberg + // backend state to avoid leaving stale/orphan view metadata in multi-node deployments. + if (dispatcher.viewExists(context, viewIdentifier)) { + importView(context.catalogName(), viewIdentifier.namespace(), viewIdentifier.name()); + return; + } + + deleteViewEntity(context.catalogName(), viewIdentifier); + + if (dispatcher.viewExists(context, viewIdentifier)) { + importView(context.catalogName(), viewIdentifier.namespace(), viewIdentifier.name()); + } + } + + private void bestEffortReconcileViewEntity( + IcebergRequestContext context, TableIdentifier viewIdentifier) { + try { + reconcileViewEntity(context, viewIdentifier); + } catch (RuntimeException e) { + LOG.warn( + "Failed to reconcile Gravitino view entity after the Iceberg backend operation " + + "succeeded. catalog={}, view={}", + context.catalogName(), + viewIdentifier, + e); + } + } + + private void deleteViewEntity(String catalogName, TableIdentifier viewIdentifier) { + EntityStore store = GravitinoEnv.getInstance().entityStore(); + try { + if (store != null) { + store.delete( + IcebergIdentifierUtils.toGravitinoTableIdentifier( + metalake, catalogName, viewIdentifier, HierarchicalSchemaUtil.schemaSeparator()), + Entity.EntityType.VIEW); + LOG.info( + "Successfully removed view from Gravitino entity store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name()); + } + } catch (NoSuchEntityException ignore) { + // Ignore if the view entity does not exist in the store + LOG.debug( + "View entity does not exist in store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name()); + } catch (IOException ioe) { + LOG.error( + "Failed to delete view entity from store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name(), + ioe); + throw new RuntimeException("Failed to delete view entity from store", ioe); + } + } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java index 6b03109c94c..2d4f2e125c8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java @@ -184,6 +184,36 @@ public void testDropTableDeletesEntity() throws IOException { verify(mockEntityStore).delete(expectedIdentifier, Entity.EntityType.TABLE); } + @Test + public void testDropTableReimportsEntityWhenTableExistsAfterDrop() throws IOException { + TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); + when(mockDispatcher.tableExists(mockContext, tableId)).thenReturn(true); + + hookDispatcher.dropTable(mockContext, tableId, false); + + verify(mockDispatcher).dropTable(mockContext, tableId, false); + NameIdentifier expectedIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier( + TEST_METALAKE, TEST_CATALOG, tableId, ":"); + verify(mockEntityStore, never()).delete(expectedIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(expectedIdentifier); + } + + @Test + public void testDropTableReimportsEntityWhenTableIsRecreatedDuringDelete() throws IOException { + TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); + when(mockDispatcher.tableExists(mockContext, tableId)).thenReturn(false, true); + + hookDispatcher.dropTable(mockContext, tableId, false); + + verify(mockDispatcher).dropTable(mockContext, tableId, false); + NameIdentifier expectedIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier( + TEST_METALAKE, TEST_CATALOG, tableId, ":"); + verify(mockEntityStore).delete(expectedIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(expectedIdentifier); + } + @Test public void testDropTableIgnoresNoSuchEntityException() throws IOException { TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); @@ -202,16 +232,12 @@ public void testDropTableIgnoresNoSuchEntityException() throws IOException { } @Test - public void testDropTableThrowsRuntimeExceptionOnIOException() throws IOException { + public void testDropTableIgnoresReconciliationIOException() throws IOException { TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); doThrow(new IOException("IO error")).when(mockEntityStore).delete(any(), any()); - RuntimeException exception = - Assertions.assertThrows( - RuntimeException.class, () -> hookDispatcher.dropTable(mockContext, tableId, false)); - - Assertions.assertTrue(exception.getMessage().contains("io exception when deleting table")); + Assertions.assertDoesNotThrow(() -> hookDispatcher.dropTable(mockContext, tableId, false)); verify(mockDispatcher).dropTable(mockContext, tableId, false); } @@ -242,6 +268,58 @@ public void testRenameTableUpdatesEntity() throws IOException { .update(eq(sourceIdentifier), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any()); } + @Test + public void testRenameTableReconcilesSourceAndDestinationEntities() throws IOException { + TableIdentifier source = TableIdentifier.of("schema1", "old_table"); + TableIdentifier dest = TableIdentifier.of("schema2", "new_table"); + RenameTableRequest request = + RenameTableRequest.builder().withSource(source).withDestination(dest).build(); + + TableEntity mockTableEntity = mock(TableEntity.class); + when(mockTableEntity.id()).thenReturn(1L); + when(mockTableEntity.columns()).thenReturn(Collections.emptyList()); + AuditInfo auditInfo = + AuditInfo.builder().withCreator("original_creator").withCreateTime(Instant.now()).build(); + when(mockTableEntity.auditInfo()).thenReturn(auditInfo); + when(mockEntityStore.update(any(), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any())) + .thenReturn(mockTableEntity); + when(mockDispatcher.tableExists(mockContext, source)).thenReturn(false, false); + when(mockDispatcher.tableExists(mockContext, dest)).thenReturn(true); + + hookDispatcher.renameTable(mockContext, request); + + NameIdentifier sourceIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE, TEST_CATALOG, source, ":"); + NameIdentifier destIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE, TEST_CATALOG, dest, ":"); + verify(mockEntityStore).delete(sourceIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(destIdentifier); + } + + @Test + public void testRenameTableIgnoresReconciliationImportFailure() throws IOException { + TableIdentifier source = TableIdentifier.of("schema1", "old_table"); + TableIdentifier dest = TableIdentifier.of("schema2", "new_table"); + RenameTableRequest request = + RenameTableRequest.builder().withSource(source).withDestination(dest).build(); + + TableEntity mockTableEntity = mock(TableEntity.class); + when(mockTableEntity.id()).thenReturn(1L); + when(mockTableEntity.columns()).thenReturn(Collections.emptyList()); + AuditInfo auditInfo = + AuditInfo.builder().withCreator("original_creator").withCreateTime(Instant.now()).build(); + when(mockTableEntity.auditInfo()).thenReturn(auditInfo); + when(mockEntityStore.update(any(), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any())) + .thenReturn(mockTableEntity); + when(mockDispatcher.tableExists(mockContext, dest)).thenReturn(true); + doThrow(new RuntimeException("import failed")).when(mockTableDispatcher).loadTable(any()); + + Assertions.assertDoesNotThrow(() -> hookDispatcher.renameTable(mockContext, request)); + + verify(mockDispatcher).renameTable(mockContext, request); + verify(mockTableDispatcher).loadTable(any()); + } + @Test public void testRenameTableIgnoresNoSuchEntityException() throws IOException { TableIdentifier source = TableIdentifier.of("schema1", "old_table"); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java index 45faf33f9b5..4fdf1426b97 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java @@ -257,6 +257,34 @@ public void testDropViewRemovesFromEntityStore() throws Exception { verify(mockEntityStore, times(1)).delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); } + @Test + public void testDropViewReimportsEntityWhenViewExistsAfterDrop() throws Exception { + TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); + when(mockExecutor.viewExists(mockContext, viewIdent)).thenReturn(true); + + hookDispatcher.dropView(mockContext, viewIdent); + + verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); + NameIdentifier expectedIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, viewIdent, ":"); + verify(mockEntityStore, never()).delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(expectedIdent)); + } + + @Test + public void testDropViewReimportsEntityWhenViewIsRecreatedDuringDelete() throws Exception { + TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); + when(mockExecutor.viewExists(mockContext, viewIdent)).thenReturn(false, true); + + hookDispatcher.dropView(mockContext, viewIdent); + + verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); + NameIdentifier expectedIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, viewIdent, ":"); + verify(mockEntityStore, times(1)).delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(expectedIdent)); + } + @Test public void testDropViewHandlesMissingEntity() throws Exception { TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); @@ -276,7 +304,7 @@ public void testDropViewHandlesMissingEntity() throws Exception { } @Test - public void testDropViewHandlesIOException() throws Exception { + public void testDropViewIgnoresReconciliationIOException() throws Exception { TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); // Simulate IO error @@ -286,11 +314,8 @@ public void testDropViewHandlesIOException() throws Exception { .when(mockEntityStore) .delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); - // Should throw RuntimeException wrapping the IOException - RuntimeException exception = - assertThrows(RuntimeException.class, () -> hookDispatcher.dropView(mockContext, viewIdent)); + hookDispatcher.dropView(mockContext, viewIdent); - assertEquals("Failed to delete view entity from store", exception.getMessage()); verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); } @@ -313,6 +338,45 @@ public void testRenameViewUpdatesEntityStore() throws Exception { .update(eq(sourceGravitinoIdent), eq(ViewEntity.class), eq(Entity.EntityType.VIEW), any()); } + @Test + public void testRenameViewReconcilesSourceAndDestinationEntities() throws Exception { + TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view"); + TableIdentifier destIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "new_view"); + RenameTableRequest renameRequest = + RenameTableRequest.builder().withSource(sourceIdent).withDestination(destIdent).build(); + when(mockExecutor.viewExists(mockContext, sourceIdent)).thenReturn(false, false); + when(mockExecutor.viewExists(mockContext, destIdent)).thenReturn(true); + + hookDispatcher.renameView(mockContext, renameRequest); + + NameIdentifier sourceGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, sourceIdent, ":"); + NameIdentifier destGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, destIdent, ":"); + verify(mockEntityStore, times(1)).delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(destGravitinoIdent)); + } + + @Test + public void testRenameViewIgnoresReconciliationDeleteFailure() throws Exception { + TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view"); + TableIdentifier destIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "new_view"); + RenameTableRequest renameRequest = + RenameTableRequest.builder().withSource(sourceIdent).withDestination(destIdent).build(); + when(mockExecutor.viewExists(mockContext, sourceIdent)).thenReturn(false); + + NameIdentifier sourceGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, sourceIdent, ":"); + doThrow(new IOException("IO error")) + .when(mockEntityStore) + .delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + + hookDispatcher.renameView(mockContext, renameRequest); + + verify(mockExecutor, times(1)).renameView(mockContext, renameRequest); + verify(mockEntityStore, times(1)).delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + } + @Test public void testRenameViewHandlesMissingEntity() throws Exception { TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view");