diff --git a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java index a0dbce64a20..492cdcc38bc 100644 --- a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java +++ b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java @@ -325,7 +325,7 @@ public static void setUp() throws IllegalAccessException { MockedStatic catalogMetaServiceMockedStatic = Mockito.mockStatic(CatalogMetaService.class); MockedStatic schemaMetaServiceMockedStatic = - Mockito.mockStatic(SchemaMetaService.class); + Mockito.mockStatic(SchemaMetaService.class, Mockito.CALLS_REAL_METHODS); metalakeMetaServiceMockedStatic .when(MetalakeMetaService::getInstance) diff --git a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java index 854760060d6..25c4fc60b1d 100644 --- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java +++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java @@ -165,6 +165,34 @@ void insertRelation( boolean override) throws IOException; + /** + * Batch inserts the same relation from many source entities to one destination. Parameter order + * matches {@link #insertRelation}. All sources share one {@code srcType}. Semantics match + * repeated {@link #insertRelation} with the same {@code relType}; a relational backend may use + * fewer round-trips for some relation types (e.g. {@link Type#OWNER_REL}). + * + * @param relType the relation type (e.g. 
{@link Type#OWNER_REL}) + * @param srcIdentifiers identifiers of the source side for each relation row + * @param srcType entity type shared by every identifier in {@code srcIdentifiers} + * @param dstIdentifier destination entity identifier (shared by all rows) + * @param dstType destination entity type + * @param override if true, replace existing relations of each source entity first, per {@link + * #insertRelation} + * @throws IOException if the storage operation fails + */ + default void batchInsertRelations( + Type relType, + List srcIdentifiers, + Entity.EntityType srcType, + NameIdentifier dstIdentifier, + Entity.EntityType dstType, + boolean override) + throws IOException { + for (NameIdentifier srcIdent : srcIdentifiers) { + insertRelation(relType, srcIdent, srcType, dstIdentifier, dstType, override); + } + } + /** * Updates the relations for a given entity by adding a set of new relations and removing another * set of relations. diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java index 22c7c98330b..f401cd50be4 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java @@ -733,6 +733,29 @@ public void insertRelation( } } + @Override + public void batchInsertRelations( + Type relType, + List srcIdentifiers, + Entity.EntityType srcType, + NameIdentifier dstIdentifier, + Entity.EntityType dstType, + boolean override) + throws IOException { + if (srcIdentifiers == null || srcIdentifiers.isEmpty()) { + return; + } + switch (relType) { + case OWNER_REL: + OwnerMetaService.getInstance() + .batchSetOwners(srcIdentifiers, srcType, dstIdentifier, dstType); + break; + default: + throw new IllegalArgumentException( + String.format("Doesn't support batch insert for the relation type %s", relType)); + } + } + @Override public List 
updateEntityRelations( Type relType, diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java index d89c6c678d8..27381659d67 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java @@ -55,7 +55,7 @@ /** * Relation store to store entities. This means we can store entities in a relational store. I.e., * MySQL, PostgreSQL, etc. If you want to use a different backend, you can implement the {@link - * RelationalBackend} interface + * RelationalBackend} interface. The default JDBC backend is {@link JDBCBackend}. */ public class RelationalEntityStore implements EntityStore, SupportsRelationOperations { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalEntityStore.class); @@ -327,6 +327,26 @@ public void insertRelation( cache.invalidate(dstIdentifier, dstType, relType); } + @Override + public void batchInsertRelations( + Type relType, + List srcIdentifiers, + Entity.EntityType srcType, + NameIdentifier dstIdentifier, + Entity.EntityType dstType, + boolean override) + throws IOException { + if (srcIdentifiers == null || srcIdentifiers.isEmpty()) { + return; + } + backend.batchInsertRelations( + relType, srcIdentifiers, srcType, dstIdentifier, dstType, override); + for (NameIdentifier ident : srcIdentifiers) { + cache.invalidate(ident, srcType, relType); + } + cache.invalidate(dstIdentifier, dstType, relType); + } + @Override public List updateEntityRelations( Type relType, diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java new file mode 100644 index 00000000000..e53fb701a56 --- /dev/null +++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; + +public abstract class BasePOStorageOps { + public void insertPO(Mapper mapper, PO po, boolean overwrite) { + throw new UnsupportedOperationException( + "insertPO is not supported by " + getClass().getSimpleName()); + } + + public void batchInsertPOs(Mapper mapper, List pos, boolean overwrite) { + throw new UnsupportedOperationException( + "batchInsertPOs is not supported by " + getClass().getSimpleName()); + } + + public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) { + throw new UnsupportedOperationException( + "updatePO is not supported by " + getClass().getSimpleName()); + } + + /** + * When {@code true} and the entity-id cache is enabled, callers may resolve rows by parent entity + * id plus short name via {@link #getPO} and {@link #listPOs}; see {@link POStorageReadRouting}. 
+ */ + public boolean supportsParentIdRelationalRead() { + return false; + } + + public PO getPO(Mapper mapper, Long parentId, String name) { + throw new UnsupportedOperationException( + "getPO by parent id is not supported by " + getClass().getSimpleName()); + } + + public List listPOs(Mapper mapper, Long parentId) { + throw new UnsupportedOperationException( + "listPOs by parent id is not supported by " + getClass().getSimpleName()); + } + + public List listPOs(Mapper mapper, Namespace namespace, List names) { + throw new UnsupportedOperationException( + "listPOs by namespace and names is not supported by " + getClass().getSimpleName()); + } + + public List listPOs(Mapper mapper, List entityIds) { + throw new UnsupportedOperationException( + "listPOs by entityIds is not supported by " + getClass().getSimpleName()); + } + + public PO getPOByFullName(Mapper mapper, NameIdentifier identifier) { + throw new UnsupportedOperationException( + "getPOByFullName is not supported by " + getClass().getSimpleName()); + } + + public List listPOsByNSFullName(Mapper mapper, Namespace namespace) { + throw new UnsupportedOperationException( + "listPOsByNSFullName is not supported by " + getClass().getSimpleName()); + } + + protected abstract Entity.EntityType entityType(); +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java index 0377afed002..b8967e70f42 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java @@ -33,7 +33,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.gravitino.Entity; -import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.MetadataObject; import 
org.apache.gravitino.NameIdentifier; @@ -56,14 +55,17 @@ import org.slf4j.LoggerFactory; public class FunctionMetaService { + private static final Logger LOG = LoggerFactory.getLogger(FunctionMetaService.class); + private static final FunctionMetaService INSTANCE = new FunctionMetaService(); + private BasePOStorageOps ops; + public static FunctionMetaService getInstance() { return INSTANCE; } - private static final Logger LOG = LoggerFactory.getLogger(FunctionMetaService.class); - private static final FunctionMetaService INSTANCE = new FunctionMetaService(); - - private FunctionMetaService() {} + private FunctionMetaService() { + this.ops = new HierarchicalConventionPOStorageOps<>(new FunctionPOStorageOps()); + } @Monitored( metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, @@ -87,18 +89,17 @@ public FunctionEntity getFunctionByIdentifier(NameIdentifier ident) { metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getFunctionIdBySchemaIdAndFunctionName") public Long getFunctionIdBySchemaIdAndFunctionName(Long schemaId, String functionName) { - Long functionId = + FunctionPO functionPO = SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, - mapper -> mapper.selectFunctionIdBySchemaIdAndFunctionName(schemaId, functionName)); + FunctionMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, functionName)); - if (functionId == null) { + if (functionPO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT), functionName); } - return functionId; + return functionPO.functionId(); } @Monitored( @@ -115,14 +116,7 @@ public void insertFunction(FunctionEntity functionEntity, boolean overwrite) thr SessionUtils.doMultipleWithCommit( () -> SessionUtils.doWithoutCommit( - FunctionMetaMapper.class, - mapper -> { - if (overwrite) { - mapper.insertFunctionMetaOnDuplicateKeyUpdate(po); - } else { - mapper.insertFunctionMeta(po); - } - }), + 
FunctionMetaMapper.class, mapper -> ops.insertPO(mapper, po, overwrite)), () -> SessionUtils.doWithoutCommit( FunctionVersionMetaMapper.class, @@ -231,8 +225,18 @@ public int deleteFunctionVersionsByRetentionCount(Long versionRetentionCount, in baseMetricName = "getFunctionPOByIdentifier") FunctionPO getFunctionPOByIdentifier(NameIdentifier ident) { NameIdentifierUtil.checkFunction(ident); + FunctionPO functionPO = + SessionUtils.getWithoutCommit( + FunctionMetaMapper.class, + mapper -> POStorageReadRouting.getPO(mapper, ident, ops, Entity.EntityType.FUNCTION)); - return functionPOFetcher().apply(ident); + if (functionPO == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT), + ident.name()); + } + return functionPO; } @Monitored( @@ -260,7 +264,7 @@ public FunctionEntity updateFunction( () -> SessionUtils.doWithoutCommit( FunctionMetaMapper.class, - mapper -> mapper.updateFunctionMeta(newFunctionPO, oldFunctionPO))); + mapper -> ops.updatePO(mapper, newFunctionPO, oldFunctionPO))); return newEntity; } catch (RuntimeException re) { @@ -270,89 +274,10 @@ public FunctionEntity updateFunction( } } - private Function functionPOFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? 
this::getFunctionPOBySchemaId - : this::getFunctionPOByFullQualifiedName; - } - - private FunctionPO getFunctionPOBySchemaId(NameIdentifier ident) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(ident.namespace().levels()), Entity.EntityType.SCHEMA); - - FunctionPO functionPO = - SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, - mapper -> mapper.selectFunctionMetaBySchemaIdAndName(schemaId, ident.name())); - - if (functionPO == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT), - ident.toString()); - } - return functionPO; - } - - private FunctionPO getFunctionPOByFullQualifiedName(NameIdentifier ident) { - String[] namespaceLevels = ident.namespace().levels(); - FunctionPO functionPO = - SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, - mapper -> - mapper.selectFunctionMetaByFullQualifiedName( - namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], ident.name())); - - if (functionPO == null || functionPO.functionId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT), - ident.name()); - } - - if (functionPO.schemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - return functionPO; - } - private List listFunctionPOs(Namespace namespace) { - return functionListFetcher().apply(namespace); - } - - private List listFunctionPOsBySchemaId(Namespace namespace) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); return SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, mapper -> mapper.listFunctionPOsBySchemaId(schemaId)); - } - - private Function> functionListFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() 
- ? this::listFunctionPOsBySchemaId - : this::listFunctionPOsByFullQualifiedName; - } - - private List listFunctionPOsByFullQualifiedName(Namespace namespace) { - String[] namespaceLevels = namespace.levels(); - List functionPOs = - SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, - mapper -> - mapper.listFunctionPOsByFullQualifiedName( - namespaceLevels[0], namespaceLevels[1], namespaceLevels[2])); - if (functionPOs.isEmpty() || functionPOs.get(0).schemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - return functionPOs.stream().filter(po -> po.functionId() != null).collect(Collectors.toList()); + FunctionMetaMapper.class, + mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, Entity.EntityType.FUNCTION)); } private void fillFunctionPOBuilderParentEntityId( diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java new file mode 100644 index 00000000000..eb453869d26 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper; +import org.apache.gravitino.storage.relational.po.FunctionPO; + +public class FunctionPOStorageOps extends BasePOStorageOps { + + public FunctionPOStorageOps() {} + + @Override + public void insertPO(FunctionMetaMapper mapper, FunctionPO functionPO, boolean overwrite) { + if (overwrite) { + mapper.insertFunctionMetaOnDuplicateKeyUpdate(functionPO); + } else { + mapper.insertFunctionMeta(functionPO); + } + } + + @Override + public Integer updatePO(FunctionMetaMapper mapper, FunctionPO newPO, FunctionPO oldPO) { + return mapper.updateFunctionMeta(newPO, oldPO); + } + + @Override + public FunctionPO getPO(FunctionMetaMapper mapper, Long parentId, String name) { + return mapper.selectFunctionMetaBySchemaIdAndName(parentId, name); + } + + @Override + public FunctionPO getPOByFullName(FunctionMetaMapper mapper, NameIdentifier identifier) { + Namespace namespace = identifier.namespace(); + return mapper.selectFunctionMetaByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2), identifier.name()); + } + + @Override + public List listPOs(FunctionMetaMapper mapper, Long parentId) { + return mapper.listFunctionPOsBySchemaId(parentId); + } + + @Override + 
public List listPOs(FunctionMetaMapper mapper, List entityIds) { + return mapper.listFunctionPOsByFunctionIds(entityIds); + } + + @Override + public List listPOsByNSFullName(FunctionMetaMapper mapper, Namespace namespace) { + List pos = + mapper.listFunctionPOsByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2)); + // INNER JOIN on metalake/catalog: an empty result means the metalake or catalog does not exist. + if (pos.isEmpty()) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: rows with non-null catalogId but null schemaId mean the + // catalog exists but the schema does not. + if (pos.get(0).schemaId() == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + namespace.level(2)); + } + // LEFT JOIN on function_meta: filter out the placeholder row for a schema without functions. + return pos.stream().filter(po -> po.functionId() != null).collect(Collectors.toList()); + } + + @Override + public boolean supportsParentIdRelationalRead() { + return true; + } + + @Override + protected Entity.EntityType entityType() { + return Entity.EntityType.FUNCTION; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConventionPOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConventionPOStorageOps.java new file mode 100644 index 00000000000..4f2b570ddb7 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConventionPOStorageOps.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.utils.HierarchicalSchemaUtil; + +/** + * Wraps a {@link BasePOStorageOps} to bridge the hierarchical schema naming convention. Identifiers + * and namespace segments in logical form (logical separator) are translated to physical form + * (physical separator) before delegating. Two optional PO rewriters allow callers to translate a PO + * field across the boundary: {@code physicalToLogicalRewriter} is applied to POs returned from read + * methods (typically physical→logical), and {@code logicalToPhysicalRewriter} is applied to POs + * passed into write methods (typically logical→physical) so the SQL still receives storage-form + * values. 
+ * + * @param persistent object type + * @param MyBatis mapper type + */ +public class HierarchicalConventionPOStorageOps extends BasePOStorageOps { + + private final BasePOStorageOps delegate; + private final UnaryOperator physicalToLogicalRewriter; + private final UnaryOperator logicalToPhysicalRewriter; + + public HierarchicalConventionPOStorageOps(BasePOStorageOps delegate) { + this(delegate, UnaryOperator.identity(), UnaryOperator.identity()); + } + + public HierarchicalConventionPOStorageOps( + BasePOStorageOps delegate, + UnaryOperator physicalToLogicalRewriter, + UnaryOperator logicalToPhysicalRewriter) { + this.delegate = delegate; + this.physicalToLogicalRewriter = physicalToLogicalRewriter; + this.logicalToPhysicalRewriter = logicalToPhysicalRewriter; + } + + @Override + public void insertPO(Mapper mapper, PO po, boolean overwrite) { + delegate.insertPO(mapper, logicalToPhysicalRewriter.apply(po), overwrite); + } + + @Override + public void batchInsertPOs(Mapper mapper, List pos, boolean overwrite) { + delegate.batchInsertPOs(mapper, applyWrite(pos), overwrite); + } + + @Override + public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) { + return delegate.updatePO( + mapper, logicalToPhysicalRewriter.apply(newPO), logicalToPhysicalRewriter.apply(oldPO)); + } + + @Override + public PO getPO(Mapper mapper, Long parentId, String name) { + return applyRead(delegate.getPO(mapper, parentId, toPhysicalIfHierarchical(name))); + } + + @Override + public List listPOs(Mapper mapper, Long parentId) { + return applyRead(delegate.listPOs(mapper, parentId)); + } + + @Override + public List listPOs(Mapper mapper, Namespace logicalNamespace, List names) { + Namespace physicalNamespace = logicalToPhysicalNamespace(logicalNamespace); + List physicalNames = + names.stream() + .map(HierarchicalConventionPOStorageOps::toPhysicalIfHierarchical) + .collect(Collectors.toList()); + return applyRead(delegate.listPOs(mapper, physicalNamespace, physicalNames)); + } + + 
@Override + public List listPOs(Mapper mapper, List entityIds) { + return applyRead(delegate.listPOs(mapper, entityIds)); + } + + @Override + public PO getPOByFullName(Mapper mapper, NameIdentifier logical) { + NameIdentifier physical = logicalToPhysicalIdentifier(logical); + return applyRead(delegate.getPOByFullName(mapper, physical)); + } + + @Override + public List listPOsByNSFullName(Mapper mapper, Namespace logical) { + Namespace physical = logicalToPhysicalNamespace(logical); + return applyRead(delegate.listPOsByNSFullName(mapper, physical)); + } + + @Override + public boolean supportsParentIdRelationalRead() { + return delegate.supportsParentIdRelationalRead(); + } + + @Override + protected Entity.EntityType entityType() { + return delegate.entityType(); + } + + private PO applyRead(PO po) { + return po == null ? null : physicalToLogicalRewriter.apply(po); + } + + private List applyRead(List pos) { + if (pos == null || pos.isEmpty()) { + return pos; + } + return pos.stream().map(this::applyRead).collect(Collectors.toList()); + } + + private List applyWrite(List pos) { + if (pos == null || pos.isEmpty()) { + return pos; + } + return pos.stream().map(logicalToPhysicalRewriter).collect(Collectors.toList()); + } + + private static String toPhysicalIfHierarchical(String name) { + if (StringUtils.isBlank(name)) { + return name; + } + String sep = HierarchicalSchemaUtil.schemaSeparator(); + if (!name.contains(sep)) { + return name; + } + return HierarchicalSchemaUtil.logicalToPhysical(name, sep); + } + + private static NameIdentifier logicalToPhysicalIdentifier(NameIdentifier logical) { + String[] levels = logical.namespace().levels(); + if (levels.length == 2) { + String rawName = logical.name(); + String physicalName = + StringUtils.isNotBlank(rawName) + ? 
HierarchicalSchemaUtil.logicalToPhysical( + rawName, HierarchicalSchemaUtil.schemaSeparator()) + : rawName; + if (physicalName.equals(logical.name())) { + return logical; + } + return NameIdentifier.of(logical.namespace(), physicalName); + } + if (levels.length == 3) { + String rawSeg = levels[2]; + String physicalSchema = + StringUtils.isNotBlank(rawSeg) + ? HierarchicalSchemaUtil.logicalToPhysical( + rawSeg, HierarchicalSchemaUtil.schemaSeparator()) + : rawSeg; + if (physicalSchema.equals(levels[2])) { + return logical; + } + return NameIdentifier.of(Namespace.of(levels[0], levels[1], physicalSchema), logical.name()); + } + return logical; + } + + private static Namespace logicalToPhysicalNamespace(Namespace logical) { + String[] levels = logical.levels(); + if (levels.length != 3) { + return logical; + } + String rawSeg = levels[2]; + String physicalSchema = + StringUtils.isNotBlank(rawSeg) + ? HierarchicalSchemaUtil.logicalToPhysical( + rawSeg, HierarchicalSchemaUtil.schemaSeparator()) + : rawSeg; + if (physicalSchema.equals(levels[2])) { + return logical; + } + return Namespace.of(levels[0], levels[1], physicalSchema); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java index ee8f7ff8ad7..315d7118914 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java @@ -96,6 +96,14 @@ public class MetadataObjectService { MetadataObjectService::getJobTemplateObjectsFullName) .build(); + static final Map> TYPE_TO_STORAGE_OPS_MAP = + ImmutableMap.>builder() + .put(MetadataObject.Type.SCHEMA, new SchemaPOStorageOps()) + .put(MetadataObject.Type.TABLE, new TablePOStorageOps()) + .put(MetadataObject.Type.VIEW, new ViewPOStorageOps()) + .put(MetadataObject.Type.FUNCTION, new 
FunctionPOStorageOps()) + .build(); + private static Map getPolicyObjectsFullName(List policyIds) { if (policyIds == null || policyIds.isEmpty()) { return Maps.newHashMap(); @@ -345,9 +353,13 @@ public static Map getFunctionObjectsFullName(List functionId return Maps.newHashMap(); } + @SuppressWarnings("unchecked") + BasePOStorageOps ops = + (BasePOStorageOps) + TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.FUNCTION); List functionPOs = SessionUtils.getWithoutCommit( - FunctionMetaMapper.class, mapper -> mapper.listFunctionPOsByFunctionIds(functionIds)); + FunctionMetaMapper.class, mapper -> ops.listPOs(mapper, functionIds)); if (functionPOs == null || functionPOs.isEmpty()) { return new HashMap<>(); @@ -395,9 +407,13 @@ public static Map getTableObjectsFullName(List tableIds) { return Maps.newHashMap(); } + @SuppressWarnings("unchecked") + BasePOStorageOps ops = + (BasePOStorageOps) + TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.TABLE); List tablePOs = SessionUtils.getWithoutCommit( - TableMetaMapper.class, mapper -> mapper.listTablePOsByTableIds(tableIds)); + TableMetaMapper.class, mapper -> ops.listPOs(mapper, tableIds)); if (tablePOs == null || tablePOs.isEmpty()) { return Maps.newHashMap(); @@ -534,9 +550,12 @@ public static Map getTopicObjectsFullName(List topicIds) { metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getViewObjectsFullName") public static Map getViewObjectsFullName(List viewIds) { + @SuppressWarnings("unchecked") + BasePOStorageOps ops = + (BasePOStorageOps) + TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.VIEW); List viewPOs = - SessionUtils.getWithoutCommit( - ViewMetaMapper.class, mapper -> mapper.listViewPOsByViewIds(viewIds)); + SessionUtils.getWithoutCommit(ViewMetaMapper.class, mapper -> ops.listPOs(mapper, viewIds)); if (viewPOs == null || viewPOs.isEmpty()) { return new HashMap<>(); } @@ -601,9 +620,13 @@ public static Map getCatalogObjectsFullName(List catalogIds) metricsSource = 
GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getSchemaObjectsFullName") public static Map getSchemaObjectsFullName(List schemaIds) { + @SuppressWarnings("unchecked") + BasePOStorageOps ops = + (BasePOStorageOps) + TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.SCHEMA); List schemaPOs = SessionUtils.getWithoutCommit( - SchemaMetaMapper.class, mapper -> mapper.listSchemaPOsBySchemaIds(schemaIds)); + SchemaMetaMapper.class, mapper -> ops.listPOs(mapper, schemaIds)); if (schemaPOs == null || schemaPOs.isEmpty()) { return new HashMap<>(); diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java new file mode 100644 index 00000000000..3b41ecb59c3 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import org.apache.gravitino.Entity; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.utils.NameIdentifierUtil; + +/** + * Routes relational PO reads between parent-id based SQL (when the entity-id cache is enabled) and + * full qualified name SQL. Callers keep cache policy in one place instead of embedding it in {@link + * BasePOStorageOps}. + */ +public final class POStorageReadRouting { + + private POStorageReadRouting() {} + + /** + * Loads a PO using parent-id based SQL when the entity-id cache is enabled and {@code ops} + * supports that path; otherwise uses full qualified name SQL. + * + * @param mapper MyBatis mapper session + * @param identifier entity name identifier (logical naming when wrapped by hierarchical ops) + * @param ops storage operations delegate + * @param entityType entity type for {@code identifier} (used to resolve the parent id) + * @param persistent object type + * @param mapper type + * @return loaded PO or null when the delegate returns null + */ + public static PO getPO( + Mapper mapper, + NameIdentifier identifier, + BasePOStorageOps ops, + Entity.EntityType entityType) { + return getPO(mapper, identifier, ops, entityType, GravitinoEnv.getInstance().cacheEnabled()); + } + + /** + * Same as {@link #getPO} but uses an explicit cache flag (typically for tests). 
+ * + * @param cacheEnabled when true, prefer parent-id based reads when supported + */ + public static PO getPO( + Mapper mapper, + NameIdentifier identifier, + BasePOStorageOps ops, + Entity.EntityType entityType, + boolean cacheEnabled) { + if (cacheEnabled && ops.supportsParentIdRelationalRead()) { + Long parentId = + EntityIdService.getEntityId( + NameIdentifier.parse(identifier.namespace().toString()), + NameIdentifierUtil.parentEntityType(entityType)); + return ops.getPO(mapper, parentId, identifier.name()); + } + return ops.getPOByFullName(mapper, identifier); + } + + /** + * Lists POs under a namespace using parent-id based SQL when the entity-id cache is enabled and + * {@code ops} supports that path; otherwise uses full qualified namespace SQL. + * + * @param mapper MyBatis mapper session + * @param namespace parent namespace for the listed entities + * @param ops storage operations delegate + * @param entityType entity type stored under {@code namespace} (used to resolve the parent id) + * @param persistent object type + * @param mapper type + * @return list from the delegate (may be empty) + */ + public static List listPOs( + Mapper mapper, + Namespace namespace, + BasePOStorageOps ops, + Entity.EntityType entityType) { + return listPOs(mapper, namespace, ops, entityType, GravitinoEnv.getInstance().cacheEnabled()); + } + + /** + * Same as {@link #listPOs} but uses an explicit cache flag (typically for tests). 
+ * + * @param cacheEnabled when true, prefer parent-id based reads when supported + */ + public static List listPOs( + Mapper mapper, + Namespace namespace, + BasePOStorageOps ops, + Entity.EntityType entityType, + boolean cacheEnabled) { + if (cacheEnabled && ops.supportsParentIdRelationalRead()) { + Long parentId = + EntityIdService.getEntityId( + NameIdentifier.parse(namespace.toString()), + NameIdentifierUtil.parentEntityType(entityType)); + return ops.listPOs(mapper, parentId); + } + return ops.listPOsByNSFullName(mapper, namespace); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java index c7edb6d26f5..5cc65c178d5 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java @@ -33,7 +33,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.gravitino.Entity; -import org.apache.gravitino.Entity.EntityType; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.MetadataObject; @@ -80,29 +79,18 @@ /** The service class for schema metadata. It provides the basic database operations for schema. 
*/ public class SchemaMetaService { private static final SchemaMetaService INSTANCE = new SchemaMetaService(); + private BasePOStorageOps ops; public static SchemaMetaService getInstance() { return INSTANCE; } - private SchemaMetaService() {} - - @Monitored( - metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, - baseMetricName = "getSchemaPOByCatalogIdAndName") - public SchemaPO getSchemaPOByCatalogIdAndName(Long catalogId, String schemaName) { - SchemaPO schemaPO = - SessionUtils.getWithoutCommit( - SchemaMetaMapper.class, - mapper -> mapper.selectSchemaMetaByCatalogIdAndName(catalogId, schemaName)); - - if (schemaPO == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.SCHEMA.name().toLowerCase(), - schemaName); - } - return schemaPO; + private SchemaMetaService() { + this.ops = + new HierarchicalConventionPOStorageOps<>( + new SchemaPOStorageOps(), + SchemaMetaService::physicalToLogicalSchemaPO, + SchemaMetaService::logicalToPhysicalSchemaPO); } @Monitored( @@ -110,39 +98,21 @@ public SchemaPO getSchemaPOByCatalogIdAndName(Long catalogId, String schemaName) baseMetricName = "getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName") public SchemaIds getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName( String metalakeName, String catalogName, String schemaName) { - SchemaIds schemaIds = + NameIdentifier identifier = NameIdentifier.of(metalakeName, catalogName, schemaName); + SchemaPO schemaPO = SessionUtils.getWithoutCommit( SchemaMetaMapper.class, mapper -> - mapper.selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName( - metalakeName, catalogName, schemaName)); + POStorageReadRouting.getPO(mapper, identifier, ops, Entity.EntityType.SCHEMA)); - if (schemaIds == null) { + if (schemaPO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.SCHEMA.name().toLowerCase(), schemaName); } - return schemaIds; - } - - @Monitored( - metricsSource = 
GRAVITINO_RELATIONAL_STORE_METRIC_NAME, - baseMetricName = "getSchemaIdByCatalogIdAndName") - public Long getSchemaIdByCatalogIdAndName(Long catalogId, String schemaName) { - Long schemaId = - SessionUtils.getWithoutCommit( - SchemaMetaMapper.class, - mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName)); - - if (schemaId == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.SCHEMA.name().toLowerCase(), - schemaName); - } - return schemaId; + return new SchemaIds(schemaPO.getMetalakeId(), schemaPO.getCatalogId(), schemaPO.getSchemaId()); } @Monitored( @@ -169,23 +139,24 @@ public List listSchemasByNamespace(Namespace namespace) { public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) throws IOException { try { NameIdentifierUtil.checkSchema(schemaEntity.nameIdentifier()); - // Callers above this service (e.g. JDBCBackend + naming bridge) pass storage-form schema - // names: nested paths use the internal physical separator, not the external logical one. - String physicalSep = HierarchicalSchemaUtil.physicalSeparator(); + // SchemaEntity arrives in API/logical form (separator = HierarchicalSchemaUtil + // .schemaSeparator()). We split here on the logical separator and build ancestor rows in + // logical form. HierarchicalConventionPOStorageOps.batchInsertPOs applies its write + // rewriter to translate each PO's name to storage form before SQL execution. + String logicalSep = HierarchicalSchemaUtil.schemaSeparator(); String schemaName = schemaEntity.name(); List rowsToInsert = new ArrayList<>(); - if (schemaName == null || !schemaName.contains(physicalSep)) { + if (schemaName == null || !schemaName.contains(logicalSep)) { rowsToInsert.add(schemaEntity); } else { - // Segments of the storage-form name; e.g. [A, B, C] -> ancestor rows "A", "A"+sep+"B", then - // leaf. - String[] parts = schemaName.split(Pattern.quote(physicalSep), -1); + // Segments of the logical name; e.g. 
"A:B:C" -> ancestor rows "A", "A:B", then leaf. + String[] parts = schemaName.split(Pattern.quote(logicalSep), -1); for (int nSeg = 1; nSeg < parts.length; nSeg++) { - String ancestorPhysical = String.join(physicalSep, Arrays.copyOf(parts, nSeg)); + String ancestorLogical = String.join(logicalSep, Arrays.copyOf(parts, nSeg)); SchemaEntity ancestor = SchemaEntity.builder() .withId(nextIdForNestedAncestor()) - .withName(ancestorPhysical) + .withName(ancestorLogical) .withNamespace(schemaEntity.namespace()) .withComment(null) .withProperties(Collections.emptyMap()) @@ -204,19 +175,16 @@ public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) throws IO if (n > 1) { SchemaEntity firstAncestor = rowsToInsert.get(0); Namespace ancestorNs = firstAncestor.namespace(); - List ancestorPhysicalNames = + List ancestorNames = rowsToInsert.subList(0, n - 1).stream() .map(SchemaEntity::name) .collect(Collectors.toList()); - Set existingAncestorNames = - mapper - .batchSelectSchemaByIdentifier( - ancestorNs.level(0), ancestorNs.level(1), ancestorPhysicalNames) - .stream() + Set existingLogicalNames = + ops.listPOs(mapper, ancestorNs, ancestorNames).stream() .map(SchemaPO::getSchemaName) .collect(Collectors.toSet()); for (SchemaEntity row : rowsToInsert.subList(0, n - 1)) { - if (existingAncestorNames.contains(row.name())) { + if (existingLogicalNames.contains(row.name())) { continue; } SchemaPO.Builder builder = SchemaPO.builder(); @@ -230,11 +198,7 @@ public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) throws IO SchemaPO leafPO = POConverters.initializeSchemaPOWithVersion(leafRow, leafBuilder); List schemaPosToInsert = new ArrayList<>(missingAncestorPOs); schemaPosToInsert.add(leafPO); - if (overwrite) { - mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPosToInsert); - } else { - mapper.batchInsertSchemaMeta(schemaPosToInsert); - } + ops.batchInsertPOs(mapper, schemaPosToInsert, overwrite); }); } catch (RuntimeException re) { 
ExceptionUtils.checkSQLException( @@ -271,7 +235,8 @@ public SchemaEntity updateSchema( SessionUtils.getWithoutCommit( SchemaMetaMapper.class, mapper -> - mapper.updateSchemaMeta( + ops.updatePO( + mapper, POConverters.updateSchemaPOWithVersion(oldSchemaPO, newEntity), oldSchemaPO))), () -> { @@ -494,96 +459,24 @@ public int deleteSchemaMetasByLegacyTimeline(Long legacyTimeline, int limit) { private SchemaPO getSchemaPOByIdentifier(NameIdentifier identifier) { NameIdentifierUtil.checkSchema(identifier); - return schemaPOFetcher().apply(identifier); - } - - private SchemaPO getSchemaByFullQualifiedName( - String metalakeName, String catalogName, String schemaName) { SchemaPO schemaPO = SessionUtils.getWithoutCommit( SchemaMetaMapper.class, mapper -> - mapper.selectSchemaByFullQualifiedName(metalakeName, catalogName, schemaName)); + POStorageReadRouting.getPO(mapper, identifier, ops, Entity.EntityType.SCHEMA)); if (schemaPO == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.CATALOG.name().toLowerCase(), - schemaName); - } - - return schemaPO; - } - - private List listSchemaPOs(Namespace namespace) { - return schemaListFetcher().apply(namespace); - } - - private List listSchemaPOsByCatalogId(Namespace namespace) { - Long catalogId = - EntityIdService.getEntityId( - NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG); - - return SessionUtils.getWithoutCommit( - SchemaMetaMapper.class, mapper -> mapper.listSchemaPOsByCatalogId(catalogId)); - } - - private List listSchemaPOsByFullQualifiedName(Namespace namespace) { - String[] namespaceLevels = namespace.levels(); - List schemaPOs = - SessionUtils.getWithoutCommit( - SchemaMetaMapper.class, - mapper -> - mapper.listSchemaPOsByFullQualifiedName(namespaceLevels[0], namespaceLevels[1])); - if (schemaPOs.isEmpty() || schemaPOs.get(0).getCatalogId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - 
Entity.EntityType.CATALOG.name().toLowerCase(), - namespaceLevels[1]); - } - return schemaPOs.stream().filter(po -> po.getSchemaId() != null).collect(Collectors.toList()); - } - - private SchemaPO getSchemaPOByCatalogId(NameIdentifier identifier) { - Long catalogId = - EntityIdService.getEntityId( - NameIdentifier.of(identifier.namespace().levels()), Entity.EntityType.CATALOG); - return getSchemaPOByCatalogIdAndName(catalogId, identifier.name()); - } - - private SchemaPO getSchemaPOByFullQualifiedName(NameIdentifier identifier) { - String[] namespaceLevels = identifier.namespace().levels(); - SchemaPO schemaPO = - getSchemaByFullQualifiedName(namespaceLevels[0], namespaceLevels[1], identifier.name()); - - if (schemaPO.getCatalogId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.CATALOG.name().toLowerCase(), - namespaceLevels[1]); - } - - if (schemaPO.getSchemaId() == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.SCHEMA.name().toLowerCase(), identifier.name()); } - return schemaPO; } - private Function> schemaListFetcher() { - // If cache is enabled, we can use catalog id to fetch schemas faster or else use full qualified - // name to join several tables to get the schema list. - return GravitinoEnv.getInstance().cacheEnabled() - ? this::listSchemaPOsByCatalogId - : this::listSchemaPOsByFullQualifiedName; - } - - private Function schemaPOFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? 
this::getSchemaPOByCatalogId - : this::getSchemaPOByFullQualifiedName; + private List listSchemaPOs(Namespace namespace) { + return SessionUtils.getWithoutCommit( + SchemaMetaMapper.class, + mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, Entity.EntityType.SCHEMA)); } private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, Namespace namespace) { @@ -605,12 +498,14 @@ public List batchGetSchemaByIdentifier(List identi List schemaNames = identifiers.stream().map(NameIdentifier::name).collect(Collectors.toList()); - return SessionUtils.doWithCommitAndFetchResult( + return SessionUtils.getWithoutCommit( SchemaMetaMapper.class, mapper -> { List schemaPOs = - mapper.batchSelectSchemaByIdentifier( - catalogIdent.namespace().level(0), catalogIdent.name(), schemaNames); + ops.listPOs( + mapper, + Namespace.of(catalogIdent.namespace().levels()[0], catalogIdent.name()), + schemaNames); return POConverters.fromSchemaPOs(schemaPOs, firstIdent.namespace()); }); } @@ -623,4 +518,39 @@ private static long nextIdForNestedAncestor() { } return generator.nextId(); } + + private static SchemaPO physicalToLogicalSchemaPO(SchemaPO po) { + String name = po.getSchemaName(); + if (name == null || !name.contains(HierarchicalSchemaUtil.physicalSeparator())) { + return po; + } + return copySchemaPOWithName( + po, + HierarchicalSchemaUtil.physicalToLogical(name, HierarchicalSchemaUtil.schemaSeparator())); + } + + private static SchemaPO logicalToPhysicalSchemaPO(SchemaPO po) { + String name = po.getSchemaName(); + if (name == null || !name.contains(HierarchicalSchemaUtil.schemaSeparator())) { + return po; + } + return copySchemaPOWithName( + po, + HierarchicalSchemaUtil.logicalToPhysical(name, HierarchicalSchemaUtil.schemaSeparator())); + } + + private static SchemaPO copySchemaPOWithName(SchemaPO po, String name) { + return SchemaPO.builder() + .withSchemaId(po.getSchemaId()) + .withSchemaName(name) + .withMetalakeId(po.getMetalakeId()) + 
.withCatalogId(po.getCatalogId()) + .withSchemaComment(po.getSchemaComment()) + .withProperties(po.getProperties()) + .withAuditInfo(po.getAuditInfo()) + .withCurrentVersion(po.getCurrentVersion()) + .withLastVersion(po.getLastVersion()) + .withDeletedAt(po.getDeletedAt()) + .build(); + } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java new file mode 100644 index 00000000000..568437b4c51 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper; +import org.apache.gravitino.storage.relational.po.SchemaPO; + +public class SchemaPOStorageOps extends BasePOStorageOps { + + public SchemaPOStorageOps() {} + + @Override + public void batchInsertPOs(SchemaMetaMapper mapper, List schemaPOs, boolean overwrite) { + if (overwrite) { + mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPOs); + } else { + mapper.batchInsertSchemaMeta(schemaPOs); + } + } + + @Override + public Integer updatePO(SchemaMetaMapper mapper, SchemaPO oldPO, SchemaPO newPO) { + return mapper.updateSchemaMeta(oldPO, newPO); + } + + @Override + public SchemaPO getPO(SchemaMetaMapper mapper, Long parentId, String name) { + return mapper.selectSchemaMetaByCatalogIdAndName(parentId, name); + } + + @Override + public SchemaPO getPOByFullName(SchemaMetaMapper mapper, NameIdentifier identifier) { + Namespace namespace = identifier.namespace(); + SchemaPO po = + mapper.selectSchemaByFullQualifiedName( + namespace.level(0), namespace.level(1), identifier.name()); + // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist. + if (po == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means + // the catalog exists but the schema does not. 
+ if (po.getSchemaId() == null) { + return null; + } + return po; + } + + @Override + public List listPOs(SchemaMetaMapper schemaMetaMapper, Long parentId) { + return schemaMetaMapper.listSchemaPOsByCatalogId(parentId); + } + + @Override + public List listPOs( + SchemaMetaMapper schemaMetaMapper, Namespace namespace, List names) { + return schemaMetaMapper.batchSelectSchemaByIdentifier( + namespace.level(0), namespace.level(1), names); + } + + @Override + public List listPOs(SchemaMetaMapper schemaMetaMapper, List entityIds) { + return schemaMetaMapper.listSchemaPOsBySchemaIds(entityIds); + } + + @Override + public List listPOsByNSFullName( + SchemaMetaMapper schemaMetaMapper, Namespace namespace) { + List pos = + schemaMetaMapper.listSchemaPOsByFullQualifiedName(namespace.level(0), namespace.level(1)); + // An empty result means the parent metalake or catalog does not exist. + if (pos.isEmpty()) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // Same LEFT JOIN behavior as getPOByFullName: filter out the placeholder row that + // represents an existing catalog without any matching schema. 
+ return pos.stream().filter(po -> po.getSchemaId() != null).collect(Collectors.toList()); + } + + @Override + public boolean supportsParentIdRelationalRead() { + return true; + } + + @Override + protected Entity.EntityType entityType() { + return Entity.EntityType.SCHEMA; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java index 02c094a5092..d4486c88a1d 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java @@ -28,10 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.gravitino.Entity; -import org.apache.gravitino.Entity.EntityType; -import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.NameIdentifier; @@ -60,29 +57,31 @@ /** The service class for table metadata. It provides the basic database operations for table. 
*/ public class TableMetaService { private static final TableMetaService INSTANCE = new TableMetaService(); + private BasePOStorageOps ops; public static TableMetaService getInstance() { return INSTANCE; } - private TableMetaService() {} + private TableMetaService() { + this.ops = new HierarchicalConventionPOStorageOps<>(new TablePOStorageOps()); + } @Monitored( metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getTableIdBySchemaIdAndName") public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) { - Long tableId = + TablePO tablePO = SessionUtils.getWithoutCommit( - TableMetaMapper.class, - mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId, tableName)); + TableMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, tableName)); - if (tableId == null) { + if (tablePO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.TABLE.name().toLowerCase(), tableName); } - return tableId; + return tablePO.getTableId(); } @Monitored( @@ -124,11 +123,7 @@ public void insertTable(TableEntity tableEntity, boolean overwrite) throws IOExc TableMetaMapper.class, mapper -> { tablePORef.set(po); - if (overwrite) { - mapper.insertTableMetaOnDuplicateKeyUpdate(po); - } else { - mapper.insertTableMeta(po); - } + ops.insertPO(mapper, po, overwrite); }), () -> SessionUtils.doWithoutCommit( @@ -204,7 +199,7 @@ public TableEntity updateTable( updateResult.set( SessionUtils.getWithoutCommit( TableMetaMapper.class, - mapper -> mapper.updateTableMeta(newTablePO, oldTablePO, newSchemaId))), + mapper -> ops.updatePO(mapper, newTablePO, oldTablePO))), () -> SessionUtils.doWithoutCommit( TableVersionMapper.class, @@ -347,131 +342,43 @@ public List batchGetTableByIdentifier(List identifi Objects.equals(schemaIdent, NameIdentifierUtil.getSchemaIdentifier(ident))); tableNames.add(ident.name()); } - Long schemaId = EntityIdService.getEntityId(schemaIdent, Entity.EntityType.SCHEMA); return 
SessionUtils.doWithCommitAndFetchResult( TableMetaMapper.class, mapper -> { - List tableList = mapper.batchSelectTableByIdentifier(schemaId, tableNames); + List tableList = ops.listPOs(mapper, firstIdent.namespace(), tableNames); return POConverters.fromTablePOs(tableList, firstIdent.namespace()); }); } - private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) { - NamespaceUtil.checkTable(namespace); - NamespacedEntityId namespacedEntityId = - EntityIdService.getEntityIds( - NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); - builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]); - builder.withCatalogId(namespacedEntityId.namespaceIds()[1]); - builder.withSchemaId(namespacedEntityId.entityId()); - } - private TablePO getTablePOByIdentifier(NameIdentifier identifier) { NameIdentifierUtil.checkTable(identifier); - - return tablePOFetcher().apply(identifier); - } - - private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) { - TablePO tablePO = - SessionUtils.getWithoutCommit( - TableMetaMapper.class, - mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, tableName)); - if (tablePO == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - Entity.EntityType.TABLE.name().toLowerCase(), - tableName); - } - return tablePO; - } - - private TablePO getTableByFullQualifiedName( - String metalakeName, String catalogName, String schemaName, String tableName) { TablePO tablePO = SessionUtils.getWithoutCommit( TableMetaMapper.class, - mapper -> - mapper.selectTableByFullQualifiedName( - metalakeName, catalogName, schemaName, tableName)); + mapper -> POStorageReadRouting.getPO(mapper, identifier, ops, Entity.EntityType.TABLE)); if (tablePO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.TABLE.name().toLowerCase(), - tableName); + identifier.name()); } return tablePO; } private List 
listTablePOs(Namespace namespace) { - return tableListFetcher().apply(namespace); - } - - private List listTablePOsBySchemaId(Namespace namespace) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); return SessionUtils.getWithoutCommit( - TableMetaMapper.class, mapper -> mapper.listTablePOsBySchemaId(schemaId)); - } - - private List listTablePOsByFullQualifiedName(Namespace namespace) { - String[] namespaceLevels = namespace.levels(); - List tablePOs = - SessionUtils.getWithoutCommit( - TableMetaMapper.class, - mapper -> - mapper.listTablePOsByFullQualifiedName( - namespaceLevels[0], namespaceLevels[1], namespaceLevels[2])); - if (tablePOs.isEmpty() || tablePOs.get(0).getSchemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - return tablePOs.stream().filter(po -> po.getTableId() != null).collect(Collectors.toList()); - } - - private TablePO getTablePOBySchemaId(NameIdentifier identifier) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(identifier.namespace().levels()), Entity.EntityType.SCHEMA); - return getTablePOBySchemaIdAndName(schemaId, identifier.name()); - } - - private TablePO getTablePOByFullQualifiedName(NameIdentifier identifier) { - String[] namespaceLevels = identifier.namespace().levels(); - TablePO tablePO = - getTableByFullQualifiedName( - namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], identifier.name()); - - if (tablePO.getSchemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - - if (tablePO.getTableId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.TABLE.name().toLowerCase(), - identifier.name()); - } - - return tablePO; - } - - private Function> 
tableListFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? this::listTablePOsBySchemaId - : this::listTablePOsByFullQualifiedName; + TableMetaMapper.class, + mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, Entity.EntityType.TABLE)); } - private Function tablePOFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? this::getTablePOBySchemaId - : this::getTablePOByFullQualifiedName; + private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) { + NamespaceUtil.checkTable(namespace); + NamespacedEntityId namespacedEntityId = + EntityIdService.getEntityIds( + NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); + builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]); + builder.withCatalogId(namespacedEntityId.namespaceIds()[1]); + builder.withSchemaId(namespacedEntityId.entityId()); } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java new file mode 100644 index 00000000000..7c9654a885c --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.storage.relational.mapper.TableMetaMapper; +import org.apache.gravitino.storage.relational.po.TablePO; + +public class TablePOStorageOps extends BasePOStorageOps { + + public TablePOStorageOps() {} + + @Override + public void insertPO(TableMetaMapper mapper, TablePO tablePO, boolean overwrite) { + if (overwrite) { + mapper.insertTableMetaOnDuplicateKeyUpdate(tablePO); + } else { + mapper.insertTableMeta(tablePO); + } + } + + @Override + public Integer updatePO(TableMetaMapper mapper, TablePO newPO, TablePO oldPO) { + return mapper.updateTableMeta(newPO, oldPO, newPO.getSchemaId()); + } + + @Override + public TablePO getPO(TableMetaMapper mapper, Long parentId, String name) { + return mapper.selectTableMetaBySchemaIdAndName(parentId, name); + } + + @Override + public TablePO getPOByFullName(TableMetaMapper mapper, NameIdentifier identifier) { + Namespace namespace = identifier.namespace(); + TablePO po = + mapper.selectTableByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2), identifier.name()); + // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist. + if (po == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means + // the catalog exists but the schema does not. 
+ if (po.getSchemaId() == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + namespace.level(2)); + } + // LEFT JOIN on table_meta: a row with non-null schemaId but null tableId means + // the schema exists but the table does not. + if (po.getTableId() == null) { + return null; + } + return po; + } + + @Override + public List listPOs(TableMetaMapper mapper, Long parentId) { + return mapper.listTablePOsBySchemaId(parentId); + } + + @Override + public List listPOs(TableMetaMapper mapper, Namespace namespace, List names) { + Long schemaId = + EntityIdService.getEntityId( + NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); + return mapper.batchSelectTableByIdentifier(schemaId, names); + } + + @Override + public List listPOs(TableMetaMapper mapper, List entityIds) { + return mapper.listTablePOsByTableIds(entityIds); + } + + @Override + public List listPOsByNSFullName(TableMetaMapper mapper, Namespace namespace) { + List pos = + mapper.listTablePOsByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2)); + // INNER JOIN on metalake/catalog: an empty result means the metalake or catalog does not exist. + if (pos.isEmpty()) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: rows with non-null catalogId but null schemaId mean the + // catalog exists but the schema does not. + if (pos.get(0).getSchemaId() == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + namespace.level(2)); + } + // LEFT JOIN on table_meta: filter out the placeholder row for a schema without tables. 
+ return pos.stream().filter(po -> po.getTableId() != null).collect(Collectors.toList()); + } + + @Override + public boolean supportsParentIdRelationalRead() { + return true; + } + + @Override + protected Entity.EntityType entityType() { + return Entity.EntityType.TABLE; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java index 177118c8118..4384e2b27b6 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java @@ -31,8 +31,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.gravitino.Entity; -import org.apache.gravitino.Entity.EntityType; -import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.NameIdentifier; @@ -57,29 +55,31 @@ public class ViewMetaService { private static final ViewMetaService INSTANCE = new ViewMetaService(); + private BasePOStorageOps ops; public static ViewMetaService getInstance() { return INSTANCE; } - private ViewMetaService() {} + private ViewMetaService() { + this.ops = new HierarchicalConventionPOStorageOps<>(new ViewPOStorageOps()); + } @Monitored( metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getViewIdBySchemaIdAndName") public Long getViewIdBySchemaIdAndName(Long schemaId, String viewName) { - Long viewId = + ViewPO viewPO = SessionUtils.getWithoutCommit( - ViewMetaMapper.class, - mapper -> mapper.selectViewIdBySchemaIdAndName(schemaId, viewName)); + ViewMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, viewName)); - if (viewId == null) { + if (viewPO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, 
Entity.EntityType.VIEW.name().toLowerCase(), viewName); } - return viewId; + return viewPO.getViewId(); } @Monitored( @@ -87,7 +87,7 @@ public Long getViewIdBySchemaIdAndName(Long schemaId, String viewName) { baseMetricName = "listViewsByNamespace") public List listViewsByNamespace(Namespace namespace) { NamespaceUtil.checkView(namespace); - List viewPOs = listViewPOsByNamespace(namespace); + List viewPOs = listViewPOs(namespace); return viewPOs.stream().map(po -> fromViewPO(po, namespace)).collect(Collectors.toList()); } @@ -95,8 +95,7 @@ public List listViewsByNamespace(Namespace namespace) { metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "getViewByIdentifier") public ViewEntity getViewByIdentifier(NameIdentifier identifier) { - NameIdentifierUtil.checkView(identifier); - ViewPO viewPO = viewPOFetcher().apply(identifier); + ViewPO viewPO = getViewPOByIdentifier(identifier); return fromViewPO(viewPO, identifier.namespace()); } @@ -111,14 +110,7 @@ public void insertView(ViewEntity viewEntity, boolean overwrite) throws IOExcept SessionUtils.doMultipleWithCommit( () -> SessionUtils.doWithoutCommit( - ViewMetaMapper.class, - mapper -> { - if (overwrite) { - mapper.insertViewMetaOnDuplicateKeyUpdate(po); - } else { - mapper.insertViewMeta(po); - } - }), + ViewMetaMapper.class, mapper -> ops.insertPO(mapper, po, overwrite)), () -> SessionUtils.doWithoutCommit( ViewVersionInfoMapper.class, @@ -141,9 +133,7 @@ public void insertView(ViewEntity viewEntity, boolean overwrite) throws IOExcept baseMetricName = "updateViewByIdentifier") public ViewEntity updateView( NameIdentifier ident, Function updater) throws IOException { - NameIdentifierUtil.checkView(ident); - - ViewPO oldViewPO = viewPOFetcher().apply(ident); + ViewPO oldViewPO = getViewPOByIdentifier(ident); ViewEntity oldViewEntity = fromViewPO(oldViewPO, ident.namespace()); ViewEntity newEntity = (ViewEntity) updater.apply((E) oldViewEntity); Preconditions.checkArgument( @@ -171,7 +161,7 @@ 
public ViewEntity updateView( () -> { updateResult.set( SessionUtils.getWithoutCommit( - ViewMetaMapper.class, mapper -> mapper.updateViewMeta(newViewPO, oldViewPO))); + ViewMetaMapper.class, mapper -> ops.updatePO(mapper, newViewPO, oldViewPO))); if (updateResult.get() == 0) { throw new RuntimeException("Failed to update the entity: " + ident); } @@ -203,8 +193,7 @@ public ViewEntity updateView( metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "deleteViewByIdentifier") public boolean deleteView(NameIdentifier ident) { - NameIdentifierUtil.checkView(ident); - ViewPO viewPO = viewPOFetcher().apply(ident); + ViewPO viewPO = getViewPOByIdentifier(ident); String metalakeName = ident.namespace().level(0); String catalogName = ident.namespace().level(1); String schemaName = ident.namespace().level(2); @@ -287,88 +276,25 @@ private ViewPO updateViewPO(ViewPO oldViewPO, ViewEntity newEntity) { return buildViewPO(newEntity, builder, newVersion.intValue()); } - private List listViewPOsByNamespace(Namespace namespace) { - return viewListFetcher().apply(namespace); - } - - private Function> viewListFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? this::listViewPOsBySchemaId - : this::listViewPOsByFullQualifiedName; - } - - private Function viewPOFetcher() { - return GravitinoEnv.getInstance().cacheEnabled() - ? 
this::getViewPOBySchemaId - : this::getViewPOByFullQualifiedName; - } - - private List listViewPOsBySchemaId(Namespace namespace) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA); - return SessionUtils.getWithoutCommit( - ViewMetaMapper.class, mapper -> mapper.listViewPOsBySchemaId(schemaId)); - } - - private List listViewPOsByFullQualifiedName(Namespace namespace) { - String[] namespaceLevels = namespace.levels(); - List viewPOs = - SessionUtils.getWithoutCommit( - ViewMetaMapper.class, - mapper -> - mapper.listViewPOsByFullQualifiedName( - namespaceLevels[0], namespaceLevels[1], namespaceLevels[2])); - if (viewPOs.isEmpty() || viewPOs.get(0).getSchemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - return viewPOs.stream().filter(po -> po.getViewId() != null).collect(Collectors.toList()); - } - - private ViewPO getViewPOBySchemaId(NameIdentifier identifier) { - Long schemaId = - EntityIdService.getEntityId( - NameIdentifier.of(identifier.namespace().levels()), Entity.EntityType.SCHEMA); + private ViewPO getViewPOByIdentifier(NameIdentifier identifier) { + NameIdentifierUtil.checkView(identifier); ViewPO viewPO = SessionUtils.getWithoutCommit( ViewMetaMapper.class, - mapper -> mapper.selectViewMetaBySchemaIdAndName(schemaId, identifier.name())); - + mapper -> POStorageReadRouting.getPO(mapper, identifier, ops, Entity.EntityType.VIEW)); if (viewPO == null) { throw new NoSuchEntityException( NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, Entity.EntityType.VIEW.name().toLowerCase(), identifier.name()); } + return viewPO; } - private ViewPO getViewPOByFullQualifiedName(NameIdentifier identifier) { - String[] namespaceLevels = identifier.namespace().levels(); - ViewPO viewPO = - SessionUtils.getWithoutCommit( - ViewMetaMapper.class, - mapper -> - mapper.selectViewByFullQualifiedName( 
- namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], identifier.name())); - - if (viewPO == null || viewPO.getSchemaId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.SCHEMA.name().toLowerCase(), - namespaceLevels[2]); - } - - if (viewPO.getViewId() == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, - EntityType.VIEW.name().toLowerCase(), - identifier.name()); - } - - return viewPO; + private List listViewPOs(Namespace namespace) { + return SessionUtils.getWithoutCommit( + ViewMetaMapper.class, + mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, Entity.EntityType.VIEW)); } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java new file mode 100644 index 00000000000..23aaeb3d075 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.storage.relational.service; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.storage.relational.mapper.ViewMetaMapper; +import org.apache.gravitino.storage.relational.po.ViewPO; + +public class ViewPOStorageOps extends BasePOStorageOps { + + public ViewPOStorageOps() {} + + @Override + public void insertPO(ViewMetaMapper mapper, ViewPO viewPO, boolean overwrite) { + if (overwrite) { + mapper.insertViewMetaOnDuplicateKeyUpdate(viewPO); + } else { + mapper.insertViewMeta(viewPO); + } + } + + @Override + public Integer updatePO(ViewMetaMapper mapper, ViewPO newPO, ViewPO oldPO) { + return mapper.updateViewMeta(newPO, oldPO); + } + + @Override + public ViewPO getPO(ViewMetaMapper mapper, Long parentId, String name) { + return mapper.selectViewMetaBySchemaIdAndName(parentId, name); + } + + @Override + public ViewPO getPOByFullName(ViewMetaMapper mapper, NameIdentifier identifier) { + Namespace namespace = identifier.namespace(); + ViewPO po = + mapper.selectViewByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2), identifier.name()); + // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist. + if (po == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means + // the catalog exists but the schema does not. 
+ if (po.getSchemaId() == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + namespace.level(2)); + } + // LEFT JOIN on view_meta: a row with non-null schemaId but null viewId means + // the schema exists but the view does not. + if (po.getViewId() == null) { + return null; + } + return po; + } + + @Override + public List listPOs(ViewMetaMapper mapper, Long parentId) { + return mapper.listViewPOsBySchemaId(parentId); + } + + @Override + public List listPOs(ViewMetaMapper mapper, List entityIds) { + return mapper.listViewPOsByViewIds(entityIds); + } + + @Override + public List listPOsByNSFullName(ViewMetaMapper mapper, Namespace namespace) { + List pos = + mapper.listViewPOsByFullQualifiedName( + namespace.level(0), namespace.level(1), namespace.level(2)); + // INNER JOIN on metalake/catalog: an empty result means the metalake or catalog does not exist. + if (pos.isEmpty()) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + namespace.level(1)); + } + // LEFT JOIN on schema_meta: rows with non-null catalogId but null schemaId mean the + // catalog exists but the schema does not. + if (pos.get(0).getSchemaId() == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + namespace.level(2)); + } + // LEFT JOIN on view_meta: filter out the placeholder row for a schema without views. 
+ return pos.stream().filter(po -> po.getViewId() != null).collect(Collectors.toList()); + } + + @Override + public boolean supportsParentIdRelationalRead() { + return true; + } + + @Override + protected Entity.EntityType entityType() { + return Entity.EntityType.VIEW; + } +} diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java new file mode 100644 index 00000000000..c1d17aca292 --- /dev/null +++ b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.storage.relational; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.meta.SchemaEntity; +import org.apache.gravitino.storage.RandomIdGenerator; +import org.apache.gravitino.utils.HierarchicalSchemaUtil; +import org.apache.gravitino.utils.NamespaceUtil; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; + +/** + * Tests schema insert paths that use relational {@code batchInsertSchemaMeta} (single or batch). + */ +public class TestJDBCBackendBatchInsert extends TestJDBCBackend { + + @TestTemplate + public void testBatchInsertSingleSchemaViaBackend() throws IOException { + String metalakeName = "metalake_batch_insert_single"; + String catalogName = "catalog_batch_insert_single"; + createAndInsertMakeLake(metalakeName); + createAndInsertCatalog(metalakeName, catalogName); + + SchemaEntity schema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + NamespaceUtil.ofSchema(metalakeName, catalogName), + "flat_schema_batch", + AUDIT_INFO); + backend.insert(schema, false); + + SchemaEntity loaded = + (SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, "flat_schema_batch"), + Entity.EntityType.SCHEMA); + Assertions.assertEquals(schema.id(), loaded.id()); + Assertions.assertEquals("flat_schema_batch", loaded.name()); + Assertions.assertEquals(schema.namespace(), loaded.namespace()); + + List listed = + backend.list( + NamespaceUtil.ofSchema(metalakeName, catalogName), Entity.EntityType.SCHEMA, true); + Assertions.assertEquals(1, listed.size()); + Assertions.assertEquals("flat_schema_batch", listed.get(0).name()); + } + + @TestTemplate + public void 
testBatchInsertHierarchicalSchemaCreatesAncestorsAndLeafViaBackend() + throws IOException { + String metalakeName = "metalake_batch_insert_nested"; + String catalogName = "catalog_batch_insert_nested"; + createAndInsertMakeLake(metalakeName); + createAndInsertCatalog(metalakeName, catalogName); + + String logicalLeaf = "ns_a:ns_b:leaf"; + String sep = HierarchicalSchemaUtil.schemaSeparator(); + String physicalLeaf = HierarchicalSchemaUtil.logicalToPhysical(logicalLeaf, sep); + SchemaEntity hierarchical = + SchemaEntity.builder() + .withId(RandomIdGenerator.INSTANCE.nextId()) + .withName(physicalLeaf) + .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) + .withComment("nested") + .withProperties(Collections.emptyMap()) + .withAuditInfo(AUDIT_INFO) + .build(); + backend.insert(hierarchical, false); + + List schemas = + backend.list( + NamespaceUtil.ofSchema(metalakeName, catalogName), Entity.EntityType.SCHEMA, true); + Set logicalNames = + schemas.stream() + .map(SchemaEntity::name) + .map( + n -> { + if (n != null && n.contains(HierarchicalSchemaUtil.physicalSeparator())) { + return HierarchicalSchemaUtil.physicalToLogical(n, sep); + } + return n; + }) + .collect(Collectors.toSet()); + + Assertions.assertTrue(logicalNames.contains("ns_a")); + Assertions.assertTrue(logicalNames.contains("ns_a:ns_b")); + Assertions.assertTrue(logicalNames.contains(logicalLeaf)); + + SchemaEntity loaded = + (SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, physicalLeaf), + Entity.EntityType.SCHEMA); + Assertions.assertEquals(logicalLeaf, loaded.name()); + Assertions.assertEquals("nested", loaded.comment()); + } + + @TestTemplate + public void testBatchInsertHierarchicalSecondLeafReusesAncestorsViaBackend() throws IOException { + String metalakeName = "metalake_batch_insert_reuse"; + String catalogName = "catalog_batch_insert_reuse"; + createAndInsertMakeLake(metalakeName); + createAndInsertCatalog(metalakeName, catalogName); + + String sep = 
HierarchicalSchemaUtil.schemaSeparator(); + String physSep = HierarchicalSchemaUtil.physicalSeparator(); + String physicalLeaf1 = HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf1", sep); + String physicalLeaf2 = HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf2", sep); + String[] parts = physicalLeaf1.split(Pattern.quote(physSep), -1); + String ancestorA = parts[0]; + String ancestorAB = String.join(physSep, Arrays.copyOfRange(parts, 0, 2)); + + SchemaEntity first = + SchemaEntity.builder() + .withId(RandomIdGenerator.INSTANCE.nextId()) + .withName(physicalLeaf1) + .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) + .withComment("first") + .withProperties(Collections.emptyMap()) + .withAuditInfo(AUDIT_INFO) + .build(); + backend.insert(first, false); + + long idA = + ((SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, ancestorA), + Entity.EntityType.SCHEMA)) + .id(); + long idAB = + ((SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, ancestorAB), + Entity.EntityType.SCHEMA)) + .id(); + + SchemaEntity second = + SchemaEntity.builder() + .withId(RandomIdGenerator.INSTANCE.nextId()) + .withName(physicalLeaf2) + .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) + .withComment("second") + .withProperties(Collections.emptyMap()) + .withAuditInfo(AUDIT_INFO) + .build(); + backend.insert(second, false); + + Assertions.assertEquals( + idA, + ((SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, ancestorA), + Entity.EntityType.SCHEMA)) + .id()); + Assertions.assertEquals( + idAB, + ((SchemaEntity) + backend.get( + NameIdentifier.of(metalakeName, catalogName, ancestorAB), + Entity.EntityType.SCHEMA)) + .id()); + } +} diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConventionPOStorageOps.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConventionPOStorageOps.java new 
file mode 100644 index 00000000000..a2ab01d112b --- /dev/null +++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConventionPOStorageOps.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.storage.relational.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.UnaryOperator; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.utils.HierarchicalSchemaUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public class TestHierarchicalConventionPOStorageOps { + + private static final String SEP = HierarchicalSchemaUtil.schemaSeparator(); + private static final String PHYS = HierarchicalSchemaUtil.physicalSeparator(); + + private BasePOStorageOps delegate; + private Object mapper; + + @BeforeEach + @SuppressWarnings("unchecked") + public void setUp() { + delegate = mock(BasePOStorageOps.class); + mapper = new Object(); + } + + // ---------- Input name conversion ---------- + + @Test + public void getPOByParentIdConvertsHierarchicalName() { + when(delegate.getPO(eq(mapper), eq(7L), eq("ns_a" + PHYS + "ns_b"))).thenReturn("found"); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + String result = wrapper.getPO(mapper, 7L, "ns_a" + SEP + "ns_b"); + + assertEquals("found", result); + verify(delegate).getPO(mapper, 7L, "ns_a" + PHYS + "ns_b"); + } + + @Test + public void getPOByParentIdLeavesSimpleNameUnchanged() { + when(delegate.getPO(eq(mapper), eq(7L), 
eq("plain"))).thenReturn("po"); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + String result = wrapper.getPO(mapper, 7L, "plain"); + + assertEquals("po", result); + verify(delegate).getPO(mapper, 7L, "plain"); + } + + @Test + public void getPOByParentIdReturnsNullWhenDelegateMisses() { + when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn(null); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, s -> s + "-rewritten", UnaryOperator.identity()); + + assertNull(wrapper.getPO(mapper, 1L, "missing")); + } + + @Test + public void listPOsByNamespaceAndNamesConvertsEachHierarchicalName() { + List names = Arrays.asList("plain", "ns_a" + SEP + "ns_b", "other"); + Namespace ns = Namespace.of("ml", "cat"); + when(delegate.listPOs(eq(mapper), eq(ns), any(List.class))) + .thenReturn(Collections.singletonList("po")); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.listPOs(mapper, ns, names); + + ArgumentCaptor> namesCaptor = ArgumentCaptor.forClass(List.class); + verify(delegate).listPOs(eq(mapper), eq(ns), namesCaptor.capture()); + assertEquals(Arrays.asList("plain", "ns_a" + PHYS + "ns_b", "other"), namesCaptor.getValue()); + } + + // ---------- Identifier / namespace logical → physical translation ---------- + + @Test + public void getPOByFullNameConvertsSchemaIdentifierName() { + NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat"), "ns_a" + SEP + "ns_b"); + when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po"); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.getPOByFullName(mapper, ident); + + ArgumentCaptor captor = ArgumentCaptor.forClass(NameIdentifier.class); + verify(delegate).getPOByFullName(eq(mapper), captor.capture()); + NameIdentifier converted = 
captor.getValue(); + assertEquals("ns_a" + PHYS + "ns_b", converted.name()); + assertEquals(Namespace.of("ml", "cat"), converted.namespace()); + } + + @Test + public void getPOByFullNameConvertsSchemaSegmentForChildIdentifier() { + NameIdentifier ident = + NameIdentifier.of(Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b"), "tbl"); + when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po"); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.getPOByFullName(mapper, ident); + + ArgumentCaptor captor = ArgumentCaptor.forClass(NameIdentifier.class); + verify(delegate).getPOByFullName(eq(mapper), captor.capture()); + NameIdentifier converted = captor.getValue(); + assertEquals("tbl", converted.name()); + assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"), converted.namespace()); + } + + @Test + public void getPOByFullNamePassesThroughWhenNoSeparatorPresent() { + NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat", "schema"), "tbl"); + when(delegate.getPOByFullName(any(), any(NameIdentifier.class))).thenReturn("po"); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.getPOByFullName(mapper, ident); + + verify(delegate).getPOByFullName(mapper, ident); + } + + @Test + public void listPOsByNSFullNameConvertsSchemaSegment() { + Namespace ns = Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b"); + when(delegate.listPOsByNSFullName(any(), any(Namespace.class))) + .thenReturn(Collections.singletonList("po")); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.listPOsByNSFullName(mapper, ns); + + ArgumentCaptor nsCaptor = ArgumentCaptor.forClass(Namespace.class); + verify(delegate).listPOsByNSFullName(eq(mapper), nsCaptor.capture()); + assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"), nsCaptor.getValue()); + } + + @Test + public 
void listPOsByNSFullNameLeavesShortNamespaceUnchanged() { + Namespace ns = Namespace.of("ml", "cat"); + when(delegate.listPOsByNSFullName(any(), any(Namespace.class))) + .thenReturn(Collections.emptyList()); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + wrapper.listPOsByNSFullName(mapper, ns); + + verify(delegate).listPOsByNSFullName(mapper, ns); + } + + // ---------- physicalToLogicalRewriter (read path) ---------- + + @Test + public void physicalToLogicalRewriterAppliedToGetPOByParentId() { + when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn("raw"); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, s -> s + "-rewritten", UnaryOperator.identity()); + + assertEquals("raw-rewritten", wrapper.getPO(mapper, 1L, "plain")); + } + + @Test + public void physicalToLogicalRewriterAppliedToListPOsByParentId() { + when(delegate.listPOs(any(), any(Long.class))).thenReturn(Arrays.asList("a", "b")); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, s -> s.toUpperCase(), UnaryOperator.identity()); + + assertEquals(Arrays.asList("A", "B"), wrapper.listPOs(mapper, 1L)); + } + + @Test + public void physicalToLogicalRewriterAppliedToListPOsByIds() { + when(delegate.listPOs(any(), any(List.class))).thenReturn(Arrays.asList("x", "y")); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate, s -> s + "!", UnaryOperator.identity()); + + List ids = Arrays.asList(1L, 2L); + assertEquals(Arrays.asList("x!", "y!"), wrapper.listPOs(mapper, ids)); + } + + @Test + public void physicalToLogicalRewriterIsNotInvokedOnNullResult() { + when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn(null); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, + s -> { + throw new AssertionError("rewriter must 
not be invoked on null"); + }, + UnaryOperator.identity()); + + assertNull(wrapper.getPO(mapper, 1L, "x")); + } + + @Test + public void physicalToLogicalRewriterIsNotInvokedOnEmptyList() { + when(delegate.listPOs(any(), any(Long.class))).thenReturn(Collections.emptyList()); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, + s -> { + throw new AssertionError("rewriter must not be invoked on empty list"); + }, + UnaryOperator.identity()); + + assertEquals(Collections.emptyList(), wrapper.listPOs(mapper, 1L)); + } + + // ---------- logicalToPhysicalRewriter (write path) ---------- + + @Test + public void logicalToPhysicalRewriterAppliedToInsertPO() { + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, UnaryOperator.identity(), s -> s + "-written"); + + wrapper.insertPO(mapper, "logical", true); + + verify(delegate).insertPO(mapper, "logical-written", true); + } + + @Test + public void logicalToPhysicalRewriterAppliedToBatchInsertPOs() { + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate, UnaryOperator.identity(), s -> s + "-W"); + + wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), false); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(false)); + assertEquals(Arrays.asList("a-W", "b-W"), captor.getValue()); + } + + @Test + public void logicalToPhysicalRewriterAppliedToBothPOsInUpdate() { + when(delegate.updatePO(any(), eq("new-W"), eq("old-W"))).thenReturn(1); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate, UnaryOperator.identity(), s -> s + "-W"); + + assertEquals(1, wrapper.updatePO(mapper, "new", "old")); + verify(delegate).updatePO(mapper, "new-W", "old-W"); + } + + @Test + public void singleArgConstructorUsesIdentityRewritersForBothDirections() { + 
when(delegate.getPO(any(), any(Long.class), any(String.class))).thenReturn("po"); + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + assertEquals("po", wrapper.getPO(mapper, 1L, "plain")); + wrapper.insertPO(mapper, "raw", false); + verify(delegate).insertPO(mapper, "raw", false); + + wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), true); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(true)); + assertEquals(Arrays.asList("a", "b"), captor.getValue()); + } + + // ---------- Delegation ---------- + + @Test + public void supportsParentIdRelationalReadAndEntityTypeDelegated() { + when(delegate.supportsParentIdRelationalRead()).thenReturn(true); + when(delegate.entityType()).thenReturn(Entity.EntityType.SCHEMA); + + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>(delegate); + + assertTrue(wrapper.supportsParentIdRelationalRead()); + assertSame(Entity.EntityType.SCHEMA, wrapper.entityType()); + } + + @Test + public void batchInsertEmptyListIsForwarded() { + HierarchicalConventionPOStorageOps wrapper = + new HierarchicalConventionPOStorageOps<>( + delegate, + UnaryOperator.identity(), + s -> { + throw new AssertionError("rewriter must not be invoked on empty list"); + }); + + wrapper.batchInsertPOs(mapper, new ArrayList<>(), false); + + verify(delegate).batchInsertPOs(eq(mapper), eq(Collections.emptyList()), eq(false)); + } +} diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java new file mode 100644 index 00000000000..3b57ded40a0 --- /dev/null +++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; + +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +public class TestPOStorageReadRouting { + + private static final Object MAPPER = new Object(); + + @Test + public void getPOUsesFullNameWhenCacheDisabled() { + RecordingOps ops = new RecordingOps(true); + NameIdentifier id = NameIdentifier.of("ml", "cat", "schema"); + String result = POStorageReadRouting.getPO(MAPPER, id, ops, Entity.EntityType.SCHEMA, false); + assertEquals("by-full", result); + assertFalse(ops.getPOByParentCalled); + assertTrue(ops.getPOByFullNameCalled); + } + + @Test + public void getPOUsesFullNameWhenCacheEnabledButParentPathUnsupported() { + RecordingOps ops = new RecordingOps(false); 
+ String result = + POStorageReadRouting.getPO( + MAPPER, NameIdentifier.of("ml", "cat", "schema"), ops, Entity.EntityType.SCHEMA, true); + assertEquals("by-full", result); + assertFalse(ops.getPOByParentCalled); + assertTrue(ops.getPOByFullNameCalled); + } + + @Test + public void getPOUsesParentIdWhenCacheEnabledAndSupported() { + try (MockedStatic entityIds = mockStatic(EntityIdService.class)) { + entityIds + .when(() -> EntityIdService.getEntityId(any(), eq(Entity.EntityType.CATALOG))) + .thenReturn(42L); + RecordingOps ops = new RecordingOps(true); + NameIdentifier id = NameIdentifier.of("ml", "cat", "schema"); + String result = POStorageReadRouting.getPO(MAPPER, id, ops, Entity.EntityType.SCHEMA, true); + assertEquals("by-parent", result); + assertTrue(ops.getPOByParentCalled); + assertFalse(ops.getPOByFullNameCalled); + assertEquals(Long.valueOf(42), ops.seenParentId); + assertEquals("schema", ops.seenShortName); + } + } + + @Test + public void listPOsUsesFullNamespaceWhenCacheDisabled() { + RecordingOps ops = new RecordingOps(true); + Namespace ns = Namespace.of("ml", "cat", "sch"); + List result = + POStorageReadRouting.listPOs(MAPPER, ns, ops, Entity.EntityType.TABLE, false); + assertTrue(result.isEmpty()); + assertFalse(ops.listByParentCalled); + assertTrue(ops.listByFullNsCalled); + } + + @Test + public void listPOsUsesParentIdWhenCacheEnabledAndSupported() { + try (MockedStatic entityIds = mockStatic(EntityIdService.class)) { + entityIds + .when(() -> EntityIdService.getEntityId(any(), eq(Entity.EntityType.SCHEMA))) + .thenReturn(7L); + RecordingOps ops = new RecordingOps(true); + Namespace ns = Namespace.of("ml", "cat", "sch"); + List out = + POStorageReadRouting.listPOs(MAPPER, ns, ops, Entity.EntityType.TABLE, true); + assertEquals(Collections.singletonList("row"), out); + assertTrue(ops.listByParentCalled); + assertFalse(ops.listByFullNsCalled); + assertEquals(Long.valueOf(7), ops.seenListParentId); + } + } + + private static final class 
RecordingOps extends BasePOStorageOps<Object, String> { + private final boolean supportsParent; + boolean getPOByParentCalled; + boolean getPOByFullNameCalled; + boolean listByParentCalled; + boolean listByFullNsCalled; + Long seenParentId; + String seenShortName; + Long seenListParentId; + + RecordingOps(boolean supportsParent) { + this.supportsParent = supportsParent; + } + + @Override + public boolean supportsParentIdRelationalRead() { + return supportsParent; + } + + @Override + public String getPO(Object mapper, Long parentId, String name) { + getPOByParentCalled = true; + seenParentId = parentId; + seenShortName = name; + return "by-parent"; + } + + @Override + public String getPOByFullName(Object mapper, NameIdentifier identifier) { + getPOByFullNameCalled = true; + return "by-full"; + } + + @Override + public List<String> listPOs(Object mapper, Long parentId) { + listByParentCalled = true; + seenListParentId = parentId; + return Collections.singletonList("row"); + } + + @Override + public List<String> listPOsByNSFullName(Object mapper, Namespace namespace) { + listByFullNsCalled = true; + return Collections.emptyList(); + } + + @Override + protected Entity.EntityType entityType() { + return Entity.EntityType.SCHEMA; + } + } +} diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java index 2c19502caa6..599c32886e3 100644 --- a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java +++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java @@ -24,11 +24,9 @@ import java.io.IOException; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.gravitino.Entity; import 
org.apache.gravitino.EntityAlreadyExistsException; @@ -38,7 +36,6 @@ import org.apache.gravitino.meta.TopicEntity; import org.apache.gravitino.storage.RandomIdGenerator; import org.apache.gravitino.storage.relational.TestJDBCBackend; -import org.apache.gravitino.utils.HierarchicalSchemaUtil; import org.apache.gravitino.utils.NameIdentifierUtil; import org.apache.gravitino.utils.NamespaceUtil; import org.junit.jupiter.api.Assertions; @@ -220,12 +217,10 @@ public void testInsertHierarchicalSchemaCreatesAncestorsAndLeaf() throws IOExcep SchemaMetaService schemaMetaService = SchemaMetaService.getInstance(); String logicalLeaf = "ns_a:ns_b:leaf"; - String sep = HierarchicalSchemaUtil.schemaSeparator(); - String physicalLeaf = HierarchicalSchemaUtil.logicalToPhysical(logicalLeaf, sep); SchemaEntity hierarchical = SchemaEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) - .withName(physicalLeaf) + .withName(logicalLeaf) .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) .withComment("nested") .withProperties(Collections.emptyMap()) @@ -235,17 +230,7 @@ public void testInsertHierarchicalSchemaCreatesAncestorsAndLeaf() throws IOExcep List<SchemaEntity> schemas = schemaMetaService.listSchemasByNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)); - Set<String> logicalNames = - schemas.stream() - .map(SchemaEntity::name) - .map( - n -> { - if (n != null && n.contains(HierarchicalSchemaUtil.physicalSeparator())) { - return HierarchicalSchemaUtil.physicalToLogical(n, sep); - } - return n; - }) - .collect(Collectors.toSet()); + Set<String> logicalNames = schemas.stream().map(SchemaEntity::name).collect(Collectors.toSet()); Assertions.assertTrue(logicalNames.contains("ns_a")); Assertions.assertTrue(logicalNames.contains("ns_a:ns_b")); @@ -253,8 +238,8 @@ public void testInsertHierarchicalSchemaCreatesAncestorsAndLeaf() throws IOExcep SchemaEntity loaded = schemaMetaService.getSchemaByIdentifier( - NameIdentifier.of(metalakeName, catalogName, physicalLeaf)); - 
Assertions.assertEquals(physicalLeaf, loaded.name()); + NameIdentifier.of(metalakeName, catalogName, logicalLeaf)); + Assertions.assertEquals(logicalLeaf, loaded.name()); Assertions.assertEquals("nested", loaded.comment()); } @@ -264,18 +249,15 @@ public void testInsertHierarchicalSecondLeafReusesAncestorsWithoutUpsert() throw createAndInsertCatalog(metalakeName, catalogName); SchemaMetaService schemaMetaService = SchemaMetaService.getInstance(); - String sep = HierarchicalSchemaUtil.schemaSeparator(); - String physSep = HierarchicalSchemaUtil.physicalSeparator(); - String physicalLeaf1 = HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf1", sep); - String physicalLeaf2 = HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf2", sep); - String[] parts = physicalLeaf1.split(Pattern.quote(physSep), -1); - String ancestorA = parts[0]; - String ancestorAB = String.join(physSep, Arrays.copyOfRange(parts, 0, 2)); + String leaf1 = "ns_a:ns_b:leaf1"; + String leaf2 = "ns_a:ns_b:leaf2"; + String ancestorA = "ns_a"; + String ancestorAB = "ns_a:ns_b"; SchemaEntity first = SchemaEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) - .withName(physicalLeaf1) + .withName(leaf1) .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) .withComment("first") .withProperties(Collections.emptyMap()) @@ -295,7 +277,7 @@ public void testInsertHierarchicalSecondLeafReusesAncestorsWithoutUpsert() throw SchemaEntity second = SchemaEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) - .withName(physicalLeaf2) + .withName(leaf2) .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName)) .withComment("second") .withProperties(Collections.emptyMap())