Skip to content

Commit 3ac0ce6

Browse files
committed
Deprecate getInnerMessageId
1 parent 92a6962 commit 3ac0ce6

14 files changed

Lines changed: 93 additions & 123 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@
4040
import org.apache.pulsar.client.api.ConsumerEventListener;
4141
import org.apache.pulsar.client.api.Message;
4242
import org.apache.pulsar.client.api.MessageId;
43+
import org.apache.pulsar.client.api.MessageIdAdv;
4344
import org.apache.pulsar.client.api.MessageRoutingMode;
4445
import org.apache.pulsar.client.api.Producer;
4546
import org.apache.pulsar.client.api.PulsarClientException;
4647
import org.apache.pulsar.client.api.SubscriptionType;
47-
import org.apache.pulsar.client.impl.MessageIdImpl;
48-
import org.apache.pulsar.client.impl.TopicMessageImpl;
4948
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5049
import org.apache.pulsar.common.naming.TopicName;
5150
import org.apache.pulsar.common.util.FutureUtil;
@@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
337336
}
338337
totalMessages++;
339338
consumer1.acknowledge(msg);
340-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
339+
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
341340
receivedPtns.add(msgId.getPartitionIndex());
342341
}
343342

@@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
354353
}
355354
totalMessages++;
356355
consumer2.acknowledge(msg);
357-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
356+
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
358357
receivedPtns.add(msgId.getPartitionIndex());
359358
}
360359
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.pulsar.client.api.SubscriptionType;
5151
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5252
import org.apache.pulsar.client.impl.MessageIdImpl;
53-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
5453
import org.apache.pulsar.common.naming.TopicName;
5554
import org.apache.pulsar.common.util.RelativeTimeUtil;
5655
import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
679678
if (message == null) {
680679
break;
681680
}
682-
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
683-
received.add(topicMessageId.getInnerMessageId());
681+
received.add(message.getMessageId());
684682
}
685683
int msgNumFromPartition1 = list.size() / 2;
686684
int msgNumFromPartition2 = 1;

pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.pulsar.client.impl.MessageIdImpl;
4040
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4141
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
42-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4342
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
4443
import org.apache.pulsar.common.naming.TopicName;
4544
import org.awaitility.Awaitility;
@@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
768767

769768
for (int i = 0; i < totalMessages; i ++) {
770769
msg = consumer1.receive(5, TimeUnit.SECONDS);
771-
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
770+
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
772771
consumer1.acknowledge(msg);
773772
}
774773

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
118118
Message<byte[]> message = consumer.receive();
119119
assertEquals(new String(message.getData()), messagePrefix + i);
120120
MessageId messageId = message.getMessageId();
121-
if (topicType == TopicType.PARTITIONED) {
122-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
123-
}
124121
assertTrue(messageIds.remove(messageId), "Failed to receive message");
125122
}
126123
log.info("Remaining message IDs = {}", messageIds);
@@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls
166163

167164
for (int i = 0; i < numberOfMessages; i++) {
168165
MessageId messageId = consumer.receive().getMessageId();
169-
if (topicType == TopicType.PARTITIONED) {
170-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
171-
}
172166
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
173167
}
174168
log.info("Remaining message IDs = {}", messageIds);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.client.api.MessageIdAdv;
2526
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
2627

2728
/**
@@ -31,7 +32,7 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
3132

3233
boolean isDuplicate(MessageId messageId);
3334

34-
CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
35+
CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties);
3536

3637
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
3738
Map<String, Long> properties);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
177177
private final TopicName topicName;
178178
private final String topicNameWithoutPartition;
179179

180-
private final Map<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
180+
private final Map<MessageIdAdv, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
181181

182182
private final DeadLetterPolicy deadLetterPolicy;
183183

@@ -530,7 +530,6 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
530530
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
531531
Map<String, Long> properties,
532532
TransactionImpl txn) {
533-
checkArgument(messageId instanceof MessageIdImpl);
534533
if (getState() != State.Ready && getState() != State.Connecting) {
535534
stats.incrementNumAcksFailed();
536535
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
@@ -546,7 +545,7 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
546545
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
547546
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
548547
}
549-
return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
548+
return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdAdv) messageId, ackType, properties);
550549
}
551550

552551
@Override
@@ -587,10 +586,6 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
587586
.InvalidMessageException("Cannot handle message with null messageId"));
588587
}
589588

590-
if (messageId instanceof TopicMessageIdImpl) {
591-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
592-
}
593-
checkArgument(messageId instanceof MessageIdImpl);
594589
if (getState() != State.Ready && getState() != State.Connecting) {
595590
stats.incrementNumAcksFailed();
596591
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
@@ -626,7 +621,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
626621
if (retryLetterProducer != null) {
627622
try {
628623
MessageImpl<T> retryMessage = (MessageImpl<T>) getMessageImpl(message);
629-
String originMessageIdStr = getOriginMessageIdStr(message);
624+
String originMessageIdStr = message.getMessageId().toString();
630625
String originTopicNameStr = getOriginTopicNameStr(message);
631626
SortedMap<String, String> propertiesMap =
632627
getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
@@ -718,15 +713,6 @@ private SortedMap<String, String> getPropertiesMap(Message<?> message,
718713
return propertiesMap;
719714
}
720715

721-
private String getOriginMessageIdStr(Message<?> message) {
722-
if (message instanceof TopicMessageImpl) {
723-
return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
724-
} else if (message instanceof MessageImpl) {
725-
return message.getMessageId().toString();
726-
}
727-
return null;
728-
}
729-
730716
private String getOriginTopicNameStr(Message<?> message) {
731717
if (message instanceof TopicMessageImpl) {
732718
return ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
@@ -2005,7 +1991,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId)
20051991
MessageIdImpl finalMessageId = messageId;
20061992
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
20071993
for (MessageImpl<T> message : finalDeadLetterMessages) {
2008-
String originMessageIdStr = getOriginMessageIdStr(message);
1994+
String originMessageIdStr = message.getMessageId().toString();
20091995
String originTopicNameStr = getOriginTopicNameStr(message);
20101996
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
20111997
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
@@ -2178,24 +2164,24 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
21782164
String seekBy = String.format("the message %s", messageId.toString());
21792165
return seekAsyncCheckState(seekBy).orElseGet(() -> {
21802166
long requestId = client.newRequestId();
2181-
ByteBuf seek = null;
2182-
if (messageId instanceof BatchMessageIdImpl) {
2183-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
2184-
// Initialize ack set
2185-
BitSetRecyclable ackSet = BitSetRecyclable.create();
2186-
ackSet.set(0, msgId.getBatchSize());
2187-
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
2188-
long[] ackSetArr = ackSet.toLongArray();
2189-
ackSet.recycle();
2190-
2191-
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
2192-
} else if (messageId instanceof ChunkMessageIdImpl) {
2193-
ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
2194-
seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(),
2195-
msgId.getFirstChunkMessageId().getEntryId(), new long[0]);
2167+
final MessageIdAdv msgId = (MessageIdAdv) messageId;
2168+
final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
2169+
final ByteBuf seek;
2170+
if (msgId.getFirstChunkMessageId() != null) {
2171+
seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
2172+
firstChunkMsgId.getEntryId(), new long[0]);
21962173
} else {
2197-
MessageIdImpl msgId = (MessageIdImpl) messageId;
2198-
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
2174+
final long[] ackSetArr;
2175+
if (msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0) {
2176+
final BitSetRecyclable ackSet = BitSetRecyclable.create();
2177+
ackSet.set(0, msgId.getBatchSize());
2178+
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
2179+
ackSetArr = ackSet.toLongArray();
2180+
ackSet.recycle();
2181+
} else {
2182+
ackSetArr = new long[0];
2183+
}
2184+
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
21992185
}
22002186
return seekAsyncInternal(requestId, seek, messageId, seekBy);
22012187
});
@@ -2227,9 +2213,8 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
22272213
}
22282214

22292215
future.thenAccept(response -> {
2230-
MessageIdImpl lastMessageId = MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
2231-
MessageIdImpl markDeletePosition = MessageIdImpl
2232-
.convertToMessageIdImpl(response.markDeletePosition);
2216+
MessageIdAdv lastMessageId = (MessageIdAdv) response.lastMessageId;
2217+
MessageIdAdv markDeletePosition = (MessageIdAdv) response.markDeletePosition;
22332218

22342219
if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0
22352220
&& markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
@@ -2766,7 +2751,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<Message
27662751
return cnx().newAckForReceipt(cmd, requestId);
27672752
}
27682753

2769-
public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
2754+
public Map<MessageIdAdv, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
27702755
return possibleSendToDeadLetterTopicMessages;
27712756
}
27722757

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
117117
return messageId;
118118
}
119119

120+
@Deprecated
120121
public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) {
121122
if (messageId instanceof BatchMessageIdImpl) {
122123
return (BatchMessageIdImpl) messageId;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.pulsar.client.api.ConsumerStats;
5555
import org.apache.pulsar.client.api.Message;
5656
import org.apache.pulsar.client.api.MessageId;
57+
import org.apache.pulsar.client.api.MessageIdAdv;
5758
import org.apache.pulsar.client.api.Messages;
5859
import org.apache.pulsar.client.api.PulsarClientException;
5960
import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
@@ -98,7 +99,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
9899
private final MultiTopicConsumerStatsRecorderImpl stats;
99100
private final ConsumerConfigurationData<T> internalConfig;
100101

101-
private volatile BatchMessageIdImpl startMessageId = null;
102+
private volatile MessageIdAdv startMessageId;
102103
private final long startMessageRollbackDurationInSec;
103104
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
104105
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
@@ -137,9 +138,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
137138
this.consumers = new ConcurrentHashMap<>();
138139
this.pausedConsumers = new ConcurrentLinkedQueue<>();
139140
this.allTopicPartitionsNumber = new AtomicInteger(0);
140-
this.startMessageId = startMessageId != null
141-
? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
142-
: null;
141+
this.startMessageId = (MessageIdAdv) startMessageId;
143142
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
144143
this.paused = conf.isStartPaused();
145144

@@ -451,18 +450,16 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
451450
}
452451

453452
if (ackType == AckType.Cumulative) {
454-
Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
453+
Consumer<T> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
455454
if (individualConsumer != null) {
456-
MessageId innerId = topicMessageId.getInnerMessageId();
457-
return individualConsumer.acknowledgeCumulativeAsync(innerId);
455+
return individualConsumer.acknowledgeCumulativeAsync(topicMessageId);
458456
} else {
459457
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
460458
}
461459
} else {
462460
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
463461

464-
MessageId innerId = topicMessageId.getInnerMessageId();
465-
return consumer.doAcknowledgeWithTxn(innerId, ackType, properties, txnImpl)
462+
return consumer.doAcknowledgeWithTxn(topicMessageId, ackType, properties, txnImpl)
466463
.thenRun(() ->
467464
unAckedMessageTracker.remove(topicMessageId));
468465
}
@@ -489,7 +486,7 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
489486
}
490487
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
491488
topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>());
492-
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId());
489+
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId);
493490
}
494491
topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
495492
ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
@@ -536,7 +533,7 @@ public void negativeAcknowledge(MessageId messageId) {
536533
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
537534

538535
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
539-
consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
536+
consumer.negativeAcknowledge(topicMessageId);
540537
}
541538

542539
@Override
@@ -689,12 +686,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
689686
return;
690687
}
691688
removeExpiredMessagesFromQueue(messageIds);
692-
messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId)
693-
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet()))
694-
.forEach((topicName, messageIds1) ->
695-
consumers.get(topicName)
696-
.redeliverUnacknowledgedMessages(messageIds1.stream()
697-
.map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet())));
689+
messageIds.stream()
690+
.collect(Collectors.groupingBy(
691+
msgId -> ((TopicMessageIdImpl) msgId).getTopicPartitionName(), Collectors.toSet()))
692+
.forEach((topicName, messageIds1) ->
693+
consumers.get(topicName).redeliverUnacknowledgedMessages(messageIds1));
698694
resumeReceivingFromPausedConsumersIfNeeded();
699695
}
700696

@@ -748,7 +744,7 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
748744

749745
@Override
750746
public CompletableFuture<Void> seekAsync(MessageId messageId) {
751-
MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId);
747+
MessageIdAdv targetMessageId = (MessageIdAdv) messageId;
752748
if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) {
753749
return FutureUtil.failedFuture(
754750
new PulsarClientException("Illegal messageId, messageId can only be earliest/latest")

0 commit comments

Comments
 (0)