Skip to content

Commit ea67835

Browse files
lizhiminsclaude
andcommitted
[ISSUE #10270] Expose Pop RocksDB memtable size as configurable properties
Add writeBufferSize to MessageStoreConfig with a default of 32MB, replacing the hardcoded value in createPopCFOptions(). maxWriteBufferNumber and minWriteBufferNumberToMerge remain hardcoded as they rarely need tuning. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f5943a4 commit ea67835

5 files changed

Lines changed: 23 additions & 8 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,12 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
4747
private WriteOptions deleteOptions;
4848
protected ColumnFamilyHandle columnFamilyHandle;
4949
private final long blockCacheSize;
50+
private final long writeBufferSize;
5051

51-
public PopConsumerRocksdbStore(String filePath, long blockCacheSize) {
52+
public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeBufferSize) {
5253
super(filePath);
5354
this.blockCacheSize = blockCacheSize;
55+
this.writeBufferSize = writeBufferSize;
5456
}
5557

5658
// https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
@@ -85,8 +87,8 @@ protected boolean postLoad() {
8587
initOptions();
8688

8789
// init column family here
88-
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize);
89-
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize);
90+
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
91+
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
9092
this.cfOptions.add(defaultOptions);
9193
this.cfOptions.add(popStateOptions);
9294

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ public PopConsumerService(BrokerController brokerController) {
9999
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
100100
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
101101
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString(),
102-
brokerController.getMessageStoreConfig().getPopRocksdbBlockCacheSize());
102+
brokerController.getMessageStoreConfig().getPopRocksdbBlockCacheSize(),
103+
brokerController.getMessageStoreConfig().getPopRocksdbWriteBufferSize());
103104
this.popConsumerCache = brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache(
104105
brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null;
105106

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public static PopConsumerRecord getConsumerRecord() {
6666
@Test
6767
public void rocksdbStoreWriteDeleteTest() {
6868
String filePath = getRandomStorePath();
69-
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath, 256 * SizeUnit.MB);
69+
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(
70+
filePath, 256 * SizeUnit.MB, 32 * SizeUnit.MB);
7071
Assert.assertEquals(filePath, consumerStore.getFilePath());
7172

7273
consumerStore.start();
@@ -128,7 +129,8 @@ private long getDirectorySizeRecursive(File directory) {
128129
@Ignore
129130
@SuppressWarnings("ConstantValue")
130131
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
131-
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath(), 256 * SizeUnit.MB);
132+
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(
133+
getRandomStorePath(), 256 * SizeUnit.MB, 32 * SizeUnit.MB);
132134
rocksdbStore.start();
133135

134136
int iterCount = 1000 * 1000;

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,8 @@ public class MessageStoreConfig {
507507

508508
private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;
509509

510+
private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB;
511+
510512
/**
511513
* Flush RocksDB WAL frequency, aka, flush WAL every N write ops.
512514
*/
@@ -541,6 +543,14 @@ public void setPopRocksdbBlockCacheSize(long popRocksdbBlockCacheSize) {
541543
this.popRocksdbBlockCacheSize = popRocksdbBlockCacheSize;
542544
}
543545

546+
public long getPopRocksdbWriteBufferSize() {
547+
return popRocksdbWriteBufferSize;
548+
}
549+
550+
public void setPopRocksdbWriteBufferSize(long popRocksdbWriteBufferSize) {
551+
this.popRocksdbWriteBufferSize = popRocksdbWriteBufferSize;
552+
}
553+
544554
/**
545555
* Spin number in the retreat strategy of spin lock
546556
* Default is 1000

store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static ColumnFamilyOptions createOffsetCFOptions() {
132132
setInplaceUpdateSupport(true);
133133
}
134134

135-
public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize) {
135+
public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize, long writeBufferSize) {
136136
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
137137
.setFormatVersion(5)
138138
.setIndexType(IndexType.kBinarySearch)
@@ -160,7 +160,7 @@ public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize) {
160160
//noinspection resource
161161
return new ColumnFamilyOptions()
162162
.setMaxWriteBufferNumber(4)
163-
.setWriteBufferSize(128 * SizeUnit.MB)
163+
.setWriteBufferSize(writeBufferSize)
164164
.setMinWriteBufferNumberToMerge(1)
165165
.setTableFormatConfig(blockBasedTableConfig)
166166
.setMemTableConfig(new SkipListMemTableConfig())

0 commit comments

Comments
 (0)