Skip to content

Commit 0752ddf

Browse files
lizhiminsclaude
andauthored
[ISSUE #10270] Make Pop RocksDB BlockCache size configurable via MessageStoreConfig (#10271)
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f04eafc commit 0752ddf

5 files changed

Lines changed: 38 additions & 9 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
4646
private WriteOptions writeOptions;
4747
private WriteOptions deleteOptions;
4848
protected ColumnFamilyHandle columnFamilyHandle;
49+
private final long blockCacheSize;
50+
private final long writeBufferSize;
4951

50-
public PopConsumerRocksdbStore(String filePath) {
52+
public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeBufferSize) {
5153
super(filePath);
54+
this.blockCacheSize = blockCacheSize;
55+
this.writeBufferSize = writeBufferSize;
5256
}
5357

5458
// https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
@@ -83,8 +87,8 @@ protected boolean postLoad() {
8387
initOptions();
8488

8589
// init column family here
86-
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions();
87-
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions();
90+
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
91+
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
8892
this.cfOptions.add(defaultOptions);
8993
this.cfOptions.add(popStateOptions);
9094

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ public PopConsumerService(BrokerController brokerController) {
9898
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
9999
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
100100
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
101-
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString());
101+
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString(),
102+
brokerController.getMessageStoreConfig().getPopRocksdbBlockCacheSize(),
103+
brokerController.getMessageStoreConfig().getPopRocksdbWriteBufferSize());
102104
this.popConsumerCache = brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache(
103105
brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null;
104106

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
3030
import org.apache.commons.io.FileUtils;
31+
import org.rocksdb.util.SizeUnit;
3132
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
3233
import org.apache.rocketmq.common.constant.LoggerName;
3334
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
@@ -65,7 +66,8 @@ public static PopConsumerRecord getConsumerRecord() {
6566
@Test
6667
public void rocksdbStoreWriteDeleteTest() {
6768
String filePath = getRandomStorePath();
68-
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath);
69+
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(
70+
filePath, 256 * SizeUnit.MB, 32 * SizeUnit.MB);
6971
Assert.assertEquals(filePath, consumerStore.getFilePath());
7072

7173
consumerStore.start();
@@ -127,7 +129,8 @@ private long getDirectorySizeRecursive(File directory) {
127129
@Ignore
128130
@SuppressWarnings("ConstantValue")
129131
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
130-
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath());
132+
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(
133+
getRandomStorePath(), 256 * SizeUnit.MB, 32 * SizeUnit.MB);
131134
rocksdbStore.start();
132135

133136
int iterCount = 1000 * 1000;

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,10 @@ public class MessageStoreConfig {
505505

506506
private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName();
507507

508+
private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;
509+
510+
private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB;
511+
508512
/**
509513
* Flush RocksDB WAL frequency, aka, flush WAL every N write ops.
510514
*/
@@ -531,6 +535,22 @@ public void setRocksdbCompressionType(String compressionType) {
531535
this.rocksdbCompressionType = compressionType;
532536
}
533537

538+
public long getPopRocksdbBlockCacheSize() {
539+
return popRocksdbBlockCacheSize;
540+
}
541+
542+
public void setPopRocksdbBlockCacheSize(long popRocksdbBlockCacheSize) {
543+
this.popRocksdbBlockCacheSize = popRocksdbBlockCacheSize;
544+
}
545+
546+
public long getPopRocksdbWriteBufferSize() {
547+
return popRocksdbWriteBufferSize;
548+
}
549+
550+
public void setPopRocksdbWriteBufferSize(long popRocksdbWriteBufferSize) {
551+
this.popRocksdbWriteBufferSize = popRocksdbWriteBufferSize;
552+
}
553+
534554
/**
535555
* Spin number in the retreat strategy of spin lock
536556
* Default is 1000

store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static ColumnFamilyOptions createOffsetCFOptions() {
132132
setInplaceUpdateSupport(true);
133133
}
134134

135-
public static ColumnFamilyOptions createPopCFOptions() {
135+
public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize, long writeBufferSize) {
136136
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
137137
.setFormatVersion(5)
138138
.setIndexType(IndexType.kBinarySearch)
@@ -145,7 +145,7 @@ public static ColumnFamilyOptions createPopCFOptions() {
145145
.setCacheIndexAndFilterBlocksWithHighPriority(true)
146146
.setPinL0FilterAndIndexBlocksInCache(false)
147147
.setPinTopLevelIndexAndFilter(true)
148-
.setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false))
148+
.setBlockCache(new LRUCache(blockCacheSize, 8, false))
149149
.setWholeKeyFiltering(true);
150150

151151
CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()
@@ -160,7 +160,7 @@ public static ColumnFamilyOptions createPopCFOptions() {
160160
//noinspection resource
161161
return new ColumnFamilyOptions()
162162
.setMaxWriteBufferNumber(4)
163-
.setWriteBufferSize(128 * SizeUnit.MB)
163+
.setWriteBufferSize(writeBufferSize)
164164
.setMinWriteBufferNumberToMerge(1)
165165
.setTableFormatConfig(blockBasedTableConfig)
166166
.setMemTableConfig(new SkipListMemTableConfig())

0 commit comments

Comments
 (0)