Skip to content

Commit 3f3d7bf

Browse files
codelipenghuiTechnoboy-
authored andcommitted
[fix][broker] Fix the behavior of delayed message in Key_Shared mode (#20233)
1 parent 2fb9366 commit 3f3d7bf

2 files changed

Lines changed: 81 additions & 5 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,14 +1048,15 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
10481048
}
10491049

10501050
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
1051-
if (!redeliveryMessages.isEmpty()) {
1052-
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
1053-
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
1051+
if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
10541052
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
10551053
NavigableSet<PositionImpl> messagesAvailableNow =
10561054
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
10571055
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
1058-
return messagesAvailableNow;
1056+
}
1057+
1058+
if (!redeliveryMessages.isEmpty()) {
1059+
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
10591060
} else {
10601061
return Collections.emptyNavigableSet();
10611062
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Object[][] topicDomainProvider() {
112112
@BeforeMethod(alwaysRun = true)
113113
@Override
114114
protected void setup() throws Exception {
115-
super.resetConfig();
115+
this.conf.setUnblockStuckSubscriptionEnabled(true);
116116
super.internalSetup();
117117
super.producerBaseSetup();
118118
this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -1524,4 +1524,79 @@ public void testStickyKeyRangesRestartConsumers() throws PulsarClientException,
15241524
count3.await();
15251525
assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
15261526
}
1527+
1528+
@Test
1529+
public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
1530+
int delayedMessages = 40;
1531+
int messages = 40;
1532+
int sum = 0;
1533+
final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
1534+
final String subName = "my-sub";
1535+
1536+
@Cleanup
1537+
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
1538+
.topic(topic)
1539+
.subscriptionName(subName)
1540+
.receiverQueueSize(10)
1541+
.subscriptionType(SubscriptionType.Key_Shared)
1542+
.subscribe();
1543+
1544+
@Cleanup
1545+
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
1546+
.topic(topic)
1547+
.create();
1548+
1549+
for (int i = 0; i < delayedMessages; i++) {
1550+
MessageId messageId = producer.newMessage()
1551+
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
1552+
.value(100 + i)
1553+
.deliverAfter(10, TimeUnit.SECONDS)
1554+
.send();
1555+
log.info("Published delayed message :{}", messageId);
1556+
}
1557+
1558+
for (int i = 0; i < messages; i++) {
1559+
MessageId messageId = producer.newMessage()
1560+
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
1561+
.value(i)
1562+
.send();
1563+
log.info("Published message :{}", messageId);
1564+
}
1565+
1566+
@Cleanup
1567+
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
1568+
.topic(topic)
1569+
.subscriptionName(subName)
1570+
.receiverQueueSize(30)
1571+
.subscriptionType(SubscriptionType.Key_Shared)
1572+
.subscribe();
1573+
1574+
for (int i = 0; i < delayedMessages + messages; i++) {
1575+
Message<Integer> msg = consumer1.receive(30, TimeUnit.SECONDS);
1576+
if (msg != null) {
1577+
log.info("c1 message: {}, {}", msg.getValue(), msg.getMessageId());
1578+
consumer1.acknowledge(msg);
1579+
} else {
1580+
break;
1581+
}
1582+
sum++;
1583+
}
1584+
1585+
log.info("Got {} messages...", sum);
1586+
1587+
int remaining = delayedMessages + messages - sum;
1588+
for (int i = 0; i < remaining; i++) {
1589+
Message<Integer> msg = consumer2.receive(30, TimeUnit.SECONDS);
1590+
if (msg != null) {
1591+
log.info("c2 message: {}, {}", msg.getValue(), msg.getMessageId());
1592+
consumer2.acknowledge(msg);
1593+
} else {
1594+
break;
1595+
}
1596+
sum++;
1597+
}
1598+
1599+
log.info("Got {} other messages...", sum);
1600+
Assert.assertEquals(sum, delayedMessages + messages);
1601+
}
15271602
}

0 commit comments

Comments
 (0)