Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,18 @@ private void importSchema(NameIdentifier identifier) {
try {
store.put(schemaEntity, true);
} catch (EntityAlreadyExistsException e) {
// HA race: another Gravitino node finished importing the same schema between
// our internalLoadSchema check above and this put. The entity is already in
// the store, so the import is effectively a no-op — let the caller's load
// request succeed instead of failing with "managed by multiple catalogs".
SchemaEntity concurrentSchemaEntity = getEntity(identifier, SCHEMA, SchemaEntity.class);
if (isSameImportedSchema(concurrentSchemaEntity, stringId)) {
LOG.info(
"Schema {} was imported concurrently, reusing the existing entity in Gravitino.",
identifier);
Comment thread
yuqi1129 marked this conversation as resolved.
return;
}

LOG.error("Failed to import schema {} with id {} to the store.", identifier, uid, e);
throw new UnsupportedOperationException(
"Schema managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. "
Expand All @@ -397,6 +409,10 @@ private void importSchema(NameIdentifier identifier) {
}
}

/**
 * Checks whether a schema entity found in the store can be treated as the same schema that the
 * current import attempted to write.
 *
 * @param schemaEntity the entity read back from the store; may be {@code null}
 * @param stringId the string identifier to compare against; may be {@code null}
 * @return {@code false} if {@code schemaEntity} is {@code null}; {@code true} if {@code stringId}
 *     is {@code null} (there is no id to compare); otherwise whether the entity id equals the
 *     id carried by {@code stringId}
 */
private boolean isSameImportedSchema(SchemaEntity schemaEntity, StringIdentifier stringId) {
  // No entity in the store means there is nothing to reuse.
  if (schemaEntity == null) {
    return false;
  }
  // Without a string identifier the ids cannot be compared, so the entity is accepted.
  if (stringId == null) {
    return true;
  }
  return schemaEntity.id().equals(stringId.id());
}

private EntityCombinedSchema internalLoadSchema(NameIdentifier ident) {
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Schema schema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,22 @@ private EntityCombinedTable importTable(NameIdentifier identifier) {
try {
store.put(tableEntity, true);
} catch (EntityAlreadyExistsException e) {
// HA race: another Gravitino node finished importing the same table between
// our internalLoadTable check above and this put. Reuse the existing entity
// instead of failing the caller's load with "managed by multiple catalogs".
TableEntity concurrentTableEntity = getEntity(identifier, TABLE, TableEntity.class);
if (isSameImportedTable(concurrentTableEntity, stringId)) {
LOG.info(
"Table {} was imported concurrently, reusing the existing entity in Gravitino.",
identifier);
return EntityCombinedTable.of(table.tableFromCatalog(), concurrentTableEntity)
.withHiddenProperties(
getHiddenPropertyNames(
getCatalogIdentifier(identifier),
HasPropertyMetadata::tablePropertiesMetadata,
table.tableFromCatalog().properties()));
}
Comment thread
yuqi1129 marked this conversation as resolved.

LOG.error("Failed to import table {} with id {} to the store.", identifier, uid, e);
throw new UnsupportedOperationException(
"Table managed by multiple catalogs. This may cause unexpected issues such as privilege conflicts. "
Expand All @@ -494,6 +510,10 @@ private EntityCombinedTable importTable(NameIdentifier identifier) {
table.tableFromCatalog().properties()));
}

/**
 * Checks whether a table entity found in the store can be treated as the same table that the
 * current import attempted to write.
 *
 * @param tableEntity the entity read back from the store; may be {@code null}
 * @param stringId the string identifier to compare against; may be {@code null}
 * @return {@code false} if {@code tableEntity} is {@code null}; {@code true} if {@code stringId}
 *     is {@code null} (there is no id to compare); otherwise whether the entity id equals the
 *     id carried by {@code stringId}
 */
private boolean isSameImportedTable(TableEntity tableEntity, StringIdentifier stringId) {
  // No entity in the store means there is nothing to reuse.
  if (tableEntity == null) {
    return false;
  }
  // Without a string identifier the ids cannot be compared, so the entity is accepted.
  if (stringId == null) {
    return true;
  }
  return tableEntity.id().equals(stringId.id());
}

private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Table table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
Expand Down Expand Up @@ -205,6 +206,69 @@ public void testCreateAndLoadSchema() throws IOException {
Assertions.assertEquals("test", loadedSchema3.auditInfo().creator());
}

// Verifies that loadSchema does not fail when the store rejects the put with
// EntityAlreadyExistsException and the entity subsequently read from the store carries the
// same id as the schema that was created at the start of the test.
@Test
public void testConcurrentImportSchemaReusesExistingEntity() throws IOException {
NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrent");
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
dispatcher.createSchema(schemaIdent, "comment", props);
// Capture the stored entity so the simulated concurrent entity can reuse its id.
SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class);

// Build the entity that "another node" is pretending to have written: same id, name and
// namespace, but a different creator in its audit info.
AuditInfo concurrentAudit =
AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build();
SchemaEntity concurrentSchemaEntity =
SchemaEntity.builder()
.withId(importedSchemaEntity.id())
.withName(schemaIdent.name())
.withNamespace(schemaIdent.namespace())
.withAuditInfo(concurrentAudit)
.build();

// Clear earlier stubbing on entityStore, then stub consecutive SCHEMA lookups: the first get
// throws NoSuchEntityException, later gets return the concurrent entity.
// NOTE(review): presumably the first get is the dispatcher's "not imported yet" check and the
// second is the re-read after the failed put - confirm against loadSchema/importSchema.
reset(entityStore);
doThrow(new NoSuchEntityException("mock error"))
.doReturn(concurrentSchemaEntity)
.when(entityStore)
.get(any(), eq(Entity.EntityType.SCHEMA), any());
// Every put is rejected as a conflict.
doThrow(new EntityAlreadyExistsException("mock conflict"))
.when(entityStore)
.put(any(), anyBoolean());

// The load must succeed and still expose the name and comment given at creation time.
Schema loadedSchema = Assertions.assertDoesNotThrow(() -> dispatcher.loadSchema(schemaIdent));
Assertions.assertEquals(schemaIdent.name(), loadedSchema.name());
Assertions.assertEquals("comment", loadedSchema.comment());
}

// Verifies that loadSchema throws UnsupportedOperationException ("Schema managed by multiple
// catalogs") when the store rejects the put with EntityAlreadyExistsException and the entity
// subsequently read from the store has a different id.
@Test
public void testConcurrentImportSchemaFailsOnMismatchedIdentifier() throws IOException {
NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schemaConcurrentMismatch");
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
dispatcher.createSchema(schemaIdent, "comment", props);
// Capture the stored entity so a deliberately different id can be derived from it.
SchemaEntity importedSchemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class);

// Build a conflicting entity: same name and namespace, but id shifted by one.
AuditInfo concurrentAudit =
AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build();
SchemaEntity mismatchedSchemaEntity =
SchemaEntity.builder()
.withId(importedSchemaEntity.id() + 1)
.withName(schemaIdent.name())
.withNamespace(schemaIdent.namespace())
.withAuditInfo(concurrentAudit)
.build();

// Clear earlier stubbing on entityStore, then stub consecutive SCHEMA lookups: the first get
// throws NoSuchEntityException, later gets return the mismatched entity.
reset(entityStore);
doThrow(new NoSuchEntityException("mock error"))
.doReturn(mismatchedSchemaEntity)
.when(entityStore)
.get(any(), eq(Entity.EntityType.SCHEMA), any());
// Every put is rejected as a conflict.
doThrow(new EntityAlreadyExistsException("mock conflict"))
.when(entityStore)
.put(any(), anyBoolean());

// The id mismatch must surface as the "multiple catalogs" error rather than a silent reuse.
UnsupportedOperationException exception =
Assertions.assertThrows(
UnsupportedOperationException.class, () -> dispatcher.loadSchema(schemaIdent));
Assertions.assertTrue(exception.getMessage().contains("Schema managed by multiple catalogs"));
}

@Test
public void testCreateAndAlterSchema() throws IOException {
NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schema21");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
Expand Down Expand Up @@ -256,6 +258,121 @@ public void testCreateAndLoadTable() throws IOException {
Assertions.assertEquals("test", loadedTable4.auditInfo().creator());
}

// Verifies that loadTable does not fail when the store rejects the put with
// EntityAlreadyExistsException and the table entity subsequently read from the store carries
// the same id as the table that was created at the start of the test.
@Test
public void testConcurrentImportTableReusesExistingEntity() throws IOException {
Namespace tableNs = Namespace.of(metalake, catalog, "schema52");
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props);

NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrent");
// Two string columns at positions 0 and 1.
Column[] columns =
new Column[] {
TestColumn.builder()
.withName("col1")
.withPosition(0)
.withType(Types.StringType.get())
.build(),
TestColumn.builder()
.withName("col2")
.withPosition(1)
.withType(Types.StringType.get())
.build()
};

Table table =
tableOperationDispatcher.createTable(
tableIdent, columns, "comment", props, new Transform[0]);
// Capture the stored entity so the simulated concurrent entity can reuse its id.
TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class);

// Build the entity that "another node" is pretending to have written: same id, name and
// namespace, one ColumnEntity per column of the created table, and a different creator.
AuditInfo concurrentAudit =
AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build();
TableEntity concurrentTableEntity =
TableEntity.builder()
.withId(importedTableEntity.id())
.withName(tableIdent.name())
.withNamespace(tableIdent.namespace())
.withColumns(
IntStream.range(0, table.columns().length)
.mapToObj(
i ->
ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit))
.collect(Collectors.toList()))
.withAuditInfo(concurrentAudit)
.build();

// Clear earlier stubbing on entityStore, then stub consecutive TABLE lookups: the first get
// throws NoSuchEntityException, later gets return the concurrent entity.
// NOTE(review): presumably the first get is the dispatcher's "not imported yet" check and the
// second is the re-read after the failed put - confirm against loadTable/importTable.
reset(entityStore);
doThrow(new NoSuchEntityException("mock error"))
.doReturn(concurrentTableEntity)
.when(entityStore)
.get(any(), eq(Entity.EntityType.TABLE), any());
// Every put is rejected as a conflict.
doThrow(new EntityAlreadyExistsException("mock conflict"))
.when(entityStore)
.put(any(), anyBoolean());

// The load must succeed and still expose the name and comment given at creation time.
Table loadedTable =
Assertions.assertDoesNotThrow(() -> tableOperationDispatcher.loadTable(tableIdent));
Assertions.assertEquals(tableIdent.name(), loadedTable.name());
Assertions.assertEquals("comment", loadedTable.comment());
}

// Verifies that loadTable throws UnsupportedOperationException ("Table managed by multiple
// catalogs") when the store rejects the put with EntityAlreadyExistsException and the table
// entity subsequently read from the store has a different id.
@Test
public void testConcurrentImportTableFailsOnMismatchedIdentifier() throws IOException {
Namespace tableNs = Namespace.of(metalake, catalog, "schemaConcurrentMismatch");
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), "comment", props);

NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableConcurrentMismatch");
// Two string columns at positions 0 and 1.
Column[] columns =
new Column[] {
TestColumn.builder()
.withName("col1")
.withPosition(0)
.withType(Types.StringType.get())
.build(),
TestColumn.builder()
.withName("col2")
.withPosition(1)
.withType(Types.StringType.get())
.build()
};

Table table =
tableOperationDispatcher.createTable(
tableIdent, columns, "comment", props, new Transform[0]);
// Capture the stored entity so a deliberately different id can be derived from it.
TableEntity importedTableEntity = entityStore.get(tableIdent, TABLE, TableEntity.class);

// Build a conflicting entity: same name, namespace and columns, but id shifted by one.
AuditInfo concurrentAudit =
AuditInfo.builder().withCreator("concurrent").withCreateTime(Instant.now()).build();
TableEntity mismatchedTableEntity =
TableEntity.builder()
.withId(importedTableEntity.id() + 1)
.withName(tableIdent.name())
.withNamespace(tableIdent.namespace())
.withColumns(
IntStream.range(0, table.columns().length)
.mapToObj(
i ->
ColumnEntity.toColumnEntity(table.columns()[i], i, 0L, concurrentAudit))
.collect(Collectors.toList()))
.withAuditInfo(concurrentAudit)
.build();

// Clear earlier stubbing on entityStore, then stub consecutive TABLE lookups: the first get
// throws NoSuchEntityException, later gets return the mismatched entity.
reset(entityStore);
doThrow(new NoSuchEntityException("mock error"))
.doReturn(mismatchedTableEntity)
.when(entityStore)
.get(any(), eq(Entity.EntityType.TABLE), any());
// Every put is rejected as a conflict.
doThrow(new EntityAlreadyExistsException("mock conflict"))
.when(entityStore)
.put(any(), anyBoolean());

// The id mismatch must surface as the "multiple catalogs" error rather than a silent reuse.
UnsupportedOperationException exception =
Assertions.assertThrows(
UnsupportedOperationException.class,
() -> tableOperationDispatcher.loadTable(tableIdent));
Assertions.assertTrue(exception.getMessage().contains("Table managed by multiple catalogs"));
}

@Test
public void testCreateAndAlterTable() throws IOException {
Namespace tableNs = Namespace.of(metalake, catalog, "schema61");
Expand Down
Loading
Loading