Skip to content

Commit 32b171a

Browse files
committed
[fix][client] Move MessageIdAdv to the pulsar-common module
### Motivation apache#19414 does not follow the design of apache#18950 > Since the aimed developers are Pulsar core developers, it's added in > the pulsar-common module (PulsarApi.proto is also in this module), not > the pulsar-client-api module. The reason is that `TopicMessageId#create` now cannot be a `MessageIdAdv` if `MessageIdAdv` is not in the `pulsar-client-api` module. ### Modifications - Move the `MessageIdAdv` class to the `pulsar-common` module. - To handle the instance created by `TopicMessageId` well, add `MessageIdAdvUtils#convert` to add an extra deserialization and serialization of `MessageId`.
1 parent 9b72302 commit 32b171a

7 files changed

Lines changed: 103 additions & 92 deletions

File tree

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java

Lines changed: 17 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21-
import java.util.BitSet;
22-
2321
/**
2422
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
2523
*
@@ -45,84 +43,22 @@ static TopicMessageId create(String topic, MessageId messageId) {
4543
if (messageId instanceof TopicMessageId) {
4644
return (TopicMessageId) messageId;
4745
}
48-
return new Impl(topic, messageId);
49-
}
50-
51-
/**
52-
* The simplest implementation of a TopicMessageId interface.
53-
*/
54-
class Impl implements MessageIdAdv, TopicMessageId {
55-
private final String topic;
56-
private final MessageIdAdv messageId;
57-
58-
public Impl(String topic, MessageId messageId) {
59-
this.topic = topic;
60-
this.messageId = (MessageIdAdv) messageId;
61-
}
62-
63-
@Override
64-
public byte[] toByteArray() {
65-
return messageId.toByteArray();
66-
}
67-
68-
@Override
69-
public String getOwnerTopic() {
70-
return topic;
71-
}
72-
73-
@Override
74-
public long getLedgerId() {
75-
return messageId.getLedgerId();
76-
}
77-
78-
@Override
79-
public long getEntryId() {
80-
return messageId.getEntryId();
81-
}
82-
83-
@Override
84-
public int getPartitionIndex() {
85-
return messageId.getPartitionIndex();
86-
}
87-
88-
@Override
89-
public int getBatchIndex() {
90-
return messageId.getBatchIndex();
91-
}
92-
93-
@Override
94-
public int getBatchSize() {
95-
return messageId.getBatchSize();
96-
}
97-
98-
@Override
99-
public BitSet getAckSet() {
100-
return messageId.getAckSet();
101-
}
102-
103-
@Override
104-
public MessageIdAdv getFirstChunkMessageId() {
105-
return messageId.getFirstChunkMessageId();
106-
}
107-
108-
@Override
109-
public int compareTo(MessageId o) {
110-
return messageId.compareTo(o);
111-
}
112-
113-
@Override
114-
public boolean equals(Object obj) {
115-
return messageId.equals(obj);
116-
}
117-
118-
@Override
119-
public int hashCode() {
120-
return messageId.hashCode();
121-
}
122-
123-
@Override
124-
public String toString() {
125-
return messageId.toString();
126-
}
46+
return new TopicMessageId() {
47+
48+
@Override
49+
public String getOwnerTopic() {
50+
return topic;
51+
}
52+
53+
@Override
54+
public byte[] toByteArray() {
55+
return messageId.toByteArray();
56+
}
57+
58+
@Override
59+
public int compareTo(MessageId o) {
60+
return messageId.compareTo(o);
61+
}
62+
};
12763
}
12864
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
23452345
@Override
23462346
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
23472347
return getLastMessageIdAsync()
2348-
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
2348+
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
23492349
}
23502350

23512351
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import java.io.IOException;
2122
import java.util.BitSet;
2223
import org.apache.pulsar.client.api.MessageId;
2324
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -71,4 +72,16 @@ static MessageIdAdv discardBatch(MessageId messageId) {
7172
static MessageIdAdv prevMessageId(MessageIdAdv msgId) {
7273
return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId() - 1, msgId.getPartitionIndex());
7374
}
75+
76+
static MessageIdAdv convert(MessageId msgId) {
77+
if (msgId instanceof MessageIdAdv) {
78+
return (MessageIdAdv) msgId;
79+
} else {
80+
try {
81+
return (MessageIdAdv) MessageId.fromByteArray(msgId.toByteArray());
82+
} catch (IOException e) {
83+
throw new IllegalArgumentException(e);
84+
}
85+
}
86+
}
7487
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.google.common.collect.Lists;
2727
import io.netty.util.Timeout;
2828
import io.netty.util.TimerTask;
29+
30+
import java.io.IOException;
2931
import java.util.ArrayList;
3032
import java.util.Collection;
3133
import java.util.Collections;
@@ -458,9 +460,9 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
458460
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
459461
}
460462
if (ackType == AckType.Cumulative) {
461-
return consumer.acknowledgeCumulativeAsync(messageId);
463+
return consumer.acknowledgeCumulativeAsync(MessageIdAdvUtils.convert(messageId));
462464
} else {
463-
return consumer.doAcknowledgeWithTxn(messageId, ackType, properties, txnImpl)
465+
return consumer.doAcknowledgeWithTxn(MessageIdAdvUtils.convert(messageId), ackType, properties, txnImpl)
464466
.thenRun(() -> unAckedMessageTracker.remove(messageId));
465467
}
466468
}
@@ -488,7 +490,7 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
488490
for (MessageId messageId : messageIdList) {
489491
String ownerTopic = ((TopicMessageId) messageId).getOwnerTopic();
490492
topicToMessageIdMap.putIfAbsent(ownerTopic, new ArrayList<>());
491-
topicToMessageIdMap.get(ownerTopic).add(messageId);
493+
topicToMessageIdMap.get(ownerTopic).add(MessageIdAdvUtils.convert(messageId));
492494
}
493495
final Map<ConsumerImpl<T>, List<MessageId>> consumerToMessageIds = new IdentityHashMap<>();
494496
for (Map.Entry<String, List<MessageId>> entry : topicToMessageIdMap.entrySet()) {
@@ -773,13 +775,14 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
773775
);
774776
}
775777

778+
final MessageIdAdv messageIdAdv = MessageIdAdvUtils.convert(messageId);
776779
final CompletableFuture<Void> seekFuture;
777780
if (internalConsumer == null) {
778781
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
779-
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageId)));
782+
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageIdAdv)));
780783
seekFuture = FutureUtil.waitForAll(futures);
781784
} else {
782-
seekFuture = internalConsumer.seekAsync(messageId);
785+
seekFuture = internalConsumer.seekAsync(messageIdAdv);
783786
}
784787

785788
unAckedMessageTracker.clear();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,27 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import java.util.BitSet;
2122
import org.apache.pulsar.client.api.MessageId;
23+
import org.apache.pulsar.client.api.MessageIdAdv;
2224
import org.apache.pulsar.client.api.TopicMessageId;
2325

24-
public class TopicMessageIdImpl extends TopicMessageId.Impl {
26+
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
2527

26-
private final String topicName;
28+
private final String ownerTopic;
29+
private final MessageIdAdv msgId;
30+
private final String topicName; // it's never used
2731

32+
public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
33+
this.ownerTopic = topic;
34+
this.msgId = msgId;
35+
this.topicName = "";
36+
}
37+
38+
@Deprecated
2839
public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
29-
super(topicPartitionName, messageId);
40+
this.msgId = (MessageIdAdv) messageId;
41+
this.ownerTopic = topicPartitionName;
3042
this.topicName = topicName;
3143
}
3244

@@ -62,4 +74,49 @@ public boolean equals(Object obj) {
6274
public int hashCode() {
6375
return super.hashCode();
6476
}
77+
78+
@Override
79+
public byte[] toByteArray() {
80+
return msgId.toByteArray();
81+
}
82+
83+
@Override
84+
public String getOwnerTopic() {
85+
return ownerTopic;
86+
}
87+
88+
@Override
89+
public long getLedgerId() {
90+
return msgId.getLedgerId();
91+
}
92+
93+
@Override
94+
public long getEntryId() {
95+
return msgId.getEntryId();
96+
}
97+
98+
@Override
99+
public int getPartitionIndex() {
100+
return msgId.getPartitionIndex();
101+
}
102+
103+
@Override
104+
public int getBatchIndex() {
105+
return msgId.getBatchIndex();
106+
}
107+
108+
@Override
109+
public int getBatchSize() {
110+
return msgId.getBatchSize();
111+
}
112+
113+
@Override
114+
public BitSet getAckSet() {
115+
return msgId.getAckSet();
116+
}
117+
118+
@Override
119+
public MessageIdAdv getFirstChunkMessageId() {
120+
return msgId.getFirstChunkMessageId();
121+
}
65122
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java renamed to pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java

File renamed without changes.

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
3939
import org.apache.pulsar.client.api.DeadLetterPolicy;
4040
import org.apache.pulsar.client.api.MessageId;
41+
import org.apache.pulsar.client.api.MessageIdAdv;
4142
import org.apache.pulsar.client.api.PulsarClient;
4243
import org.apache.pulsar.client.api.PulsarClientException;
4344
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
4445
import org.apache.pulsar.client.api.SubscriptionMode;
4546
import org.apache.pulsar.client.api.SubscriptionType;
4647
import org.apache.pulsar.client.api.TopicMessageId;
4748
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
49+
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4850
import org.apache.pulsar.common.util.Codec;
4951
import org.apache.pulsar.common.util.DateFormatter;
5052
import org.apache.pulsar.websocket.data.ConsumerCommand;
@@ -293,8 +295,8 @@ private void checkResumeReceive() {
293295

294296
private void handleAck(ConsumerCommand command) throws IOException {
295297
// We should have received an ack
296-
TopicMessageId msgId = TopicMessageId.create(topic.toString(),
297-
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
298+
TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
299+
(MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
298300
if (log.isDebugEnabled()) {
299301
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
300302
subscription, msgId, getRemote().getInetSocketAddress().toString());

0 commit comments

Comments
 (0)