Skip to content

Commit 0413eb8

Browse files
BewareMyPowerTechnoboy-
authored andcommitted
[fix][client] Fix NPE when acknowledging multiple messages (#19874)
1 parent f0b4a71 commit 0413eb8

2 files changed

Lines changed: 50 additions & 2 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,43 @@ private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) t
111111
latch.await();
112112
}
113113

114+
@Test(timeOut = 30000)
115+
public void testAckMessageInAnotherTopic() throws Exception {
116+
final String[] topics = {
117+
"persistent://my-property/my-ns/test-ack-message-in-other-topic1" + UUID.randomUUID(),
118+
"persistent://my-property/my-ns/test-ack-message-in-other-topic2" + UUID.randomUUID(),
119+
"persistent://my-property/my-ns/test-ack-message-in-other-topic3" + UUID.randomUUID()
120+
};
121+
@Cleanup final Consumer<String> allTopicsConsumer = pulsarClient.newConsumer(Schema.STRING)
122+
.topic(topics)
123+
.subscriptionName("sub1")
124+
.subscribe();
125+
Consumer<String> partialTopicsConsumer = pulsarClient.newConsumer(Schema.STRING)
126+
.topic(topics[0], topics[1])
127+
.subscriptionName("sub2")
128+
.subscribe();
129+
for (int i = 0; i < topics.length; i++) {
130+
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
131+
.topic(topics[i])
132+
.create();
133+
producer.send("msg-" + i);
134+
producer.close();
135+
}
136+
final List<MessageId> messageIdList = new ArrayList<>();
137+
for (int i = 0; i < topics.length; i++) {
138+
messageIdList.add(allTopicsConsumer.receive().getMessageId());
139+
}
140+
try {
141+
partialTopicsConsumer.acknowledge(messageIdList);
142+
Assert.fail();
143+
} catch (PulsarClientException.NotConnectedException ignored) {
144+
}
145+
partialTopicsConsumer.close();
146+
partialTopicsConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topics[0])
147+
.subscriptionName("sub2").subscribe();
148+
pulsarClient.newProducer(Schema.STRING).topic(topics[0]).create().send("done");
149+
final Message<String> msg = partialTopicsConsumer.receive();
150+
Assert.assertEquals(msg.getValue(), "msg-0");
151+
partialTopicsConsumer.close();
152+
}
114153
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Comparator;
3333
import java.util.HashMap;
3434
import java.util.HashSet;
35+
import java.util.IdentityHashMap;
3536
import java.util.Iterator;
3637
import java.util.List;
3738
import java.util.Map;
@@ -539,8 +540,16 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
539540
topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>());
540541
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId());
541542
}
542-
topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
543-
ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
543+
final Map<ConsumerImpl<T>, List<MessageId>> consumerToMessageIds = new IdentityHashMap<>();
544+
for (Map.Entry<String, List<MessageId>> entry : topicToMessageIdMap.entrySet()) {
545+
ConsumerImpl<T> consumer = consumers.get(entry.getKey());
546+
if (consumer == null) {
547+
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
548+
}
549+
// Trigger the acknowledgment later to avoid sending partial acknowledgments
550+
consumerToMessageIds.put(consumer, entry.getValue());
551+
}
552+
consumerToMessageIds.forEach((consumer, messageIds) -> {
544553
resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn)
545554
.thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove)));
546555
});

0 commit comments

Comments
 (0)