Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
2b11f1f
feat(core): JDBC backend and entity store for nested namespace naming
roryqi May 11, 2026
61dcc02
fix(core): address code review issues in nested namespace JDBC backend
roryqi May 13, 2026
5e175bb
refactor(core): simplify RelationalSchemaNamingBridge and its tests
roryqi May 13, 2026
cb21732
fix(core): complete cache write-through convention in RelationalEntit…
roryqi May 13, 2026
caf3b96
Revert "fix(core): complete cache write-through convention in Relatio…
roryqi May 13, 2026
d87ba6a
refactor(core): replace convertMetadataObjectDottedFullName with Name…
roryqi May 13, 2026
4ff4163
fix(core): guard genericEntityMetadataFullNameForApi against non-Meta…
roryqi May 13, 2026
3c4f7db
refactor
roryqi May 13, 2026
6d23138
revert unused changes
roryqi May 13, 2026
cbbdb8a
address id resolver
roryqi May 13, 2026
6fa7da4
revert
roryqi May 13, 2026
42d4809
refactor
roryqi May 13, 2026
3d3f4dd
remove convention
roryqi May 13, 2026
993e482
revert change
roryqi May 13, 2026
ae98cf3
revert
roryqi May 13, 2026
7cfe982
fix
roryqi May 13, 2026
26319b7
cache
roryqi May 13, 2026
dbcf096
cache
roryqi May 13, 2026
29cb2ac
refactor
roryqi May 14, 2026
7b0c466
fix
roryqi May 14, 2026
74a0a7d
refactor
roryqi May 14, 2026
e9134fd
refactor
roryqi May 14, 2026
8cf90f7
refactor
roryqi May 14, 2026
bac3a3c
refactor
roryqi May 14, 2026
060815a
refactor
roryqi May 14, 2026
b2c78e5
refactor
roryqi May 14, 2026
f8690dc
refactor
roryqi May 14, 2026
16d9a56
refactor
roryqi May 14, 2026
df927bb
refactor
roryqi May 14, 2026
a960062
test: add unit tests for HierarchicalConventionPOStorageOps
roryqi May 14, 2026
3d652c8
fix
roryqi May 14, 2026
e72918b
fix
roryqi May 14, 2026
6148465
fix ut
roryqi May 14, 2026
f31fe4a
fix ut
roryqi May 14, 2026
60a12ec
revert
roryqi May 14, 2026
617ba19
revert
roryqi May 14, 2026
525f2c6
fix ut
roryqi May 14, 2026
722859c
distinguish missing catalog/schema in join-based lookups
roryqi May 15, 2026
9809cb6
apply join-aware missing-entity checks to view and function
roryqi May 15, 2026
09be3a6
revert function get-by-full-name sql and storage-ops changes
roryqi May 15, 2026
4e40b48
Address the comments
roryqi May 15, 2026
9be45fc
rename
roryqi May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public static void setUp() throws IllegalAccessException {
MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
Mockito.mockStatic(CatalogMetaService.class);
MockedStatic<SchemaMetaService> schemaMetaServiceMockedStatic =
Mockito.mockStatic(SchemaMetaService.class);
Mockito.mockStatic(SchemaMetaService.class, Mockito.CALLS_REAL_METHODS);

metalakeMetaServiceMockedStatic
.when(MetalakeMetaService::getInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,34 @@ void insertRelation(
boolean override)
throws IOException;

/**
* Batch inserts the same relation from many source entities to one destination. Parameter order
* matches {@link #insertRelation}. All sources share one {@code srcType}. Semantics match
* repeated {@link #insertRelation} with the same {@code relType}; a relational backend may use
* fewer round-trips for some relation types (e.g. {@link Type#OWNER_REL}).
*
* @param relType the relation type (e.g. {@link Type#OWNER_REL})
* @param srcIdentifiers identifiers of the source side for each relation row
* @param srcType entity type shared by every identifier in {@code srcIdentifiers}
* @param dstIdentifier destination entity identifier (shared by all rows)
* @param dstType destination entity type
* @param override if true, replace existing relations of each source entity first, per {@link
* #insertRelation}
* @throws IOException if the storage operation fails
*/
default void batchInsertRelations(
Type relType,
List<NameIdentifier> srcIdentifiers,
Entity.EntityType srcType,
NameIdentifier dstIdentifier,
Entity.EntityType dstType,
boolean override)
throws IOException {
for (NameIdentifier srcIdent : srcIdentifiers) {
insertRelation(relType, srcIdent, srcType, dstIdentifier, dstType, override);
}
}

/**
* Updates the relations for a given entity by adding a set of new relations and removing another
* set of relations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,29 @@ public void insertRelation(
}
}

@Override
public void batchInsertRelations(
Type relType,
List<NameIdentifier> srcIdentifiers,
Entity.EntityType srcType,
NameIdentifier dstIdentifier,
Entity.EntityType dstType,
boolean override)
throws IOException {
if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
return;
}
switch (relType) {
case OWNER_REL:
OwnerMetaService.getInstance()
.batchSetOwners(srcIdentifiers, srcType, dstIdentifier, dstType);
break;
default:
throw new IllegalArgumentException(
String.format("Doesn't support batch insert for the relation type %s", relType));
}
}

@Override
public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
Type relType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
/**
* Relation store to store entities. This means we can store entities in a relational store. I.e.,
* MySQL, PostgreSQL, etc. If you want to use a different backend, you can implement the {@link
* RelationalBackend} interface
* RelationalBackend} interface. The default JDBC backend is {@link JDBCBackend}.
*/
public class RelationalEntityStore implements EntityStore, SupportsRelationOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalEntityStore.class);
Expand Down Expand Up @@ -327,6 +327,26 @@ public void insertRelation(
cache.invalidate(dstIdentifier, dstType, relType);
}

@Override
public void batchInsertRelations(
Type relType,
List<NameIdentifier> srcIdentifiers,
Entity.EntityType srcType,
NameIdentifier dstIdentifier,
Entity.EntityType dstType,
boolean override)
throws IOException {
if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
return;
}
backend.batchInsertRelations(
relType, srcIdentifiers, srcType, dstIdentifier, dstType, override);
for (NameIdentifier ident : srcIdentifiers) {
cache.invalidate(ident, srcType, relType);
}
cache.invalidate(dstIdentifier, dstType, relType);
Comment on lines +344 to +347
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more proper to invalidate the cache first and then operate the database?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take a look. Other relation ops follows this pattern, too. Do we need to modify them together?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diqiu50
Please take a look, and I recall that you have modified this point.

}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will set owners for multiple metadata objects. So I need to insert multiple relations. Although we already have the update multiple relations, it can't cover the cases when we need to insert multiple records.


@Override
public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
Type relType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.storage.relational.service;

import java.util.List;
import org.apache.gravitino.Entity;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;

public abstract class BasePOStorageOps<PO, Mapper> {
public void insertPO(Mapper mapper, PO po, boolean overwrite) {
throw new UnsupportedOperationException(
"insertPO is not supported by " + getClass().getSimpleName());
}

public void batchInsertPOs(Mapper mapper, List<PO> pos, boolean overwrite) {
throw new UnsupportedOperationException(
"batchInsertPOs is not supported by " + getClass().getSimpleName());
}

public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) {
throw new UnsupportedOperationException(
"updatePO is not supported by " + getClass().getSimpleName());
}

/**
* When {@code true} and the entity-id cache is enabled, callers may resolve rows by parent entity
* id plus short name via {@link #getPO} and {@link #listPOs}; see {@link POStorageReadRouting}.
*/
public boolean supportsParentIdRelationalRead() {
return false;
}

public PO getPO(Mapper mapper, Long parentId, String name) {
throw new UnsupportedOperationException(
"getPO by parent id is not supported by " + getClass().getSimpleName());
}

public List<PO> listPOs(Mapper mapper, Long parentId) {
throw new UnsupportedOperationException(
"listPOs by parent id is not supported by " + getClass().getSimpleName());
}

public List<PO> listPOs(Mapper mapper, Namespace namespace, List<String> names) {
throw new UnsupportedOperationException(
"listPOs by namespace and names is not supported by " + getClass().getSimpleName());
}

public List<PO> listPOs(Mapper mapper, List<Long> entityIds) {
throw new UnsupportedOperationException(
"listPOs by entityIds is not supported by " + getClass().getSimpleName());
}

public PO getPOByFullName(Mapper mapper, NameIdentifier identifier) {
throw new UnsupportedOperationException(
"getPOByFullName is not supported by " + getClass().getSimpleName());
}

public List<PO> listPOsByNSFullName(Mapper mapper, Namespace namespace) {
throw new UnsupportedOperationException(
"listPOsByNSFullName is not supported by " + getClass().getSimpleName());
}

protected abstract Entity.EntityType entityType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
Expand All @@ -56,14 +55,17 @@
import org.slf4j.LoggerFactory;

public class FunctionMetaService {
private static final Logger LOG = LoggerFactory.getLogger(FunctionMetaService.class);
private static final FunctionMetaService INSTANCE = new FunctionMetaService();
private BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops;

public static FunctionMetaService getInstance() {
return INSTANCE;
}

private static final Logger LOG = LoggerFactory.getLogger(FunctionMetaService.class);
private static final FunctionMetaService INSTANCE = new FunctionMetaService();

private FunctionMetaService() {}
private FunctionMetaService() {
this.ops = new HierarchicalConventionPOStorageOps<>(new FunctionPOStorageOps());
}

@Monitored(
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
Expand All @@ -87,18 +89,17 @@ public FunctionEntity getFunctionByIdentifier(NameIdentifier ident) {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getFunctionIdBySchemaIdAndFunctionName")
public Long getFunctionIdBySchemaIdAndFunctionName(Long schemaId, String functionName) {
Long functionId =
FunctionPO functionPO =
SessionUtils.getWithoutCommit(
FunctionMetaMapper.class,
mapper -> mapper.selectFunctionIdBySchemaIdAndFunctionName(schemaId, functionName));
FunctionMetaMapper.class, mapper -> ops.getPO(mapper, schemaId, functionName));

if (functionId == null) {
if (functionPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
functionName);
}
return functionId;
return functionPO.functionId();
}

@Monitored(
Expand All @@ -115,14 +116,7 @@ public void insertFunction(FunctionEntity functionEntity, boolean overwrite) thr
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
FunctionMetaMapper.class,
mapper -> {
if (overwrite) {
mapper.insertFunctionMetaOnDuplicateKeyUpdate(po);
} else {
mapper.insertFunctionMeta(po);
}
}),
FunctionMetaMapper.class, mapper -> ops.insertPO(mapper, po, overwrite)),
() ->
SessionUtils.doWithoutCommit(
FunctionVersionMetaMapper.class,
Expand Down Expand Up @@ -231,8 +225,18 @@ public int deleteFunctionVersionsByRetentionCount(Long versionRetentionCount, in
baseMetricName = "getFunctionPOByIdentifier")
FunctionPO getFunctionPOByIdentifier(NameIdentifier ident) {
NameIdentifierUtil.checkFunction(ident);
FunctionPO functionPO =
SessionUtils.getWithoutCommit(
FunctionMetaMapper.class,
mapper -> POStorageReadRouting.getPO(mapper, ident, ops, Entity.EntityType.FUNCTION));

return functionPOFetcher().apply(ident);
if (functionPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
ident.name());
}
return functionPO;
}

@Monitored(
Expand Down Expand Up @@ -260,7 +264,7 @@ public <E extends Entity & HasIdentifier> FunctionEntity updateFunction(
() ->
SessionUtils.doWithoutCommit(
FunctionMetaMapper.class,
mapper -> mapper.updateFunctionMeta(newFunctionPO, oldFunctionPO)));
mapper -> ops.updatePO(mapper, newFunctionPO, oldFunctionPO)));

return newEntity;
} catch (RuntimeException re) {
Expand All @@ -270,89 +274,10 @@ public <E extends Entity & HasIdentifier> FunctionEntity updateFunction(
}
}

private Function<NameIdentifier, FunctionPO> functionPOFetcher() {
return GravitinoEnv.getInstance().cacheEnabled()
? this::getFunctionPOBySchemaId
: this::getFunctionPOByFullQualifiedName;
}

private FunctionPO getFunctionPOBySchemaId(NameIdentifier ident) {
Long schemaId =
EntityIdService.getEntityId(
NameIdentifier.of(ident.namespace().levels()), Entity.EntityType.SCHEMA);

FunctionPO functionPO =
SessionUtils.getWithoutCommit(
FunctionMetaMapper.class,
mapper -> mapper.selectFunctionMetaBySchemaIdAndName(schemaId, ident.name()));

if (functionPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
ident.toString());
}
return functionPO;
}

private FunctionPO getFunctionPOByFullQualifiedName(NameIdentifier ident) {
String[] namespaceLevels = ident.namespace().levels();
FunctionPO functionPO =
SessionUtils.getWithoutCommit(
FunctionMetaMapper.class,
mapper ->
mapper.selectFunctionMetaByFullQualifiedName(
namespaceLevels[0], namespaceLevels[1], namespaceLevels[2], ident.name()));

if (functionPO == null || functionPO.functionId() == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
ident.name());
}

if (functionPO.schemaId() == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
namespaceLevels[2]);
}
return functionPO;
}

private List<FunctionPO> listFunctionPOs(Namespace namespace) {
return functionListFetcher().apply(namespace);
}

private List<FunctionPO> listFunctionPOsBySchemaId(Namespace namespace) {
Long schemaId =
EntityIdService.getEntityId(
NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
return SessionUtils.getWithoutCommit(
FunctionMetaMapper.class, mapper -> mapper.listFunctionPOsBySchemaId(schemaId));
}

private Function<Namespace, List<FunctionPO>> functionListFetcher() {
return GravitinoEnv.getInstance().cacheEnabled()
? this::listFunctionPOsBySchemaId
: this::listFunctionPOsByFullQualifiedName;
}

private List<FunctionPO> listFunctionPOsByFullQualifiedName(Namespace namespace) {
String[] namespaceLevels = namespace.levels();
List<FunctionPO> functionPOs =
SessionUtils.getWithoutCommit(
FunctionMetaMapper.class,
mapper ->
mapper.listFunctionPOsByFullQualifiedName(
namespaceLevels[0], namespaceLevels[1], namespaceLevels[2]));
if (functionPOs.isEmpty() || functionPOs.get(0).schemaId() == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
namespaceLevels[2]);
}
return functionPOs.stream().filter(po -> po.functionId() != null).collect(Collectors.toList());
FunctionMetaMapper.class,
mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops, Entity.EntityType.FUNCTION));
}

private void fillFunctionPOBuilderParentEntityId(
Expand Down
Loading
Loading