Skip to content

Commit f5943a4

Browse files
lizhiminsclaude
andcommitted
[ISSUE #10270] Make Pop RocksDB BlockCache size configurable via MessageStoreConfig
Add popRocksdbBlockCacheSize to MessageStoreConfig with a default of 256MB, replacing the hardcoded value in RocksDBOptionsFactory.createPopCFOptions(). Default ColumnFamily uses a minimal 16MB cache since it stores no meaningful data. This allows operators to tune Pop RocksDB memory usage based on container memory budgets. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f04eafc commit f5943a4

5 files changed

Lines changed: 22 additions & 8 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
4646
private WriteOptions writeOptions;
4747
private WriteOptions deleteOptions;
4848
protected ColumnFamilyHandle columnFamilyHandle;
49+
private final long blockCacheSize;
4950

50-
public PopConsumerRocksdbStore(String filePath) {
51+
public PopConsumerRocksdbStore(String filePath, long blockCacheSize) {
5152
super(filePath);
53+
this.blockCacheSize = blockCacheSize;
5254
}
5355

5456
// https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
@@ -83,8 +85,8 @@ protected boolean postLoad() {
8385
initOptions();
8486

8587
// init column family here
86-
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions();
87-
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions();
88+
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize);
89+
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize);
8890
this.cfOptions.add(defaultOptions);
8991
this.cfOptions.add(popStateOptions);
9092

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public PopConsumerService(BrokerController brokerController) {
9898
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
9999
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
100100
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
101-
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString());
101+
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString(),
102+
brokerController.getMessageStoreConfig().getPopRocksdbBlockCacheSize());
102103
this.popConsumerCache = brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache(
103104
brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null;
104105

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
3030
import org.apache.commons.io.FileUtils;
31+
import org.rocksdb.util.SizeUnit;
3132
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
3233
import org.apache.rocketmq.common.constant.LoggerName;
3334
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
@@ -65,7 +66,7 @@ public static PopConsumerRecord getConsumerRecord() {
6566
@Test
6667
public void rocksdbStoreWriteDeleteTest() {
6768
String filePath = getRandomStorePath();
68-
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath);
69+
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath, 256 * SizeUnit.MB);
6970
Assert.assertEquals(filePath, consumerStore.getFilePath());
7071

7172
consumerStore.start();
@@ -127,7 +128,7 @@ private long getDirectorySizeRecursive(File directory) {
127128
@Ignore
128129
@SuppressWarnings("ConstantValue")
129130
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
130-
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath());
131+
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath(), 256 * SizeUnit.MB);
131132
rocksdbStore.start();
132133

133134
int iterCount = 1000 * 1000;

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,8 @@ public class MessageStoreConfig {
505505

506506
private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName();
507507

508+
private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;
509+
508510
/**
509511
* Flush RocksDB WAL frequency, aka, flush WAL every N write ops.
510512
*/
@@ -531,6 +533,14 @@ public void setRocksdbCompressionType(String compressionType) {
531533
this.rocksdbCompressionType = compressionType;
532534
}
533535

536+
public long getPopRocksdbBlockCacheSize() {
537+
return popRocksdbBlockCacheSize;
538+
}
539+
540+
public void setPopRocksdbBlockCacheSize(long popRocksdbBlockCacheSize) {
541+
this.popRocksdbBlockCacheSize = popRocksdbBlockCacheSize;
542+
}
543+
534544
/**
535545
* Spin number in the retreat strategy of spin lock
536546
* Default is 1000

store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static ColumnFamilyOptions createOffsetCFOptions() {
132132
setInplaceUpdateSupport(true);
133133
}
134134

135-
public static ColumnFamilyOptions createPopCFOptions() {
135+
public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize) {
136136
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
137137
.setFormatVersion(5)
138138
.setIndexType(IndexType.kBinarySearch)
@@ -145,7 +145,7 @@ public static ColumnFamilyOptions createPopCFOptions() {
145145
.setCacheIndexAndFilterBlocksWithHighPriority(true)
146146
.setPinL0FilterAndIndexBlocksInCache(false)
147147
.setPinTopLevelIndexAndFilter(true)
148-
.setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false))
148+
.setBlockCache(new LRUCache(blockCacheSize, 8, false))
149149
.setWholeKeyFiltering(true);
150150

151151
CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()

0 commit comments

Comments
 (0)