@@ -224,7 +224,8 @@ protected BackendColumnIterator getById(RocksDBSessions.Session session, Id id)
return BackendColumnIterator.iterator(col);
}

- protected BackendColumnIterator getByIds(RocksDBSessions.Session session, Set<Id> ids) {
+ protected BackendColumnIterator getByIds(RocksDBSessions.Session session,
+ Collection<Id> ids) {
Member

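‼️ Set<Id> → Collection<Id>: note the semantic change

The original method accepted a Set<Id>, which is inherently deduplicated. With Collection<Id>, a caller can pass a List that contains duplicate IDs, in which case the RocksDB multiGet looks up the same key repeatedly and returns duplicate results.

The test testVertexQueryByIdsWithDuplicateIds verifies this behavior (id1 is returned twice), but it is inconsistent with the original Set semantics. We need to confirm whether the ids of the upper-layer IdQuery can contain duplicates. If they can, this behavior change may cause upper layers to process the same data more than once.

Suggestion: deduplicate at the entry of getByIds to preserve the original semantics, or explicitly document the behavior change, namely that a Collection with duplicates returns duplicate results.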
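To make the first suggestion concrete, here is a minimal sketch of deduplicating at the entry point while keeping first-seen order. It is not the PR's code: `DedupIdsSketch`, `dedup` and `multiGet` are hypothetical names, plain `String` keys stand in for `Id`, and the sketch assumes the real `Id` type has usable `equals`/`hashCode` (which its earlier use in a `Set` suggests).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class DedupIdsSketch {

    // Hypothetical helper: drop duplicate ids while keeping first-seen order,
    // so that a List input behaves like the original Set input.
    static <T> Collection<T> dedup(Collection<T> ids) {
        if (ids instanceof Set) {
            return ids; // already unique, no copy needed
        }
        return new LinkedHashSet<>(ids);
    }

    // Stand-in for a multi-get (assumption): one result per requested key,
    // including one result for every repeated key.
    static List<String> multiGet(Collection<String> keys) {
        List<String> results = new ArrayList<>();
        for (String key : keys) {
            results.add("value-of-" + key);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("v1", "v2", "v1");
        System.out.println("without dedup: " + multiGet(ids).size());
        System.out.println("with dedup: " + multiGet(dedup(ids)).size());
    }
}
```

The `instanceof Set` shortcut keeps the cost at zero for callers that still pass a set; whether the extra copy for list inputs is acceptable on this hot path is a trade-off for the author to measure.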

if (ids.size() == 1) {
return this.getById(session, ids.iterator().next());
}
@@ -309,7 +310,7 @@ protected static BackendEntryIterator newEntryIterator(BackendColumnIterator col
}

protected static BackendEntryIterator newEntryIteratorOlap(
BackendColumnIterator cols, Query query, boolean isOlap) {
BackendColumnIterator cols, Query query, boolean isOlap) {
return new BinaryEntryIterator<>(cols, query, (entry, col) -> {
if (entry == null || !entry.belongToMe(col)) {
HugeType type = query.resultType();
@@ -182,7 +182,9 @@ protected BackendColumnIterator queryById(RocksDBSessions.Session session, Id id
@Override
protected BackendColumnIterator queryByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
Member

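🧹 The PR description does not fully match the actual changes

The PR description says "improves the performance of Gremlin queries... when using RPC-based backends such as HBase and HStore", but all code changes are in the hugegraph-rocksdb module. Consider updating the PR description so that it accurately reflects that this optimization is scoped to the RocksDB backend.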

// TODO: use getByIds() after batch version multi-get is ready
if (!session.hasChanges()) {
return this.getByIds(session, ids);
Member

@imbajin imbajin Apr 17, 2026

⚠️ This RocksDB-specific optimization looks reasonable to me as a focused improvement on its own. One thing that may be worth clarifying is how HStore should be handled, since it is the main distributed backend we maintain and its batch-query path is a separate concern from RocksDB.

If you think it makes sense, we could either cover HStore in this PR as well, or keep this PR scoped to RocksDB and follow up with a separate PR for HStore so the behavior and performance trade-offs can be reviewed independently.

Contributor Author

Thanks! Let's keep this PR focused on RocksDB for now, and I'll handle HStore in a separate follow-up PR.

Member

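⚠️ The actual effect of the fallback path needs confirmation

When hasChanges() == true, the fallback to super.queryByIds() calls this.queryById() for each ID. Vertex/Edge already override queryById → getById (point get), so the fallback path actually ends up calling getById as well.

The get() method of RocksDBStdSessions contains the assertion assert !this.hasChanges(), which means the fallback path will still trigger an assertion failure in environments where assertions are enabled.

Needs confirmation: can the fallback really be taken safely when hasChanges() == true? Or is this branch never actually triggered with the RocksDB backend?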
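The concern can be illustrated with a small model. This is only a sketch: `AssertOnReadSketch` and its `Session` are hypothetical stand-ins and do not reproduce the real RocksDBStdSessions code. It relies on one certain fact about Java: an `assert` statement is evaluated only when the JVM runs with assertions enabled (`-ea`), so the same point get either throws an AssertionError or silently proceeds depending on how the JVM was started.

```java
final class AssertOnReadSketch {

    // Hypothetical stand-in for a session whose reads assert "no pending changes".
    static final class Session {
        private boolean changed = false;

        void put(String key, String value) {
            this.changed = true; // a write leaves uncommitted changes
        }

        boolean hasChanges() {
            return this.changed;
        }

        String get(String key) {
            // Evaluated only when assertions are enabled (-ea)
            assert !this.hasChanges() : "read with uncommitted changes";
            return "value-of-" + key;
        }
    }

    // Returns true if the point get completes, false if the assertion fires.
    static boolean readSucceeds(Session session) {
        try {
            session.get("v1");
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Session session = new Session();
        session.put("v1", "value1");
        boolean assertionsOn = AssertOnReadSketch.class.desiredAssertionStatus();
        System.out.println("assertions enabled: " + assertionsOn);
        System.out.println("read succeeds: " + readSucceeds(session));
    }
}
```

In this model a per-id fallback that ends in `get()` behaves differently in tests (assertions on) and in production (assertions off), which is why the question of whether the branch is reachable matters.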

}
return super.queryByIds(session, ids);
}
}
@@ -208,6 +210,15 @@ public static Edge in(String database) {
protected BackendColumnIterator queryById(RocksDBSessions.Session session, Id id) {
return this.getById(session, id);
}

@Override
protected BackendColumnIterator queryByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
if (!session.hasChanges()) {
return this.getByIds(session, ids);
Member

⚠️ Since this adds a new multi-get path for vertex/edge id queries, it would be great to add a small RocksDB regression test here. The core cases that seem worth covering are:

  1. batch query with multiple existing ids
  2. batch query with existing + missing ids mixed together
  3. duplicate ids in the input
  4. fallback to the old path when session.hasChanges() is true

Contributor Author

Testing the fallback path (session.hasChanges() = true) is not feasible in unit tests, as RocksDBStdSessions asserts !this.hasChanges() on all read operations (get/scan) when assertions are enabled.

}
return super.queryByIds(session, ids);
}
}
Member

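⚠️ The queryByIds logic of Vertex and Edge is fully duplicated

Vertex.queryByIds() and Edge.queryByIds() are identical (the same 5 lines of logic). Consider one of the following:

  1. Extract it into the parent class RocksDBTable.queryByIds() so that the hasChanges() check is done in one place
  2. Or provide a shared helper method

This reduces duplication and lowers the risk of the two copies diverging during later maintenance.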
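One possible shape for the shared-helper option is sketched below. All names here (`SharedQueryByIdsSketch`, `BaseTable`, `multiGetUnlessChanged`, `queryEachId`) are hypothetical and the classes are simplified stand-ins, not the real RocksDBTable hierarchy. The point is only that the hasChanges() dispatch lives in one method and both subclasses delegate to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class SharedQueryByIdsSketch {

    // Hypothetical stand-in for RocksDBSessions.Session.
    interface Session {
        boolean hasChanges();
    }

    // Hypothetical stand-in for the common parent table class.
    abstract static class BaseTable {

        // Default behavior: query each id on its own.
        List<String> queryByIds(Session session, Collection<String> ids) {
            return this.queryEachId(session, ids);
        }

        private List<String> queryEachId(Session session, Collection<String> ids) {
            List<String> results = new ArrayList<>();
            for (String id : ids) {
                results.add("single:" + id);
            }
            return results;
        }

        // Batch path (stands in for the multi-get based getByIds).
        List<String> getByIds(Session session, Collection<String> ids) {
            List<String> results = new ArrayList<>();
            for (String id : ids) {
                results.add("batch:" + id);
            }
            return results;
        }

        // Shared dispatch: the only place that checks hasChanges().
        final List<String> multiGetUnlessChanged(Session session, Collection<String> ids) {
            if (!session.hasChanges()) {
                return this.getByIds(session, ids);
            }
            return this.queryEachId(session, ids);
        }
    }

    static final class Vertex extends BaseTable {
        @Override
        List<String> queryByIds(Session session, Collection<String> ids) {
            return this.multiGetUnlessChanged(session, ids);
        }
    }

    static final class Edge extends BaseTable {
        @Override
        List<String> queryByIds(Session session, Collection<String> ids) {
            return this.multiGetUnlessChanged(session, ids);
        }
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("v1", "v2");
        Session clean = () -> false; // no uncommitted changes
        Session dirty = () -> true;  // uncommitted changes present
        System.out.println(new Vertex().queryByIds(clean, ids));
        System.out.println(new Edge().queryByIds(dirty, ids));
    }
}
```

Because `Session` is reduced to a single-method interface here, the lambdas double as stub sessions, which also shows how the dispatch branch itself could be exercised without touching a real RocksDB read.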


public static class IndexTable extends RocksDBTable {
@@ -50,6 +50,7 @@
import org.apache.hugegraph.unit.id.SplicingIdGeneratorTest;
import org.apache.hugegraph.unit.mysql.MysqlUtilTest;
import org.apache.hugegraph.unit.mysql.WhereBuilderTest;
import org.apache.hugegraph.unit.rocksdb.RocksDBTableQueryByIdsTest;
Member

🧹 Imports are not in alphabetical order

The import of RocksDBTableQueryByIdsTest is inserted before RocksDBCountersTest, which breaks alphabetical order. Suggested adjustment:

Suggested change
- import org.apache.hugegraph.unit.rocksdb.RocksDBTableQueryByIdsTest;
+ import org.apache.hugegraph.unit.rocksdb.RocksDBCountersTest;
+ import org.apache.hugegraph.unit.rocksdb.RocksDBSessionTest;
+ import org.apache.hugegraph.unit.rocksdb.RocksDBSessionsTest;
+ import org.apache.hugegraph.unit.rocksdb.RocksDBTableQueryByIdsTest;

import org.apache.hugegraph.unit.rocksdb.RocksDBCountersTest;
import org.apache.hugegraph.unit.rocksdb.RocksDBSessionTest;
import org.apache.hugegraph.unit.rocksdb.RocksDBSessionsTest;
@@ -141,6 +142,7 @@
RocksDBSessionsTest.class,
RocksDBSessionTest.class,
RocksDBCountersTest.class,
RocksDBTableQueryByIdsTest.class,

/* utils */
VersionTest.class,
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.unit.rocksdb;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBSessions;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBTables;
import org.apache.hugegraph.testutil.Assert;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.RocksDBException;

public class RocksDBTableQueryByIdsTest extends BaseRocksDBUnitTest {

private static final String DATABASE = "db";

private TestVertexTable vertexTable;
private TestEdgeTable edgeTable;

@Override
@Before
public void setup() throws RocksDBException {
super.setup();
this.vertexTable = new TestVertexTable(DATABASE);
this.edgeTable = new TestEdgeTable(DATABASE);
this.rocks.createTable(this.vertexTable.table());
this.rocks.createTable(this.edgeTable.table());
}

@Test
public void testVertexQueryByIdsWithAllExistingIds() {
Id id1 = IdGenerator.of("v1");
Id id2 = IdGenerator.of("v2");
Id id3 = IdGenerator.of("v3");

this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1"));
this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2"));
this.rocks.session().put(this.vertexTable.table(), id3.asBytes(), getBytes("value3"));
this.commit();

List<Id> ids = Arrays.asList(id1, id2, id3);
BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids);

Map<String, String> results = toResultMap(iter);

Assert.assertEquals(3, results.size());
Assert.assertEquals("value1", results.get("v1"));
Assert.assertEquals("value2", results.get("v2"));
Assert.assertEquals("value3", results.get("v3"));
}

@Test
public void testVertexQueryByIdsWithExistingAndMissingIdsMixed() {
Id id1 = IdGenerator.of("v1");
Id id2 = IdGenerator.of("v2");
Id id3 = IdGenerator.of("v3");

this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1"));
this.rocks.session().put(this.vertexTable.table(), id3.asBytes(), getBytes("value3"));
this.commit();

List<Id> ids = Arrays.asList(id1, id2, id3);
BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids);

Map<String, String> results = toResultMap(iter);

Assert.assertEquals(2, results.size());
Assert.assertEquals("value1", results.get("v1"));
Assert.assertEquals("value3", results.get("v3"));
Assert.assertFalse(results.containsKey("v2"));
}

@Test
public void testVertexQueryByIdsWithDuplicateIds() {
Id id1 = IdGenerator.of("v1");
Id id2 = IdGenerator.of("v2");

this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1"));
this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2"));
this.commit();

List<Id> ids = Arrays.asList(id1, id2, id1);
BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids);

Map<String, Integer> countMap = new HashMap<>();
Map<String, String> results = new HashMap<>();
while (iter.hasNext()) {
BackendColumn col = iter.next();
String key = getString(col.name);
results.put(key, getString(col.value));
countMap.put(key, countMap.getOrDefault(key, 0) + 1);
}

Assert.assertEquals(2, results.size());
Assert.assertEquals("value1", results.get("v1"));
Assert.assertEquals("value2", results.get("v2"));
// Verify duplicate ids produce duplicate results
Assert.assertEquals(Integer.valueOf(2), countMap.get("v1"));
Assert.assertEquals(Integer.valueOf(1), countMap.get("v2"));
}

@Test
public void testEdgeQueryByIdsWithAllExistingIds() {
Id id1 = IdGenerator.of("e1");
Id id2 = IdGenerator.of("e2");

this.rocks.session().put(this.edgeTable.table(), id1.asBytes(), getBytes("edge-value1"));
this.rocks.session().put(this.edgeTable.table(), id2.asBytes(), getBytes("edge-value2"));
this.commit();

List<Id> ids = Arrays.asList(id1, id2);
BackendColumnIterator iter = this.edgeTable.queryByIds(this.rocks.session(), ids);

Map<String, String> results = toResultMap(iter);

Assert.assertEquals(2, results.size());
Assert.assertEquals("edge-value1", results.get("e1"));
Assert.assertEquals("edge-value2", results.get("e2"));
}

/**
* NOTE: Testing the fallback path (session.hasChanges() == true) is not
* feasible here because both the optimized multi-get path and the fallback
* scan-based path ultimately delegate to session.get() / session.scan(),
* which have a pre-existing assertion `assert !this.hasChanges()` in
* RocksDBStdSessions. This assertion is disabled in production but fires
* during unit tests when assertions are enabled. The dispatch logic itself
* is covered by the implementation in RocksDBTables.Vertex/Edge.queryByIds().
Member

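‼️ The fallback path (hasChanges() == true) lacks test coverage

When the session has uncommitted changes, super.queryByIds() (the scan path) is taken, but the tests cannot cover this scenario because of the assert !this.hasChanges() assertion in RocksDBStdSessions.

Only half of the core branch logic (if/else) is tested. Consider using a mock session, or adding integration-test coverage for the hasChanges() == true scenario.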

*/

private Map<String, String> toResultMap(BackendColumnIterator iter) {
Map<String, String> results = new HashMap<>();
while (iter.hasNext()) {
BackendColumn col = iter.next();
results.put(getString(col.name), getString(col.value));
}
return results;
}

/**
* Subclass that exposes the protected queryByIds for testing.
*/
private static class TestVertexTable extends RocksDBTables.Vertex {

public TestVertexTable(String database) {
super(database);
}

@Override
public BackendColumnIterator queryByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
return super.queryByIds(session, ids);
}
}

/**
* Subclass that exposes the protected queryByIds for testing.
*/
private static class TestEdgeTable extends RocksDBTables.Edge {
Member

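🧹 Only the out-edge direction is tested

TestEdgeTable always passes true (out edge), so the in-edge direction is not covered. The logic is the same, but consider adding at least one case for the in direction, or explaining in a comment why only out is tested.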


public TestEdgeTable(String database) {
super(true, database);
}

@Override
public BackendColumnIterator queryByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
return super.queryByIds(session, ids);
}
}
}