Skip to content

Commit 8f2183e

Browse files
committed
[improve][client] Pulsar client supports multi-topic messageId deserialization to ack messages
1 parent 32ad906 commit 8f2183e

4 files changed

Lines changed: 89 additions & 4 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import static org.mockito.Mockito.times;
2525
import static org.mockito.Mockito.verify;
2626
import static org.testng.Assert.assertFalse;
27+
import static org.testng.Assert.assertNull;
2728
import static org.testng.Assert.assertEquals;
2829
import static org.testng.Assert.assertTrue;
2930

3031
import com.google.common.collect.Lists;
32+
import com.google.common.collect.Range;
33+
3134
import java.util.ArrayList;
3235
import java.util.Collections;
3336
import java.util.HashMap;
@@ -42,6 +45,9 @@
4245
import java.util.stream.Collectors;
4346
import java.util.stream.IntStream;
4447
import lombok.Cleanup;
48+
49+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
50+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4551
import org.apache.pulsar.client.admin.PulsarAdminException;
4652
import org.apache.pulsar.client.impl.ClientBuilderImpl;
4753
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -351,4 +357,66 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
351357
}
352358
consumer.close();
353359
}
360+
361+
/**
362+
* It tests acking of messageId created from byte[] and validates client acks messages successfully.
363+
* @throws Exception
364+
*/
365+
@Test
366+
public void testMultiTopicAckWithByteMessageId() throws Exception {
367+
String topicName = newTopicName();
368+
int numPartitions = 2;
369+
int numMessages = 100000;
370+
admin.topics().createPartitionedTopic(topicName, numPartitions);
371+
372+
Producer<Long>[] producers = new Producer[numPartitions];
373+
374+
for (int i = 0; i < numPartitions; i++) {
375+
producers[i] = pulsarClient.newProducer(Schema.INT64)
376+
// produce to each partition directly so that order can be maintained in sending
377+
.topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000)
378+
.maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000)
379+
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxBytes(4 * 1024 * 1024)
380+
.blockIfQueueFull(true).create();
381+
}
382+
383+
@Cleanup
384+
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
385+
// consume on the partitioned topic
386+
.topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
387+
.receiverQueueSize(numMessages).subscriptionName(methodName).subscribe();
388+
389+
// produce sequence numbers to each partition topic
390+
long sequenceNumber = 1L;
391+
for (int i = 0; i < numMessages; i++) {
392+
for (Producer<Long> producer : producers) {
393+
producer.newMessage().value(sequenceNumber).sendAsync();
394+
}
395+
sequenceNumber++;
396+
}
397+
for (Producer<Long> producer : producers) {
398+
producer.flush();
399+
producer.close();
400+
}
401+
402+
// receive and validate sequences in the partitioned topic
403+
Map<String, AtomicLong> receivedSequences = new HashMap<>();
404+
int receivedCount = 0;
405+
while (receivedCount < numPartitions * numMessages) {
406+
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
407+
byte[] idByte = message.getMessageId().toByteArray();
408+
MessageId id = MessageId.fromByteArray(idByte);
409+
consumer.acknowledge(id);
410+
receivedCount++;
411+
AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(),
412+
k -> new AtomicLong(1L));
413+
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
414+
}
415+
Assert.assertEquals(numPartitions * numMessages, receivedCount);
416+
consumer.close();
417+
418+
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName + "-partition-0", false).get().get();
419+
Range<PositionImpl> range = topic.getManagedLedger().getCursors().iterator().next().getLastIndividualDeletedRange();
420+
assertNull(range);
421+
}
354422
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.io.IOException;
2727
import java.util.Objects;
2828
import javax.annotation.Nonnull;
29+
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.pulsar.client.api.MessageId;
3031
import org.apache.pulsar.client.api.TopicMessageId;
3132
import org.apache.pulsar.common.api.proto.MessageIdData;
@@ -96,7 +97,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
9697
throw new IOException(e);
9798
}
9899

99-
MessageIdImpl messageId;
100+
MessageId messageId;
100101
if (idData.hasBatchIndex()) {
101102
if (idData.hasBatchSize()) {
102103
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
@@ -115,6 +116,11 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
115116
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
116117
}
117118

119+
if (idData.hasTopicName()) {
120+
String topicName = idData.getTopicName();
121+
messageId = new TopicMessageIdImpl(topicName, topicName, messageId);
122+
}
123+
118124
return messageId;
119125
}
120126

@@ -193,7 +199,14 @@ protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex,
193199

194200
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
195201
protected byte[] toByteArray(int batchIndex, int batchSize) {
202+
return toByteArray(batchIndex, batchSize, null);
203+
}
204+
205+
protected byte[] toByteArray(int batchIndex, int batchSize, String topicName) {
196206
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);
207+
if (StringUtils.isNotBlank(topicName)) {
208+
msgId.setTopicName(topicName);
209+
}
197210

198211
int size = msgId.getSerializedSize();
199212
ByteBuf serialized = Unpooled.buffer(size, size);
@@ -205,7 +218,7 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
205218
@Override
206219
public byte[] toByteArray() {
207220
// there is no message batch so we pass -1
208-
return toByteArray(-1, 0);
221+
return toByteArray(-1, 0, null);
209222
}
210223

211224
@Override

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ public String toString() {
6363

6464
@Override
6565
public byte[] toByteArray() {
66-
return messageId.toByteArray();
66+
if (messageId instanceof MessageIdImpl) {
67+
return ((MessageIdImpl) messageId).toByteArray(-1, 0, topicPartitionName);
68+
} else {
69+
return messageId.toByteArray();
70+
}
6771
}
6872

6973
@Override

pulsar-common/src/main/proto/PulsarApi.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ message MessageIdData {
6262
optional int32 batch_index = 4 [default = -1];
6363
repeated int64 ack_set = 5;
6464
optional int32 batch_size = 6;
65-
6665
// For the chunk message id, we need to specify the first chunk message id.
6766
optional MessageIdData first_chunk_message_id = 7;
67+
optional string topicName = 8;
6868
}
6969

7070
message KeyValue {

0 commit comments

Comments
 (0)