From eb8103f0960941c73c69d125dcceb7fcc348967e Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 1 Jun 2026 17:45:57 +0800 Subject: [PATCH] [#11040] fix(core): make Iceberg import idempotent under concurrent load Make Iceberg schema and table import idempotent when multiple Gravitino nodes try to import the same object concurrently. If EntityStore.put(.., true) reports the entity already exists, importSchema/importTable now rethrow EntityAlreadyExistsException; the dispatcher catches it, reloads from the store, and reuses the existing entity when it is genuinely imported by a single catalog (only surfacing the "managed by multiple catalogs" error when the reloaded entity is still not imported). Also reconcile Gravitino EntityStore metadata after Iceberg REST Catalog (IRC) table/view drop and rename operations. Because IRC requests can be served by different nodes and the TreeLock is process-local, another node may drop or recreate the same object between the backend operation and the hook's EntityStore mutation. The hook now checks the current Iceberg backend state (best-effort, no distributed lock) and either re-imports the object or removes the stale Gravitino entity, reducing stale/orphan metadata in multi-node deployments. 
Fix: #11040 --- .../catalog/SchemaOperationDispatcher.java | 33 +++-- .../catalog/TableOperationDispatcher.java | 23 +++- .../TestSchemaOperationDispatcher.java | 73 ++++++++++ .../catalog/TestTableOperationDispatcher.java | 126 ++++++++++++++++++ .../IcebergTableHookDispatcher.java | 99 ++++++++++---- .../dispatcher/IcebergViewHookDispatcher.java | 113 +++++++++++----- .../TestIcebergTableHookDispatcher.java | 90 ++++++++++++- .../TestIcebergViewHookDispatcher.java | 74 +++++++++- 8 files changed, 541 insertions(+), 90 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java index ffcdc0ad6ae..9ab9df5588c 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java @@ -192,13 +192,27 @@ public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadSchema(ident)); if (!schema.imported()) { - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(ident.namespace().levels()), - LockType.WRITE, - () -> { - importSchema(ident); - return null; - }); + try { + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + importSchema(ident); + return null; + }); + } catch (EntityAlreadyExistsException e) { + // HA race: another Gravitino node concurrently imported this schema. Reload from the + // entity store to verify the entity stored by the winning node is consistent. + LOG.info( + "Schema {} was concurrently imported by another node; reloading from store.", ident); + EntityCombinedSchema reloaded = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadSchema(ident)); + if (!reloaded.imported()) { + throw new UnsupportedOperationException( + "Schema managed by multiple catalogs. 
This may cause unexpected issues such as privilege conflicts. " + + "To resolve: Remove all catalogs managing this schema, then recreate one catalog to ensure single-catalog management."); + } + } } return schema; @@ -438,10 +452,7 @@ private void importSchema(NameIdentifier identifier) { try { store.put(schemaEntity, true); } catch (EntityAlreadyExistsException e) { - LOG.error("Failed to import schema {} with id {} to the store.", identifier, uid, e); - throw new UnsupportedOperationException( - "Schema managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " - + "To resolve: Remove all catalogs managing this schema, then recreate one catalog to ensure single-catalog management."); + throw e; } catch (Exception e) { LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); throw new RuntimeException("Fail to import schema entity to the store.", e); diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index 57499b6f6d0..244edec88de 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -123,8 +123,22 @@ public Table loadTable(NameIdentifier ident) throws NoSuchTableException { schemaDispatcher.loadSchema(schemaIdent); // Import the table. - entityCombinedTable = - TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> importTable(ident)); + try { + entityCombinedTable = + TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> importTable(ident)); + } catch (EntityAlreadyExistsException e) { + // HA race: another Gravitino node concurrently imported this table. Reload from the + // entity store to pick up the entity stored by the winning node. 
+ LOG.info( + "Table {} was concurrently imported by another node; reloading from store.", ident); + entityCombinedTable = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> internalLoadTable(ident)); + if (!entityCombinedTable.imported()) { + throw new UnsupportedOperationException( + "Table managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. " + + "To resolve: Remove all catalogs managing this table, then recreate one catalog to ensure single-catalog management."); + } + } } // Update the column entities in Gravitino store if the columns are different from the ones @@ -477,10 +491,7 @@ private EntityCombinedTable importTable(NameIdentifier identifier) { try { store.put(tableEntity, true); } catch (EntityAlreadyExistsException e) { - LOG.error("Failed to import table {} with id {} to the store.", identifier, uid, e); - throw new UnsupportedOperationException( - "Table managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. 
" - + "To resolve: Remove all catalogs managing this table, then recreate one catalog to ensure single-catalog management."); + throw e; } catch (Exception e) { LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); throw new RuntimeException("Fail to import the table entity to the store.", e); diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java index 084eb6c1051..455e428c347 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java @@ -40,6 +40,7 @@ import org.apache.gravitino.Config; import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -205,6 +206,78 @@ public void testCreateAndLoadSchema() throws IOException { Assertions.assertEquals("test", loadedSchema3.auditInfo().creator()); } + @Test + public void testConcurrentImportSchemaReusesExistingEntity() throws IOException { + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrent"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + dispatcher.createSchema(schemaIdent, "comment", props); + SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + SchemaEntity concurrentSchemaEntity = + SchemaEntity.builder() + .withId(importedSchemaEntity.id()) + .withName(schemaIdent.name()) + .withNamespace(schemaIdent.namespace()) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate HA race: first two gets return not-found (so both the pre-import check and 
the + // internalLoadSchema inside importSchema proceed to store.put), then put throws + // EntityAlreadyExistsException, and the dispatcher-level retry sees the entity on the third + // get. + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(concurrentSchemaEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.SCHEMA), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + Schema loadedSchema = Assertions.assertDoesNotThrow(() -> dispatcher.loadSchema(schemaIdent)); + Assertions.assertEquals(schemaIdent.name(), loadedSchema.name()); + Assertions.assertEquals("comment", loadedSchema.comment()); + } + + @Test + public void testConcurrentImportSchemaFailsOnMismatchedIdentifier() throws IOException { + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrentMismatch"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + dispatcher.createSchema(schemaIdent, "comment", props); + SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + SchemaEntity mismatchedSchemaEntity = + SchemaEntity.builder() + .withId(importedSchemaEntity.id() + 1) + .withName(schemaIdent.name()) + .withNamespace(schemaIdent.namespace()) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate genuine multi-catalog conflict: put fails, and the dispatcher-level retry finds + // an entity with a mismatched ID (operateOnEntity returns null → imported=false → error + // thrown). 
+ reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(mismatchedSchemaEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.SCHEMA), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, () -> dispatcher.loadSchema(schemaIdent)); + Assertions.assertTrue(exception.getMessage().contains("Schema managed by multiple catalogs")); + } + @Test public void testCreateAndAlterSchema() throws IOException { NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schema21"); diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java index d0016776c9c..9c44e46c7ef 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java @@ -44,9 +44,11 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.gravitino.Config; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -256,6 +258,130 @@ public void testCreateAndLoadTable() throws IOException { Assertions.assertEquals("test", loadedTable4.auditInfo().creator()); } + @Test + public void testConcurrentImportTableReusesExistingEntity() throws IOException { + Namespace tableNs = Namespace.of(metalake, catalog, "schema52"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + 
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props); + + NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrent"); + Column[] columns = + new Column[] { + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() + }; + + Table table = + tableOperationDispatcher.createTable( + tableIdent, columns, "comment", props, new Transform[0]); + TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + TableEntity concurrentTableEntity = + TableEntity.builder() + .withId(importedTableEntity.id()) + .withName(tableIdent.name()) + .withNamespace(tableIdent.namespace()) + .withColumns( + IntStream.range(0, table.columns().length) + .mapToObj( + i -> + ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit)) + .collect(Collectors.toList())) + .withAuditInfo(concurrentAudit) + .build(); + + // Simulate HA race: first two gets return not-found (so both the pre-import check and the + // internalLoadTable inside importTable proceed to store.put), then put throws + // EntityAlreadyExistsException, and the dispatcher-level retry sees the entity on the third + // get. 
+ reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(concurrentTableEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.TABLE), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + Table loadedTable = + Assertions.assertDoesNotThrow(() -> tableOperationDispatcher.loadTable(tableIdent)); + Assertions.assertEquals(tableIdent.name(), loadedTable.name()); + Assertions.assertEquals("comment", loadedTable.comment()); + } + + @Test + public void testConcurrentImportTableFailsOnMismatchedIdentifier() throws IOException { + Namespace tableNs = Namespace.of(metalake, catalog, "schemaConcurrentMismatch"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props); + + NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrentMismatch"); + Column[] columns = + new Column[] { + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() + }; + + Table table = + tableOperationDispatcher.createTable( + tableIdent, columns, "comment", props, new Transform[0]); + TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class); + + AuditInfo concurrentAudit = + AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build(); + TableEntity mismatchedTableEntity = + TableEntity.builder() + .withId(importedTableEntity.id() + 1) + .withName(tableIdent.name()) + .withNamespace(tableIdent.namespace()) + .withColumns( + IntStream.range(0, table.columns().length) + .mapToObj( + i -> + ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit)) + .collect(Collectors.toList())) + .withAuditInfo(concurrentAudit) + 
.build(); + + // Simulate genuine multi-catalog conflict: put fails, and the dispatcher-level retry finds + // an entity with a mismatched ID (operateOnEntity returns null → imported=false → error + // thrown). + reset(entityStore); + doThrow(new NoSuchEntityException("mock error")) + .doThrow(new NoSuchEntityException("mock error")) + .doReturn(mismatchedTableEntity) + .when(entityStore) + .get(any(), eq(Entity.EntityType.TABLE), any()); + doThrow(new EntityAlreadyExistsException("mock conflict")) + .when(entityStore) + .put(any(), anyBoolean()); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> tableOperationDispatcher.loadTable(tableIdent)); + Assertions.assertTrue(exception.getMessage().contains("Table managed by multiple catalogs")); + } + @Test public void testCreateAndAlterTable() throws IOException { Namespace tableNs = Namespace.of(metalake, catalog, "schema61"); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java index 1bd75e9b59b..9f081571f55 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java @@ -45,9 +45,13 @@ import org.apache.iceberg.rest.responses.LoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IcebergTableHookDispatcher implements IcebergTableOperationDispatcher { + private static final Logger LOG = LoggerFactory.getLogger(IcebergTableHookDispatcher.class); + private final 
IcebergTableOperationDispatcher dispatcher; private String metalake; @@ -92,23 +96,10 @@ public LoadTableResponse updateTable( public void dropTable( IcebergRequestContext context, TableIdentifier tableIdentifier, boolean purgeRequested) { dispatcher.dropTable(context, tableIdentifier, purgeRequested); - EntityStore store = GravitinoEnv.getInstance().entityStore(); - try { - if (store != null) { - // Delete the entity for the dropped table. - store.delete( - IcebergIdentifierUtils.toGravitinoTableIdentifier( - metalake, - context.catalogName(), - tableIdentifier, - HierarchicalSchemaUtil.schemaSeparator()), - Entity.EntityType.TABLE); - } - } catch (NoSuchEntityException ignore) { - // Ignore if the table entity does not exist. - } catch (IOException ioe) { - throw new RuntimeException("io exception when deleting table entity", ioe); - } + // Reconcile against Iceberg backend state — without a distributed TreeLock, + // another node may recreate the same table between the drop above and the + // EntityStore delete, leaving a stale Gravitino entity if we blindly delete. + bestEffortReconcileTableEntity(context, tableIdentifier); } @Override @@ -165,6 +156,12 @@ public void renameTable(IcebergRequestContext context, RenameTableRequest rename } catch (IOException ioe) { throw new RuntimeException("io exception when renaming table entity", ioe); } + + // IRC rename can race with another node's drop/create on either name. + // Reconcile both ends against the Iceberg backend so we don't leave a + // stale entity on the source or miss importing a re-created destination. 
+ bestEffortReconcileTableEntity(context, renameTableRequest.source()); + bestEffortReconcileTableEntity(context, renameTableRequest.destination()); } @Override @@ -205,21 +202,73 @@ private void importTableAndSetOwner( // Import is intentionally NOT wrapped in try-catch: if it fails the table exists in Iceberg // but not in Gravitino, and silently swallowing that would mislead callers into thinking the // entity is registered. Surface the failure so the caller can react. + importTableEntity(context.catalogName(), namespace, tableName); + IcebergOwnershipUtils.setTableOwner( + metalake, + context.catalogName(), + namespace, + tableName, + context.userName(), + GravitinoEnv.getInstance().ownerDispatcher()); + } + + private void importTableEntity(String catalogName, Namespace namespace, String tableName) { TableDispatcher tableDispatcher = GravitinoEnv.getInstance().tableDispatcher(); if (tableDispatcher != null) { tableDispatcher.loadTable( IcebergIdentifierUtils.toGravitinoTableIdentifier( metalake, - context.catalogName(), + catalogName, TableIdentifier.of(namespace, tableName), HierarchicalSchemaUtil.schemaSeparator())); } - IcebergOwnershipUtils.setTableOwner( - metalake, - context.catalogName(), - namespace, - tableName, - context.userName(), - GravitinoEnv.getInstance().ownerDispatcher()); + } + + private void reconcileTableEntity( + IcebergRequestContext context, TableIdentifier tableIdentifier) { + // IRC requests can be served by different Gravitino nodes. Without a distributed TreeLock, + // another node may drop or recreate the same Iceberg table between the backend operation and + // this hook's EntityStore mutation. Reconcile the local Gravitino entity with the Iceberg + // backend state to avoid leaving stale/orphan table metadata in multi-node deployments. 
+ if (dispatcher.tableExists(context, tableIdentifier)) { + importTableEntity(context.catalogName(), tableIdentifier.namespace(), tableIdentifier.name()); + return; + } + + deleteTableEntity(context.catalogName(), tableIdentifier); + + if (dispatcher.tableExists(context, tableIdentifier)) { + importTableEntity(context.catalogName(), tableIdentifier.namespace(), tableIdentifier.name()); + } + } + + private void bestEffortReconcileTableEntity( + IcebergRequestContext context, TableIdentifier tableIdentifier) { + try { + reconcileTableEntity(context, tableIdentifier); + } catch (RuntimeException e) { + LOG.warn( + "Failed to reconcile Gravitino table entity after the Iceberg backend operation " + + "succeeded. catalog={}, table={}", + context.catalogName(), + tableIdentifier, + e); + } + } + + private void deleteTableEntity(String catalogName, TableIdentifier tableIdentifier) { + EntityStore store = GravitinoEnv.getInstance().entityStore(); + try { + if (store != null) { + store.delete( + IcebergIdentifierUtils.toGravitinoTableIdentifier( + metalake, catalogName, tableIdentifier, HierarchicalSchemaUtil.schemaSeparator()), + Entity.EntityType.TABLE); + } + } catch (NoSuchEntityException ignore) { + // Ignore if the table entity does not exist. 
+ } catch (IOException ioe) { + throw new RuntimeException("io exception when deleting table entity", ioe); + } } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java index 22d22724cd8..a6e35fef500 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java @@ -97,43 +97,10 @@ public LoadViewResponse replaceView( @Override public void dropView(IcebergRequestContext context, TableIdentifier viewIdentifier) { dispatcher.dropView(context, viewIdentifier); - - // Remove view from Gravitino entity store - EntityStore store = GravitinoEnv.getInstance().entityStore(); - try { - if (store != null) { - store.delete( - IcebergIdentifierUtils.toGravitinoTableIdentifier( - metalake, - context.catalogName(), - viewIdentifier, - HierarchicalSchemaUtil.schemaSeparator()), - Entity.EntityType.VIEW); - LOG.info( - "Successfully removed view from Gravitino entity store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name()); - } - } catch (NoSuchEntityException ignore) { - // Ignore if the view entity does not exist in the store - LOG.debug( - "View entity does not exist in store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name()); - } catch (IOException ioe) { - LOG.error( - "Failed to delete view entity from store: {}.{}.{}.{}", - metalake, - context.catalogName(), - viewIdentifier.namespace(), - viewIdentifier.name(), - ioe); - throw new RuntimeException("Failed to delete view entity from store", ioe); - } + // Reconcile against Iceberg backend state — without a distributed 
TreeLock, + // another node may recreate the same view between the drop above and the + // EntityStore delete, leaving a stale Gravitino entity if we blindly delete. + bestEffortReconcileViewEntity(context, viewIdentifier); } @Override @@ -191,6 +158,12 @@ public void renameView(IcebergRequestContext context, RenameTableRequest renameV LOG.error("Failed to rename view entity in store from {} to {}", sourceIdent, destIdent, ioe); throw new RuntimeException("Failed to rename view entity in store", ioe); } + + // IRC rename can race with another node's drop/create on either name. + // Reconcile both ends against the Iceberg backend so we don't leave a + // stale entity on the source or miss importing a re-created destination. + bestEffortReconcileViewEntity(context, renameViewRequest.source()); + bestEffortReconcileViewEntity(context, renameViewRequest.destination()); } /** @@ -235,4 +208,70 @@ private void importView(String catalogName, Namespace namespace, String viewName } } } + + private void reconcileViewEntity(IcebergRequestContext context, TableIdentifier viewIdentifier) { + // IRC requests can be served by different Gravitino nodes. Without a distributed TreeLock, + // another node may drop or recreate the same Iceberg view between the backend operation and + // this hook's EntityStore mutation. Reconcile the local Gravitino entity with the Iceberg + // backend state to avoid leaving stale/orphan view metadata in multi-node deployments. 
+ if (dispatcher.viewExists(context, viewIdentifier)) { + importView(context.catalogName(), viewIdentifier.namespace(), viewIdentifier.name()); + return; + } + + deleteViewEntity(context.catalogName(), viewIdentifier); + + if (dispatcher.viewExists(context, viewIdentifier)) { + importView(context.catalogName(), viewIdentifier.namespace(), viewIdentifier.name()); + } + } + + private void bestEffortReconcileViewEntity( + IcebergRequestContext context, TableIdentifier viewIdentifier) { + try { + reconcileViewEntity(context, viewIdentifier); + } catch (RuntimeException e) { + LOG.warn( + "Failed to reconcile Gravitino view entity after the Iceberg backend operation " + + "succeeded. catalog={}, view={}", + context.catalogName(), + viewIdentifier, + e); + } + } + + private void deleteViewEntity(String catalogName, TableIdentifier viewIdentifier) { + EntityStore store = GravitinoEnv.getInstance().entityStore(); + try { + if (store != null) { + store.delete( + IcebergIdentifierUtils.toGravitinoTableIdentifier( + metalake, catalogName, viewIdentifier, HierarchicalSchemaUtil.schemaSeparator()), + Entity.EntityType.VIEW); + LOG.info( + "Successfully removed view from Gravitino entity store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name()); + } + } catch (NoSuchEntityException ignore) { + // Ignore if the view entity does not exist in the store + LOG.debug( + "View entity does not exist in store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name()); + } catch (IOException ioe) { + LOG.error( + "Failed to delete view entity from store: {}.{}.{}.{}", + metalake, + catalogName, + viewIdentifier.namespace(), + viewIdentifier.name(), + ioe); + throw new RuntimeException("Failed to delete view entity from store", ioe); + } + } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java index 6b03109c94c..2d4f2e125c8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java @@ -184,6 +184,36 @@ public void testDropTableDeletesEntity() throws IOException { verify(mockEntityStore).delete(expectedIdentifier, Entity.EntityType.TABLE); } + @Test + public void testDropTableReimportsEntityWhenTableExistsAfterDrop() throws IOException { + TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); + when(mockDispatcher.tableExists(mockContext, tableId)).thenReturn(true); + + hookDispatcher.dropTable(mockContext, tableId, false); + + verify(mockDispatcher).dropTable(mockContext, tableId, false); + NameIdentifier expectedIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier( + TEST_METALAKE, TEST_CATALOG, tableId, ":"); + verify(mockEntityStore, never()).delete(expectedIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(expectedIdentifier); + } + + @Test + public void testDropTableReimportsEntityWhenTableIsRecreatedDuringDelete() throws IOException { + TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); + when(mockDispatcher.tableExists(mockContext, tableId)).thenReturn(false, true); + + hookDispatcher.dropTable(mockContext, tableId, false); + + verify(mockDispatcher).dropTable(mockContext, tableId, false); + NameIdentifier expectedIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier( + TEST_METALAKE, TEST_CATALOG, tableId, ":"); + verify(mockEntityStore).delete(expectedIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(expectedIdentifier); + } + @Test public void testDropTableIgnoresNoSuchEntityException() 
throws IOException { TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); @@ -202,16 +232,12 @@ public void testDropTableIgnoresNoSuchEntityException() throws IOException { } @Test - public void testDropTableThrowsRuntimeExceptionOnIOException() throws IOException { + public void testDropTableIgnoresReconciliationIOException() throws IOException { TableIdentifier tableId = TableIdentifier.of("test_schema", "test_table"); doThrow(new IOException("IO error")).when(mockEntityStore).delete(any(), any()); - RuntimeException exception = - Assertions.assertThrows( - RuntimeException.class, () -> hookDispatcher.dropTable(mockContext, tableId, false)); - - Assertions.assertTrue(exception.getMessage().contains("io exception when deleting table")); + Assertions.assertDoesNotThrow(() -> hookDispatcher.dropTable(mockContext, tableId, false)); verify(mockDispatcher).dropTable(mockContext, tableId, false); } @@ -242,6 +268,58 @@ public void testRenameTableUpdatesEntity() throws IOException { .update(eq(sourceIdentifier), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any()); } + @Test + public void testRenameTableReconcilesSourceAndDestinationEntities() throws IOException { + TableIdentifier source = TableIdentifier.of("schema1", "old_table"); + TableIdentifier dest = TableIdentifier.of("schema2", "new_table"); + RenameTableRequest request = + RenameTableRequest.builder().withSource(source).withDestination(dest).build(); + + TableEntity mockTableEntity = mock(TableEntity.class); + when(mockTableEntity.id()).thenReturn(1L); + when(mockTableEntity.columns()).thenReturn(Collections.emptyList()); + AuditInfo auditInfo = + AuditInfo.builder().withCreator("original_creator").withCreateTime(Instant.now()).build(); + when(mockTableEntity.auditInfo()).thenReturn(auditInfo); + when(mockEntityStore.update(any(), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any())) + .thenReturn(mockTableEntity); + when(mockDispatcher.tableExists(mockContext, 
source)).thenReturn(false, false); + when(mockDispatcher.tableExists(mockContext, dest)).thenReturn(true); + + hookDispatcher.renameTable(mockContext, request); + + NameIdentifier sourceIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE, TEST_CATALOG, source, ":"); + NameIdentifier destIdentifier = + IcebergIdentifierUtils.toGravitinoTableIdentifier(TEST_METALAKE, TEST_CATALOG, dest, ":"); + verify(mockEntityStore).delete(sourceIdentifier, Entity.EntityType.TABLE); + verify(mockTableDispatcher).loadTable(destIdentifier); + } + + @Test + public void testRenameTableIgnoresReconciliationImportFailure() throws IOException { + TableIdentifier source = TableIdentifier.of("schema1", "old_table"); + TableIdentifier dest = TableIdentifier.of("schema2", "new_table"); + RenameTableRequest request = + RenameTableRequest.builder().withSource(source).withDestination(dest).build(); + + TableEntity mockTableEntity = mock(TableEntity.class); + when(mockTableEntity.id()).thenReturn(1L); + when(mockTableEntity.columns()).thenReturn(Collections.emptyList()); + AuditInfo auditInfo = + AuditInfo.builder().withCreator("original_creator").withCreateTime(Instant.now()).build(); + when(mockTableEntity.auditInfo()).thenReturn(auditInfo); + when(mockEntityStore.update(any(), eq(TableEntity.class), eq(Entity.EntityType.TABLE), any())) + .thenReturn(mockTableEntity); + when(mockDispatcher.tableExists(mockContext, dest)).thenReturn(true); + doThrow(new RuntimeException("import failed")).when(mockTableDispatcher).loadTable(any()); + + Assertions.assertDoesNotThrow(() -> hookDispatcher.renameTable(mockContext, request)); + + verify(mockDispatcher).renameTable(mockContext, request); + verify(mockTableDispatcher).loadTable(any()); + } + @Test public void testRenameTableIgnoresNoSuchEntityException() throws IOException { TableIdentifier source = TableIdentifier.of("schema1", "old_table"); diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java index 45faf33f9b5..4fdf1426b97 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java @@ -257,6 +257,34 @@ public void testDropViewRemovesFromEntityStore() throws Exception { verify(mockEntityStore, times(1)).delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); } + @Test + public void testDropViewReimportsEntityWhenViewExistsAfterDrop() throws Exception { + TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); + when(mockExecutor.viewExists(mockContext, viewIdent)).thenReturn(true); + + hookDispatcher.dropView(mockContext, viewIdent); + + verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); + NameIdentifier expectedIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, viewIdent, ":"); + verify(mockEntityStore, never()).delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(expectedIdent)); + } + + @Test + public void testDropViewReimportsEntityWhenViewIsRecreatedDuringDelete() throws Exception { + TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); + when(mockExecutor.viewExists(mockContext, viewIdent)).thenReturn(false, true); + + hookDispatcher.dropView(mockContext, viewIdent); + + verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); + NameIdentifier expectedIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, viewIdent, ":"); + verify(mockEntityStore, times(1)).delete(eq(expectedIdent), 
eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(expectedIdent)); + } + @Test public void testDropViewHandlesMissingEntity() throws Exception { TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); @@ -276,7 +304,7 @@ public void testDropViewHandlesMissingEntity() throws Exception { } @Test - public void testDropViewHandlesIOException() throws Exception { + public void testDropViewIgnoresReconciliationIOException() throws Exception { TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME); // Simulate IO error @@ -286,11 +314,8 @@ public void testDropViewHandlesIOException() throws Exception { .when(mockEntityStore) .delete(eq(expectedIdent), eq(Entity.EntityType.VIEW)); - // Should throw RuntimeException wrapping the IOException - RuntimeException exception = - assertThrows(RuntimeException.class, () -> hookDispatcher.dropView(mockContext, viewIdent)); + hookDispatcher.dropView(mockContext, viewIdent); - assertEquals("Failed to delete view entity from store", exception.getMessage()); verify(mockExecutor, times(1)).dropView(mockContext, viewIdent); } @@ -313,6 +338,45 @@ public void testRenameViewUpdatesEntityStore() throws Exception { .update(eq(sourceGravitinoIdent), eq(ViewEntity.class), eq(Entity.EntityType.VIEW), any()); } + @Test + public void testRenameViewReconcilesSourceAndDestinationEntities() throws Exception { + TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view"); + TableIdentifier destIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "new_view"); + RenameTableRequest renameRequest = + RenameTableRequest.builder().withSource(sourceIdent).withDestination(destIdent).build(); + when(mockExecutor.viewExists(mockContext, sourceIdent)).thenReturn(false, false); + when(mockExecutor.viewExists(mockContext, destIdent)).thenReturn(true); + + hookDispatcher.renameView(mockContext, renameRequest); + + NameIdentifier 
sourceGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, sourceIdent, ":"); + NameIdentifier destGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, destIdent, ":"); + verify(mockEntityStore, times(1)).delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + verify(mockViewDispatcher, times(1)).loadView(eq(destGravitinoIdent)); + } + + @Test + public void testRenameViewIgnoresReconciliationDeleteFailure() throws Exception { + TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view"); + TableIdentifier destIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "new_view"); + RenameTableRequest renameRequest = + RenameTableRequest.builder().withSource(sourceIdent).withDestination(destIdent).build(); + when(mockExecutor.viewExists(mockContext, sourceIdent)).thenReturn(false); + + NameIdentifier sourceGravitinoIdent = + IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, sourceIdent, ":"); + doThrow(new IOException("IO error")) + .when(mockEntityStore) + .delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + + hookDispatcher.renameView(mockContext, renameRequest); + + verify(mockExecutor, times(1)).renameView(mockContext, renameRequest); + verify(mockEntityStore, times(1)).delete(eq(sourceGravitinoIdent), eq(Entity.EntityType.VIEW)); + } + @Test public void testRenameViewHandlesMissingEntity() throws Exception { TableIdentifier sourceIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), "old_view");