From ec92ad5f99a0bc4452d0fdd649011294f01864ce Mon Sep 17 00:00:00 2001 From: Quan Date: Mon, 25 May 2026 19:22:30 +0800 Subject: [PATCH] fix(store): Fix race condition between deleteTopic and FlushConsumeQueueService by removing getLifeCycle indirection - Remove getLifeCycle() which delegates to findOrCreateConsumeQueue(), eliminating the risk of recreating a deleted topic's ConsumeQueue during concurrent flush operations - Call ConsumeQueueInterface methods directly on the instance instead of re-looking up through findOrCreateConsumeQueue - Deprecate delegate methods (load, recover, checkSelf, flush, deleteExpiredFile, truncateDirtyLogicFiles, swapMap, cleanSwappedMap, isFirstFileAvailable, isFirstFileExist) that previously used getLifeCycle indirection - Remove unnecessary RocksDBException from getDispatchFromPhyOffset signature - Fix maps.values().size() to maps.size() in recoverConcurrently --- .../broker/lite/LiteLifecycleManagerTest.java | 7 +- .../store/queue/ConsumeQueueStore.java | 123 +++++++++++------- 2 files changed, 80 insertions(+), 50 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java index 312b8a29fa4..00dcb79c8de 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.lite; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; @@ -76,11 +77,16 @@ public static void setUp() throws Exception { LiteSharding liteSharding = Mockito.mock(LiteSharding.class); TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class); SubscriptionGroupManager subscriptionGroupManager = Mockito.mock(SubscriptionGroupManager.class); + LiteSubscriptionRegistry liteSubscriptionRegistry = Mockito.mock(LiteSubscriptionRegistry.class); + ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class); + when(consumerOffsetManager.getPullOffsetTable()).thenReturn(new ConcurrentHashMap<>()); when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG); when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); + when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry); + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(topicConfigManager.getTopicConfigTable()).thenReturn(TOPIC_CONFIG_TABLE); when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(mockTopicConfig); when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); @@ -177,7 +183,6 @@ public void testCleanExpiredLiteTopic() { } } - @Ignore("Flaky: fails 2/100 runs (2.0%)") @Test public void testCleanByParentTopic() { int num = 3; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 7a5616bab7f..950aa483082 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -99,7 +99,7 @@ public void recover(boolean concurrently) { } else { for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueueInterface logic : maps.values()) { - this.recover(logic); + logic.recover(); } } } @@ -111,7 +111,7 @@ public void recover(boolean concurrently) { * from it. */ @Override - public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBException { + public Long getDispatchFromPhyOffset(boolean recoverNormally) { if (recoverNormally) { return getMaxPhyOffsetInConsumeQueue(); } else { @@ -127,7 +127,7 @@ public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBExce public boolean recoverConcurrently() { int count = 0; for (ConcurrentMap maps : this.consumeQueueTable.values()) { - count += maps.values().size(); + count += maps.size(); } final CountDownLatch countDownLatch = new CountDownLatch(count); BlockingQueue recoverQueue = new LinkedBlockingQueue<>(); @@ -206,15 +206,6 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo return 0; } - private FileQueueLifeCycle getLifeCycle(String topic, int queueId) { - return findOrCreateConsumeQueue(topic, queueId); - } - - public boolean load(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - return fileQueueLifeCycle.load(); - } - private boolean loadConsumeQueues(String storePath, CQType cqType) { File dirLogic = new File(storePath); File[] fileTopicList = dirLogic.listFiles(); @@ -237,7 +228,7 @@ private boolean loadConsumeQueues(String storePath, CQType cqType) { ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath); this.putConsumeQueue(topic, queueId, logic); - if (!this.load(logic)) { + if (!logic.load()) { return false; } } @@ -291,11 +282,6 @@ private ExecutorService buildExecutorService(BlockingQueue blockingQue new ThreadFactoryImpl(threadNamePrefix)); } - public void recover(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.recover(); - } - @Override public long getMaxPhyOffsetInConsumeQueue() { long maxPhysicOffset = -1L; @@ -319,71 +305,110 @@ public long getMinOffsetInQueue(String topic, int queueId) { return -1; } - public void checkSelf(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.checkSelf(); - } - @Override public void checkSelf() { for (Map.Entry> topicEntry : this.consumeQueueTable.entrySet()) { for (Map.Entry cqEntry : topicEntry.getValue().entrySet()) { - this.checkSelf(cqEntry.getValue()); + cqEntry.getValue().checkSelf(); } } } - public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - return fileQueueLifeCycle.flush(flushLeastPages); - } - public void flush() throws StoreException { for (Map.Entry> topicEntry : this.consumeQueueTable.entrySet()) { for (Map.Entry cqEntry : topicEntry.getValue().entrySet()) { - flush(cqEntry.getValue(), 0); + cqEntry.getValue().flush(0); } } } @Override public void destroy(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.destroy(); + consumeQueue.destroy(); if (MixAll.isLmq(consumeQueue.getTopic())) { lmqCounter.decrementAndGet(); } } + /** + * @deprecated Use {@link ConsumeQueueInterface#load()} directly instead. + */ + @Deprecated + public boolean load(ConsumeQueueInterface consumeQueue) { + return consumeQueue.load(); + } + + /** + * @deprecated Use {@link ConsumeQueueInterface#recover()} directly instead. + */ + @Deprecated + public void recover(ConsumeQueueInterface consumeQueue) { + consumeQueue.recover(); + } + + /** + * @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly instead. + */ + @Deprecated + public void checkSelf(ConsumeQueueInterface consumeQueue) { + consumeQueue.checkSelf(); + } + + /** + * @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly instead. + */ + @Deprecated + public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) { + return consumeQueue.flush(flushLeastPages); + } + + /** + * @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)} directly instead. + */ + @Deprecated public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos); + return consumeQueue.deleteExpiredFile(minCommitLogPos); } + /** + * @deprecated Use {@link ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead. + */ + @Deprecated public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset); + consumeQueue.truncateDirtyLogicFiles(phyOffset); } + /** + * @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)} directly instead. + */ + @Deprecated public void swapMap(ConsumeQueueInterface consumeQueue, int reserveNum, long forceSwapIntervalMs, long normalSwapIntervalMs) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs); + consumeQueue.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs); } + /** + * @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)} directly instead. + */ + @Deprecated public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long forceCleanSwapIntervalMs) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - fileQueueLifeCycle.cleanSwappedMap(forceCleanSwapIntervalMs); + consumeQueue.cleanSwappedMap(forceCleanSwapIntervalMs); } + /** + * @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()} directly instead. + */ + @Deprecated public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - return fileQueueLifeCycle.isFirstFileAvailable(); + return consumeQueue.isFirstFileAvailable(); } + /** + * @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()} directly instead. + */ + @Deprecated public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); - return fileQueueLifeCycle.isFirstFileExist(); + return consumeQueue.isFirstFileExist(); } @Override @@ -604,7 +629,7 @@ public void truncateDirty(long offsetToTruncate) { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, offsetToTruncate); for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueueInterface logic : maps.values()) { - this.truncateDirtyLogicFiles(logic, offsetToTruncate); + logic.truncateDirtyLogicFiles(offsetToTruncate); } } } @@ -667,7 +692,7 @@ private void doFlush(int retryTimes) { for (ConsumeQueueInterface cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { - result = flush(cq, flushConsumeQueueLeastPages); + result = cq.flush(flushConsumeQueueLeastPages); } } } @@ -736,7 +761,7 @@ private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long return false; } // If first exist and not available, it means first file may destroy failed, delete it. - if (isFirstFileExist(logic) && !isFirstFileAvailable(logic)) { + if (logic.isFirstFileExist() && !logic.isFirstFileAvailable()) { log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." + " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " + "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}" @@ -821,7 +846,7 @@ private void correctLogicMinOffset() { } private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) { - deleteExpiredFile(logic, minPhyOffset); + logic.deleteExpiredFile(minPhyOffset); int sleepIntervalWhenCorrectMinOffset = messageStoreConfig.getCorrectLogicMinOffsetSleepInterval(); if (sleepIntervalWhenCorrectMinOffset > 0) { try { @@ -859,7 +884,7 @@ protected void deleteExpiredFiles() { for (ConcurrentMap maps : consumeQueueTable.values()) { for (ConsumeQueueInterface logic : maps.values()) { - int deleteCount = deleteExpiredFile(logic, minOffset); + int deleteCount = logic.deleteExpiredFile(minOffset); if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { Thread.sleep(deleteLogicsFilesInterval);