Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.lite;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
Expand Down Expand Up @@ -76,11 +77,16 @@ public static void setUp() throws Exception {
LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class);
SubscriptionGroupManager subscriptionGroupManager = Mockito.mock(SubscriptionGroupManager.class);
LiteSubscriptionRegistry liteSubscriptionRegistry = Mockito.mock(LiteSubscriptionRegistry.class);
ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class);
when(consumerOffsetManager.getPullOffsetTable()).thenReturn(new ConcurrentHashMap<>());

when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(topicConfigManager.getTopicConfigTable()).thenReturn(TOPIC_CONFIG_TABLE);
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(mockTopicConfig);
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>());
Expand Down Expand Up @@ -177,7 +183,6 @@ public void testCleanExpiredLiteTopic() {
}
}

@Ignore("Flaky: fails 2/100 runs (2.0%)")
@Test
public void testCleanByParentTopic() {
int num = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void recover(boolean concurrently) {
} else {
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
this.recover(logic);
logic.recover();
}
}
}
Expand All @@ -111,7 +111,7 @@ public void recover(boolean concurrently) {
* from it.
*/
@Override
public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBException {
public Long getDispatchFromPhyOffset(boolean recoverNormally) {
if (recoverNormally) {
return getMaxPhyOffsetInConsumeQueue();
} else {
Expand All @@ -127,7 +127,7 @@ public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBExce
public boolean recoverConcurrently() {
int count = 0;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
count += maps.values().size();
count += maps.size();
}
final CountDownLatch countDownLatch = new CountDownLatch(count);
BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -206,15 +206,6 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
return 0;
}

private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
return findOrCreateConsumeQueue(topic, queueId);
}

public boolean load(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.load();
}

private boolean loadConsumeQueues(String storePath, CQType cqType) {
File dirLogic = new File(storePath);
File[] fileTopicList = dirLogic.listFiles();
Expand All @@ -237,7 +228,7 @@ private boolean loadConsumeQueues(String storePath, CQType cqType) {

ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
this.putConsumeQueue(topic, queueId, logic);
if (!this.load(logic)) {
if (!logic.load()) {
return false;
}
}
Expand Down Expand Up @@ -291,11 +282,6 @@ private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQue
new ThreadFactoryImpl(threadNamePrefix));
}

public void recover(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.recover();
}

@Override
public long getMaxPhyOffsetInConsumeQueue() {
long maxPhysicOffset = -1L;
Expand All @@ -319,71 +305,110 @@ public long getMinOffsetInQueue(String topic, int queueId) {
return -1;
}

public void checkSelf(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.checkSelf();
}

@Override
public void checkSelf() {
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
this.checkSelf(cqEntry.getValue());
cqEntry.getValue().checkSelf();
}
}
}

public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.flush(flushLeastPages);
}

public void flush() throws StoreException {
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
flush(cqEntry.getValue(), 0);
cqEntry.getValue().flush(0);
}
}
}

@Override
public void destroy(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.destroy();
consumeQueue.destroy();
if (MixAll.isLmq(consumeQueue.getTopic())) {
lmqCounter.decrementAndGet();
}
}

/**
* @deprecated Use {@link ConsumeQueueInterface#load()} directly instead.
*/
@Deprecated
public boolean load(ConsumeQueueInterface consumeQueue) {
return consumeQueue.load();
}

/**
* @deprecated Use {@link ConsumeQueueInterface#recover()} directly instead.
*/
@Deprecated
public void recover(ConsumeQueueInterface consumeQueue) {
consumeQueue.recover();
}

/**
* @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly instead.
*/
@Deprecated
public void checkSelf(ConsumeQueueInterface consumeQueue) {
consumeQueue.checkSelf();
}

/**
* @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly instead.
*/
@Deprecated
public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
return consumeQueue.flush(flushLeastPages);
}

/**
* @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)} directly instead.
*/
@Deprecated
public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
return consumeQueue.deleteExpiredFile(minCommitLogPos);
}

/**
* @deprecated Use {@link ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead.
*/
@Deprecated
public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset);
consumeQueue.truncateDirtyLogicFiles(phyOffset);
}

/**
* @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)} directly instead.
*/
@Deprecated
public void swapMap(ConsumeQueueInterface consumeQueue, int reserveNum, long forceSwapIntervalMs,
long normalSwapIntervalMs) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
consumeQueue.swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
}

/**
* @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)} directly instead.
*/
@Deprecated
public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long forceCleanSwapIntervalMs) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.cleanSwappedMap(forceCleanSwapIntervalMs);
consumeQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
}

/**
* @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()} directly instead.
*/
@Deprecated
public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.isFirstFileAvailable();
return consumeQueue.isFirstFileAvailable();
}

/**
* @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()} directly instead.
*/
@Deprecated
public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.isFirstFileExist();
return consumeQueue.isFirstFileExist();
}

@Override
Expand Down Expand Up @@ -604,7 +629,7 @@ public void truncateDirty(long offsetToTruncate) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, offsetToTruncate);
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
this.truncateDirtyLogicFiles(logic, offsetToTruncate);
logic.truncateDirtyLogicFiles(offsetToTruncate);
}
}
}
Expand Down Expand Up @@ -667,7 +692,7 @@ private void doFlush(int retryTimes) {
for (ConsumeQueueInterface cq : maps.values()) {
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
result = flush(cq, flushConsumeQueueLeastPages);
result = cq.flush(flushConsumeQueueLeastPages);
}
}
}
Expand Down Expand Up @@ -736,7 +761,7 @@ private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long
return false;
}
// If first exist and not available, it means first file may destroy failed, delete it.
if (isFirstFileExist(logic) && !isFirstFileAvailable(logic)) {
if (logic.isFirstFileExist() && !logic.isFirstFileAvailable()) {
log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
" topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
"in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
Expand Down Expand Up @@ -821,7 +846,7 @@ private void correctLogicMinOffset() {
}

private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
deleteExpiredFile(logic, minPhyOffset);
logic.deleteExpiredFile(minPhyOffset);
int sleepIntervalWhenCorrectMinOffset = messageStoreConfig.getCorrectLogicMinOffsetSleepInterval();
if (sleepIntervalWhenCorrectMinOffset > 0) {
try {
Expand Down Expand Up @@ -859,7 +884,7 @@ protected void deleteExpiredFiles() {

for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
int deleteCount = deleteExpiredFile(logic, minOffset);
int deleteCount = logic.deleteExpiredFile(minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
Expand Down
Loading