Skip to content

Commit 9e2d877

Browse files
authored
[ISSUE #10375] Fix race condition between deleteTopic and FlushConsumeQueueService by removing getLifeCycle indirection (#10376)
1 parent 980f3d7 commit 9e2d877

2 files changed

Lines changed: 80 additions & 50 deletions

File tree

broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import org.apache.rocketmq.broker.BrokerController;
21+
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
2122
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
2223
import org.apache.rocketmq.broker.topic.TopicConfigManager;
2324
import org.apache.rocketmq.common.BrokerConfig;
@@ -76,11 +77,16 @@ public static void setUp() throws Exception {
7677
LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
7778
TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class);
7879
SubscriptionGroupManager subscriptionGroupManager = Mockito.mock(SubscriptionGroupManager.class);
80+
LiteSubscriptionRegistry liteSubscriptionRegistry = Mockito.mock(LiteSubscriptionRegistry.class);
81+
ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class);
82+
when(consumerOffsetManager.getPullOffsetTable()).thenReturn(new ConcurrentHashMap<>());
7983

8084
when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
8185
when(brokerController.getMessageStore()).thenReturn(messageStore);
8286
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
8387
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
88+
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
89+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
8490
when(topicConfigManager.getTopicConfigTable()).thenReturn(TOPIC_CONFIG_TABLE);
8591
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(mockTopicConfig);
8692
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>());
@@ -177,7 +183,6 @@ public void testCleanExpiredLiteTopic() {
177183
}
178184
}
179185

180-
@Ignore("Flaky: fails 2/100 runs (2.0%)")
181186
@Test
182187
public void testCleanByParentTopic() {
183188
int num = 3;

store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void recover(boolean concurrently) {
9999
} else {
100100
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
101101
for (ConsumeQueueInterface logic : maps.values()) {
102-
this.recover(logic);
102+
logic.recover();
103103
}
104104
}
105105
}
@@ -111,7 +111,7 @@ public void recover(boolean concurrently) {
111111
* from it.
112112
*/
113113
@Override
114-
public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBException {
114+
public Long getDispatchFromPhyOffset(boolean recoverNormally) {
115115
if (recoverNormally) {
116116
return getMaxPhyOffsetInConsumeQueue();
117117
} else {
@@ -127,7 +127,7 @@ public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBExce
127127
public boolean recoverConcurrently() {
128128
int count = 0;
129129
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
130-
count += maps.values().size();
130+
count += maps.size();
131131
}
132132
final CountDownLatch countDownLatch = new CountDownLatch(count);
133133
BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
@@ -206,15 +206,6 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
206206
return 0;
207207
}
208208

209-
private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
210-
return findOrCreateConsumeQueue(topic, queueId);
211-
}
212-
213-
public boolean load(ConsumeQueueInterface consumeQueue) {
214-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
215-
return fileQueueLifeCycle.load();
216-
}
217-
218209
private boolean loadConsumeQueues(String storePath, CQType cqType) {
219210
File dirLogic = new File(storePath);
220211
File[] fileTopicList = dirLogic.listFiles();
@@ -237,7 +228,7 @@ private boolean loadConsumeQueues(String storePath, CQType cqType) {
237228

238229
ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
239230
this.putConsumeQueue(topic, queueId, logic);
240-
if (!this.load(logic)) {
231+
if (!logic.load()) {
241232
return false;
242233
}
243234
}
@@ -291,11 +282,6 @@ private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQue
291282
new ThreadFactoryImpl(threadNamePrefix));
292283
}
293284

294-
public void recover(ConsumeQueueInterface consumeQueue) {
295-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
296-
fileQueueLifeCycle.recover();
297-
}
298-
299285
@Override
300286
public long getMaxPhyOffsetInConsumeQueue() {
301287
long maxPhysicOffset = -1L;
@@ -319,71 +305,110 @@ public long getMinOffsetInQueue(String topic, int queueId) {
319305
return -1;
320306
}
321307

322-
public void checkSelf(ConsumeQueueInterface consumeQueue) {
323-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
324-
fileQueueLifeCycle.checkSelf();
325-
}
326-
327308
@Override
328309
public void checkSelf() {
329310
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
330311
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
331-
this.checkSelf(cqEntry.getValue());
312+
cqEntry.getValue().checkSelf();
332313
}
333314
}
334315
}
335316

336-
public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
337-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
338-
return fileQueueLifeCycle.flush(flushLeastPages);
339-
}
340-
341317
public void flush() throws StoreException {
342318
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
343319
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
344-
flush(cqEntry.getValue(), 0);
320+
cqEntry.getValue().flush(0);
345321
}
346322
}
347323
}
348324

349325
@Override
350326
public void destroy(ConsumeQueueInterface consumeQueue) {
351-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
352-
fileQueueLifeCycle.destroy();
327+
consumeQueue.destroy();
353328
if (MixAll.isLmq(consumeQueue.getTopic())) {
354329
lmqCounter.decrementAndGet();
355330
}
356331
}
357332

333+
/**
334+
* @deprecated Use {@link ConsumeQueueInterface#load()} directly instead.
335+
*/
336+
@Deprecated
337+
public boolean load(ConsumeQueueInterface consumeQueue) {
338+
return consumeQueue.load();
339+
}
340+
341+
/**
342+
* @deprecated Use {@link ConsumeQueueInterface#recover()} directly instead.
343+
*/
344+
@Deprecated
345+
public void recover(ConsumeQueueInterface consumeQueue) {
346+
consumeQueue.recover();
347+
}
348+
349+
/**
350+
* @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly instead.
351+
*/
352+
@Deprecated
353+
public void checkSelf(ConsumeQueueInterface consumeQueue) {
354+
consumeQueue.checkSelf();
355+
}
356+
357+
/**
358+
* @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly instead.
359+
*/
360+
@Deprecated
361+
public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
362+
return consumeQueue.flush(flushLeastPages);
363+
}
364+
365+
/**
366+
* @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)} directly instead.
367+
*/
368+
@Deprecated
358369
public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) {
359-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
360-
return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
370+
return consumeQueue.deleteExpiredFile(minCommitLogPos);
361371
}
362372

373+
/**
374+
* @deprecated Use {@link ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead.
375+
*/
376+
@Deprecated
363377
public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) {
364-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
365-
fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset);
378+
consumeQueue.truncateDirtyLogicFiles(phyOffset);
366379
}
367380

381+
/**
382+
* @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)} directly instead.
383+
*/
384+
@Deprecated
368385
public void swapMap(ConsumeQueueInterface consumeQueue, int reserveNum, long forceSwapIntervalMs,
369386
long normalSwapIntervalMs) {
370-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
371-
fileQueueLifeCycle.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
387+
consumeQueue.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
372388
}
373389

390+
/**
391+
* @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)} directly instead.
392+
*/
393+
@Deprecated
374394
public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long forceCleanSwapIntervalMs) {
375-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
376-
fileQueueLifeCycle.cleanSwappedMap(forceCleanSwapIntervalMs);
395+
consumeQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
377396
}
378397

398+
/**
399+
* @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()} directly instead.
400+
*/
401+
@Deprecated
379402
public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) {
380-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
381-
return fileQueueLifeCycle.isFirstFileAvailable();
403+
return consumeQueue.isFirstFileAvailable();
382404
}
383405

406+
/**
407+
* @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()} directly instead.
408+
*/
409+
@Deprecated
384410
public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) {
385-
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
386-
return fileQueueLifeCycle.isFirstFileExist();
411+
return consumeQueue.isFirstFileExist();
387412
}
388413

389414
@Override
@@ -604,7 +629,7 @@ public void truncateDirty(long offsetToTruncate) {
604629
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, offsetToTruncate);
605630
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
606631
for (ConsumeQueueInterface logic : maps.values()) {
607-
this.truncateDirtyLogicFiles(logic, offsetToTruncate);
632+
logic.truncateDirtyLogicFiles(offsetToTruncate);
608633
}
609634
}
610635
}
@@ -667,7 +692,7 @@ private void doFlush(int retryTimes) {
667692
for (ConsumeQueueInterface cq : maps.values()) {
668693
boolean result = false;
669694
for (int i = 0; i < retryTimes && !result; i++) {
670-
result = flush(cq, flushConsumeQueueLeastPages);
695+
result = cq.flush(flushConsumeQueueLeastPages);
671696
}
672697
}
673698
}
@@ -736,7 +761,7 @@ private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long
736761
return false;
737762
}
738763
// If first exist and not available, it means first file may destroy failed, delete it.
739-
if (isFirstFileExist(logic) && !isFirstFileAvailable(logic)) {
764+
if (logic.isFirstFileExist() && !logic.isFirstFileAvailable()) {
740765
log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
741766
" topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
742767
"in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
@@ -821,7 +846,7 @@ private void correctLogicMinOffset() {
821846
}
822847

823848
private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
824-
deleteExpiredFile(logic, minPhyOffset);
849+
logic.deleteExpiredFile(minPhyOffset);
825850
int sleepIntervalWhenCorrectMinOffset = messageStoreConfig.getCorrectLogicMinOffsetSleepInterval();
826851
if (sleepIntervalWhenCorrectMinOffset > 0) {
827852
try {
@@ -859,7 +884,7 @@ protected void deleteExpiredFiles() {
859884

860885
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : consumeQueueTable.values()) {
861886
for (ConsumeQueueInterface logic : maps.values()) {
862-
int deleteCount = deleteExpiredFile(logic, minOffset);
887+
int deleteCount = logic.deleteExpiredFile(minOffset);
863888
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
864889
try {
865890
Thread.sleep(deleteLogicsFilesInterval);

0 commit comments

Comments
 (0)