Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), popCheckPoint.getCId(), messageExt));
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
Expand Down Expand Up @@ -166,12 +166,13 @@ private void initPopRetryOffset(String retryTopic, String consumerGroup, int ret
public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) {
if (brokerController != null) {
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
boolean useSeparate = checkUseSeparateRetryQueue(retryTopic, consumerGroup);
if (topicConfig != null && !useSeparate) {
return;
}

int retryQueueNum = PopAckConstants.retryQueueNum;
if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
if (useSeparate) {
String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup);
TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists
retryQueueNum = normalConfig.getWriteQueueNums();
Expand All @@ -193,8 +194,8 @@ public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) {
}
}

private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
private int getRetryQueueId(String retryTopic, String consumerGroup, MessageExt messageExt) {
if (!checkUseSeparateRetryQueue(retryTopic, consumerGroup)) {
return 0;
}
int oriQueueId = messageExt.getQueueId(); // original qid of normal or retry topic
Expand Down Expand Up @@ -729,4 +730,19 @@ ArrayList<PopCheckPoint> genSortList() {
return sortList;
}
}

private boolean checkUseSeparateRetryQueue(String retryTopic, String consumerGroup) {
if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
return true;
}
if (brokerController.getBrokerConfig().isUseSeparateRetryQueueForPriorityTopic()) {
String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup);
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic);
if (topicConfig != null && topicConfig.getAttributes() != null) {
String priorityStr = topicConfig.getAttributes().get("priority");
return Boolean.parseBoolean(priorityStr);
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -104,7 +107,6 @@ public class PopReviveServiceTest {
private BrokerMetricsManager brokerMetricsManager;
@Mock
private PopMetricsManager popMetricsManager;
private PopMessageProcessor popMessageProcessor;

private BrokerConfig brokerConfig;
private PopReviveService popReviveService;
Expand All @@ -129,8 +131,7 @@ public void before() {
// Initialize BrokerMetricsManager for tests
when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);

popMessageProcessor = new PopMessageProcessor(brokerController); // a real one, not mock
PopMessageProcessor popMessageProcessor = new PopMessageProcessor(brokerController); // a real one, not mock
when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);

popReviveService = spy(new PopReviveService(brokerController, REVIVE_TOPIC, REVIVE_QUEUE_ID));
Expand Down Expand Up @@ -555,4 +556,143 @@ public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, Ack

return msgInner;
}

@Test
public void testRetryQueueRouting_GlobalSwitchOverridesAll() throws Exception {
brokerConfig.setUseSeparateRetryQueue(true);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(false);

Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class);
method.setAccessible(true);

boolean result = (boolean) method.invoke(popReviveService, "%RETRY%groupA", "groupA");
Assert.assertTrue("Global switch should force separate retry queue", result);
}

@Test
public void testRetryQueueRouting_PriorityTopicWithNewFlag() throws Exception {
brokerConfig.setUseSeparateRetryQueue(false);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true);

TopicConfig priorityTopicConfig = new TopicConfig("NormalPriorityTopic");
Map<String, String> attributes = new HashMap<>();
attributes.put("priority", "true");
priorityTopicConfig.setAttributes(attributes);
when(topicConfigManager.selectTopicConfig("NormalPriorityTopic")).thenReturn(priorityTopicConfig);

Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class);
method.setAccessible(true);

String retryTopic = KeyBuilder.buildPopRetryTopic("NormalPriorityTopic", "groupA", false);
boolean result = (boolean) method.invoke(popReviveService, retryTopic, "groupA");

Assert.assertTrue("Priority topic with new flag should use separate retry queue", result);
}

@Test
public void testRetryQueueRouting_NormalTopicWithNewFlag() throws Exception {
brokerConfig.setUseSeparateRetryQueue(false);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true);

TopicConfig normalTopicConfig = new TopicConfig("JustANormalTopic");
when(topicConfigManager.selectTopicConfig("JustANormalTopic")).thenReturn(normalTopicConfig);

java.lang.reflect.Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class);
method.setAccessible(true);

String retryTopic = KeyBuilder.buildPopRetryTopic("JustANormalTopic", "groupA", false);
boolean result = (boolean) method.invoke(popReviveService, retryTopic, "groupA");

Assert.assertFalse("Normal topic should NOT use separate retry queue (will fallback to queue 0)", result);
}

@Test
public void testRetryQueueRouting_BothFlagsDisabled() throws Exception {
brokerConfig.setUseSeparateRetryQueue(false);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(false);

Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class);
method.setAccessible(true);

boolean result = (boolean) method.invoke(popReviveService, "%RETRY%groupA", "groupA");
Assert.assertFalse("Should use shared retry queue 0 when all flags are false", result);
}

@Test
public void testPriorityInversionBugIsFixed_QueueIsolation() throws Throwable {
brokerConfig.setUseSeparateRetryQueue(false);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true);

String consumerGroup = "PriorityGroup";
String normalTopic = "NormalTopic";
String priorityTopic = "PriorityTopic";

TopicConfig normalTopicConfig = new TopicConfig(normalTopic);
when(topicConfigManager.selectTopicConfig(normalTopic)).thenReturn(normalTopicConfig);

TopicConfig priorityTopicConfig = new TopicConfig(priorityTopic);
java.util.Map<String, String> attributes = new java.util.HashMap<>();
attributes.put("priority", "true");
priorityTopicConfig.setAttributes(attributes);
when(topicConfigManager.selectTopicConfig(priorityTopic)).thenReturn(priorityTopicConfig);

long pastPopTime = System.currentTimeMillis() - 5000;

PopCheckPoint normalCk = buildPopCheckPoint(100, pastPopTime, 1);
normalCk.setTopic(normalTopic);
normalCk.setCId(consumerGroup);
normalCk.setQueueId(0);

PopCheckPoint priorityCk = buildPopCheckPoint(200, pastPopTime, 2);
priorityCk.setTopic(priorityTopic);
priorityCk.setCId(consumerGroup);
priorityCk.setQueueId(1);

PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
reviveObj.map.put("normal", normalCk);
reviveObj.map.put("priority", priorityCk);
reviveObj.endTime = System.currentTimeMillis();

when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenAnswer(invocation -> {
long offset = invocation.getArgument(1);
MessageExt msgExt = new MessageExt();

if (offset == 100) {
msgExt.setTopic(normalTopic);
msgExt.setQueueId(0);
msgExt.putUserProperty("TEST_REAL_TOPIC", normalTopic);
} else {
msgExt.setTopic(priorityTopic);
msgExt.setQueueId(1);
msgExt.putUserProperty("TEST_REAL_TOPIC", priorityTopic);
}
return CompletableFuture.completedFuture(Triple.of(msgExt, "", false));
});

List<MessageExtBrokerInner> savedRetryMessages = new ArrayList<>();
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> {
MessageExtBrokerInner msg = invocation.getArgument(0);
savedRetryMessages.add(msg);
return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK));
});

popReviveService.mergeAndRevive(reviveObj);

Assert.assertEquals("Both messages should be revived", 2, savedRetryMessages.size());

MessageExtBrokerInner savedNormalMsg = savedRetryMessages.stream()
.filter(m -> normalTopic.equals(m.getProperty("TEST_REAL_TOPIC")))
.findFirst().orElseThrow(() -> new AssertionError("Normal message not found"));

MessageExtBrokerInner savedPriorityMsg = savedRetryMessages.stream()
.filter(m -> priorityTopic.equals(m.getProperty("TEST_REAL_TOPIC")))
.findFirst().orElseThrow(() -> new AssertionError("Priority message not found"));

Assert.assertEquals("Normal retry messages must be blocked in shared Queue 0",
0, savedNormalMsg.getQueueId());

Assert.assertEquals("High-priority retry messages MUST be isolated to their own queue to prevent priority inversion!",
1, savedPriorityMsg.getQueueId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ public class BrokerConfig extends BrokerIdentity {

private int liteLagLatencyTopK = 50;

private boolean useSeparateRetryQueueForPriorityTopic = false;
// HashedWheelTimer config for pop order lock manager
private long popOrderLockTimerTickMs = 100;
private int popOrderLockTimerTicksPerWheel = 512;
Expand Down Expand Up @@ -2524,4 +2525,12 @@ public int getMaxMessageFilterNumForNotification() {
public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) {
this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification;
}

public boolean isUseSeparateRetryQueueForPriorityTopic() {
return useSeparateRetryQueueForPriorityTopic;
}

public void setUseSeparateRetryQueueForPriorityTopic(boolean useSeparateRetryQueueForPriorityTopic) {
this.useSeparateRetryQueueForPriorityTopic = useSeparateRetryQueueForPriorityTopic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ public void testBrokerConfigAttribute() {
brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
brokerConfig.setAutoDeleteUnusedStats(true);
brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true);

assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true);
assertThat(brokerConfig.isUseSeparateRetryQueueForPriorityTopic()).isEqualTo(true);
}

@Test
public void testPriorityTopicRetryQueueConfigDefaultValue() {
BrokerConfig brokerConfig = new BrokerConfig();
assertThat(brokerConfig.isUseSeparateRetryQueueForPriorityTopic()).isFalse();
}
}