Skip to content

Commit 03ec01d

Browse files
committed
[fix][client] Move MessageIdAdv to the pulsar-common module (#20139)
(cherry picked from commit 99a68e4)
1 parent e27abe9 commit 03ec01d

13 files changed

Lines changed: 133 additions & 105 deletions

File tree

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java

Lines changed: 2 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21-
import java.util.BitSet;
21+
import org.apache.pulsar.client.internal.DefaultImplementation;
2222

2323
/**
2424
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
@@ -45,84 +45,6 @@ static TopicMessageId create(String topic, MessageId messageId) {
4545
if (messageId instanceof TopicMessageId) {
4646
return (TopicMessageId) messageId;
4747
}
48-
return new Impl(topic, messageId);
49-
}
50-
51-
/**
52-
* The simplest implementation of a TopicMessageId interface.
53-
*/
54-
class Impl implements MessageIdAdv, TopicMessageId {
55-
private final String topic;
56-
private final MessageIdAdv messageId;
57-
58-
public Impl(String topic, MessageId messageId) {
59-
this.topic = topic;
60-
this.messageId = (MessageIdAdv) messageId;
61-
}
62-
63-
@Override
64-
public byte[] toByteArray() {
65-
return messageId.toByteArray();
66-
}
67-
68-
@Override
69-
public String getOwnerTopic() {
70-
return topic;
71-
}
72-
73-
@Override
74-
public long getLedgerId() {
75-
return messageId.getLedgerId();
76-
}
77-
78-
@Override
79-
public long getEntryId() {
80-
return messageId.getEntryId();
81-
}
82-
83-
@Override
84-
public int getPartitionIndex() {
85-
return messageId.getPartitionIndex();
86-
}
87-
88-
@Override
89-
public int getBatchIndex() {
90-
return messageId.getBatchIndex();
91-
}
92-
93-
@Override
94-
public int getBatchSize() {
95-
return messageId.getBatchSize();
96-
}
97-
98-
@Override
99-
public BitSet getAckSet() {
100-
return messageId.getAckSet();
101-
}
102-
103-
@Override
104-
public MessageIdAdv getFirstChunkMessageId() {
105-
return messageId.getFirstChunkMessageId();
106-
}
107-
108-
@Override
109-
public int compareTo(MessageId o) {
110-
return messageId.compareTo(o);
111-
}
112-
113-
@Override
114-
public boolean equals(Object obj) {
115-
return messageId.equals(obj);
116-
}
117-
118-
@Override
119-
public int hashCode() {
120-
return messageId.hashCode();
121-
}
122-
123-
@Override
124-
public String toString() {
125-
return messageId.toString();
126-
}
48+
return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId);
12749
}
12850
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pulsar.client.api.MessagePayloadFactory;
3838
import org.apache.pulsar.client.api.PulsarClientException;
3939
import org.apache.pulsar.client.api.Schema;
40+
import org.apache.pulsar.client.api.TopicMessageId;
4041
import org.apache.pulsar.client.api.schema.GenericRecord;
4142
import org.apache.pulsar.client.api.schema.GenericSchema;
4243
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -252,4 +253,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {
252253

253254
SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
254255
Map<String, String> propertiesValue);
256+
257+
TopicMessageId newTopicMessageId(String topic, MessageId messageId);
255258
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
23452345
@Override
23462346
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
23472347
return getLastMessageIdAsync()
2348-
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
2348+
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
23492349
}
23502350

23512351
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
128128
throw new IOException(e);
129129
}
130130

131-
MessageId messageId;
131+
MessageIdAdv messageId;
132132
if (idData.hasBatchIndex()) {
133133
if (idData.hasBatchSize()) {
134134
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
@@ -143,7 +143,7 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
143143
}
144144
if (idData.getPartition() > -1 && topicName != null) {
145145
messageId = new TopicMessageIdImpl(
146-
topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId);
146+
topicName.getPartition(idData.getPartition()).toString(), messageId);
147147
}
148148

149149
return messageId;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21-
2221
import java.io.IOException;
2322
import java.nio.ByteBuffer;
2423
import java.nio.charset.Charset;
@@ -35,9 +34,11 @@
3534
import org.apache.pulsar.client.api.BatcherBuilder;
3635
import org.apache.pulsar.client.api.ClientBuilder;
3736
import org.apache.pulsar.client.api.MessageId;
37+
import org.apache.pulsar.client.api.MessageIdAdv;
3838
import org.apache.pulsar.client.api.MessagePayloadFactory;
3939
import org.apache.pulsar.client.api.PulsarClientException;
4040
import org.apache.pulsar.client.api.Schema;
41+
import org.apache.pulsar.client.api.TopicMessageId;
4142
import org.apache.pulsar.client.api.schema.GenericRecord;
4243
import org.apache.pulsar.client.api.schema.GenericSchema;
4344
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -387,4 +388,19 @@ public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type,
387388
Map<String, String> propertiesValue) {
388389
return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
389390
}
391+
392+
@Override
393+
public TopicMessageId newTopicMessageId(String topic, MessageId messageId) {
394+
final MessageIdAdv messageIdAdv;
395+
if (messageId instanceof MessageIdAdv) {
396+
messageIdAdv = (MessageIdAdv) messageId;
397+
} else {
398+
try {
399+
messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray());
400+
} catch (IOException e) {
401+
throw new RuntimeException(e);
402+
}
403+
}
404+
return new TopicMessageIdImpl(topic, messageIdAdv);
405+
}
390406
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,27 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import java.util.BitSet;
2122
import org.apache.pulsar.client.api.MessageId;
23+
import org.apache.pulsar.client.api.MessageIdAdv;
2224
import org.apache.pulsar.client.api.TopicMessageId;
2325

24-
public class TopicMessageIdImpl extends TopicMessageId.Impl {
26+
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
2527

26-
private final String topicName;
28+
private final String ownerTopic;
29+
private final MessageIdAdv msgId;
30+
private final String topicName; // it's never used
2731

32+
public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
33+
this.ownerTopic = topic;
34+
this.msgId = msgId;
35+
this.topicName = "";
36+
}
37+
38+
@Deprecated
2839
public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
29-
super(topicPartitionName, messageId);
40+
this.msgId = (MessageIdAdv) messageId;
41+
this.ownerTopic = topicPartitionName;
3042
this.topicName = topicName;
3143
}
3244

@@ -55,11 +67,66 @@ public MessageId getInnerMessageId() {
5567

5668
@Override
5769
public boolean equals(Object obj) {
58-
return super.equals(obj);
70+
return msgId.equals(obj);
5971
}
6072

6173
@Override
6274
public int hashCode() {
63-
return super.hashCode();
75+
return msgId.hashCode();
76+
}
77+
78+
@Override
79+
public int compareTo(MessageId o) {
80+
return msgId.compareTo(o);
81+
}
82+
83+
@Override
84+
public byte[] toByteArray() {
85+
return msgId.toByteArray();
86+
}
87+
88+
@Override
89+
public String getOwnerTopic() {
90+
return ownerTopic;
91+
}
92+
93+
@Override
94+
public long getLedgerId() {
95+
return msgId.getLedgerId();
96+
}
97+
98+
@Override
99+
public long getEntryId() {
100+
return msgId.getEntryId();
101+
}
102+
103+
@Override
104+
public int getPartitionIndex() {
105+
return msgId.getPartitionIndex();
106+
}
107+
108+
@Override
109+
public int getBatchIndex() {
110+
return msgId.getBatchIndex();
111+
}
112+
113+
@Override
114+
public int getBatchSize() {
115+
return msgId.getBatchSize();
116+
}
117+
118+
@Override
119+
public BitSet getAckSet() {
120+
return msgId.getAckSet();
121+
}
122+
123+
@Override
124+
public MessageIdAdv getFirstChunkMessageId() {
125+
return msgId.getFirstChunkMessageId();
126+
}
127+
128+
@Override
129+
public String toString() {
130+
return msgId.toString();
64131
}
65132
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323
import org.apache.pulsar.client.api.Message;
2424
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.client.api.MessageIdAdv;
2526
import org.apache.pulsar.client.api.Schema;
2627
import org.apache.pulsar.common.api.EncryptionContext;
2728

@@ -42,7 +43,7 @@ public class TopicMessageImpl<T> implements Message<T> {
4243
this.receivedByconsumer = receivedByConsumer;
4344

4445
this.msg = msg;
45-
this.messageId = new TopicMessageIdImpl(topicPartitionName, topicPartitionName, msg.getMessageId());
46+
this.messageId = new TopicMessageIdImpl(topicPartitionName, (MessageIdAdv) msg.getMessageId());
4647
}
4748

4849
/**

pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,12 @@ public void testMessageIdImplCompareToTopicMessageId() {
148148
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
149149
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
150150
"test-topic-partition-0",
151-
"test-topic",
152151
new BatchMessageIdImpl(123L, 345L, 566, 789));
153152
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
154153
"test-topic-partition-0",
155-
"test-topic",
156154
new BatchMessageIdImpl(123L, 345L, 567, 789));
157155
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
158156
"test-topic-partition-0",
159-
"test-topic",
160157
new BatchMessageIdImpl(messageIdImpl));
161158
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
162159
assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than");
@@ -173,11 +170,9 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
173170
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
174171
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
175172
"test-topic-partition-0",
176-
"test-topic",
177173
new MessageIdImpl(123L, 345L, 566));
178174
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
179175
"test-topic-partition-0",
180-
"test-topic",
181176
new MessageIdImpl(123L, 345L, 567));
182177
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
183178
assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than");

pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ public class TopicMessageIdImplTest {
2828
public void hashCodeTest() {
2929
MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
3030
MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
31-
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
32-
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
33-
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
31+
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1);
32+
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1);
33+
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2);
3434

3535
assertEquals(topicMsgId1.hashCode(), topicMsgId1.hashCode());
3636
assertEquals(topic2MsgId1.hashCode(), topic2MsgId1.hashCode());
@@ -43,9 +43,9 @@ public void hashCodeTest() {
4343
public void equalsTest() {
4444
MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
4545
MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
46-
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
47-
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
48-
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
46+
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1);
47+
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1);
48+
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2);
4949

5050
assertEquals(topicMsgId1, topicMsgId1);
5151
assertEquals(topicMsgId1, topic2MsgId1);

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java renamed to pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java

File renamed without changes.

0 commit comments

Comments
 (0)