Skip to content

Commit 7be139d

Browse files
committed
[fix][client] Move MessageIdAdv to the pulsar-common module
### Motivation apache#19414 does not follow the design of apache#18950 > Since the aimed developers are Pulsar core developers, it's added in > the pulsar-common module (PulsarApi.proto is also in this module), not > the pulsar-client-api module. The reason is that `TopicMessageId#create` now cannot be a `MessageIdAdv` if `MessageIdAdv` is not in the `pulsar-client-api` module. ### Modifications - Move the `MessageIdAdv` class to the `pulsar-common` module. - Create a `TopicMessageIdImpl` instance for `TopicMessageId#create` via the `DefaultImplementation` class with the overhead of reflection.
1 parent 9b72302 commit 7be139d

8 files changed

Lines changed: 109 additions & 87 deletions

File tree

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java

Lines changed: 2 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21-
import java.util.BitSet;
21+
import org.apache.pulsar.client.internal.DefaultImplementation;
2222

2323
/**
2424
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
@@ -45,84 +45,6 @@ static TopicMessageId create(String topic, MessageId messageId) {
4545
if (messageId instanceof TopicMessageId) {
4646
return (TopicMessageId) messageId;
4747
}
48-
return new Impl(topic, messageId);
49-
}
50-
51-
/**
52-
* The simplest implementation of a TopicMessageId interface.
53-
*/
54-
class Impl implements MessageIdAdv, TopicMessageId {
55-
private final String topic;
56-
private final MessageIdAdv messageId;
57-
58-
public Impl(String topic, MessageId messageId) {
59-
this.topic = topic;
60-
this.messageId = (MessageIdAdv) messageId;
61-
}
62-
63-
@Override
64-
public byte[] toByteArray() {
65-
return messageId.toByteArray();
66-
}
67-
68-
@Override
69-
public String getOwnerTopic() {
70-
return topic;
71-
}
72-
73-
@Override
74-
public long getLedgerId() {
75-
return messageId.getLedgerId();
76-
}
77-
78-
@Override
79-
public long getEntryId() {
80-
return messageId.getEntryId();
81-
}
82-
83-
@Override
84-
public int getPartitionIndex() {
85-
return messageId.getPartitionIndex();
86-
}
87-
88-
@Override
89-
public int getBatchIndex() {
90-
return messageId.getBatchIndex();
91-
}
92-
93-
@Override
94-
public int getBatchSize() {
95-
return messageId.getBatchSize();
96-
}
97-
98-
@Override
99-
public BitSet getAckSet() {
100-
return messageId.getAckSet();
101-
}
102-
103-
@Override
104-
public MessageIdAdv getFirstChunkMessageId() {
105-
return messageId.getFirstChunkMessageId();
106-
}
107-
108-
@Override
109-
public int compareTo(MessageId o) {
110-
return messageId.compareTo(o);
111-
}
112-
113-
@Override
114-
public boolean equals(Object obj) {
115-
return messageId.equals(obj);
116-
}
117-
118-
@Override
119-
public int hashCode() {
120-
return messageId.hashCode();
121-
}
122-
123-
@Override
124-
public String toString() {
125-
return messageId.toString();
126-
}
48+
return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId);
12749
}
12850
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pulsar.client.api.MessagePayloadFactory;
3838
import org.apache.pulsar.client.api.PulsarClientException;
3939
import org.apache.pulsar.client.api.Schema;
40+
import org.apache.pulsar.client.api.TopicMessageId;
4041
import org.apache.pulsar.client.api.schema.GenericRecord;
4142
import org.apache.pulsar.client.api.schema.GenericSchema;
4243
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -252,4 +253,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {
252253

253254
SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
254255
Map<String, String> propertiesValue);
256+
257+
TopicMessageId newTopicMessageId(String topic, MessageId messageId);
255258
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
23452345
@Override
23462346
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
23472347
return getLastMessageIdAsync()
2348-
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
2348+
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
23492349
}
23502350

23512351
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21-
2221
import java.io.IOException;
2322
import java.nio.ByteBuffer;
2423
import java.nio.charset.Charset;
@@ -35,9 +34,11 @@
3534
import org.apache.pulsar.client.api.BatcherBuilder;
3635
import org.apache.pulsar.client.api.ClientBuilder;
3736
import org.apache.pulsar.client.api.MessageId;
37+
import org.apache.pulsar.client.api.MessageIdAdv;
3838
import org.apache.pulsar.client.api.MessagePayloadFactory;
3939
import org.apache.pulsar.client.api.PulsarClientException;
4040
import org.apache.pulsar.client.api.Schema;
41+
import org.apache.pulsar.client.api.TopicMessageId;
4142
import org.apache.pulsar.client.api.schema.GenericRecord;
4243
import org.apache.pulsar.client.api.schema.GenericSchema;
4344
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -387,4 +388,19 @@ public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type,
387388
Map<String, String> propertiesValue) {
388389
return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
389390
}
391+
392+
@Override
393+
public TopicMessageId newTopicMessageId(String topic, MessageId messageId) {
394+
final MessageIdAdv messageIdAdv;
395+
if (messageId instanceof MessageIdAdv) {
396+
messageIdAdv = (MessageIdAdv) messageId;
397+
} else {
398+
try {
399+
messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray());
400+
} catch (IOException e) {
401+
throw new RuntimeException(e);
402+
}
403+
}
404+
return new TopicMessageIdImpl(topic, messageIdAdv);
405+
}
390406
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,27 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import java.util.BitSet;
2122
import org.apache.pulsar.client.api.MessageId;
23+
import org.apache.pulsar.client.api.MessageIdAdv;
2224
import org.apache.pulsar.client.api.TopicMessageId;
2325

24-
public class TopicMessageIdImpl extends TopicMessageId.Impl {
26+
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
2527

26-
private final String topicName;
28+
private final String ownerTopic;
29+
private final MessageIdAdv msgId;
30+
private final String topicName; // it's never used
2731

32+
public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
33+
this.ownerTopic = topic;
34+
this.msgId = msgId;
35+
this.topicName = "";
36+
}
37+
38+
@Deprecated
2839
public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
29-
super(topicPartitionName, messageId);
40+
this.msgId = (MessageIdAdv) messageId;
41+
this.ownerTopic = topicPartitionName;
3042
this.topicName = topicName;
3143
}
3244

@@ -62,4 +74,49 @@ public boolean equals(Object obj) {
6274
public int hashCode() {
6375
return super.hashCode();
6476
}
77+
78+
@Override
79+
public byte[] toByteArray() {
80+
return msgId.toByteArray();
81+
}
82+
83+
@Override
84+
public String getOwnerTopic() {
85+
return ownerTopic;
86+
}
87+
88+
@Override
89+
public long getLedgerId() {
90+
return msgId.getLedgerId();
91+
}
92+
93+
@Override
94+
public long getEntryId() {
95+
return msgId.getEntryId();
96+
}
97+
98+
@Override
99+
public int getPartitionIndex() {
100+
return msgId.getPartitionIndex();
101+
}
102+
103+
@Override
104+
public int getBatchIndex() {
105+
return msgId.getBatchIndex();
106+
}
107+
108+
@Override
109+
public int getBatchSize() {
110+
return msgId.getBatchSize();
111+
}
112+
113+
@Override
114+
public BitSet getAckSet() {
115+
return msgId.getAckSet();
116+
}
117+
118+
@Override
119+
public MessageIdAdv getFirstChunkMessageId() {
120+
return msgId.getFirstChunkMessageId();
121+
}
65122
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java renamed to pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java

File renamed without changes.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
/**
20+
* Additional helper classes to the pulsar-client-api module.
21+
*/
22+
package org.apache.pulsar.client.api;

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
3939
import org.apache.pulsar.client.api.DeadLetterPolicy;
4040
import org.apache.pulsar.client.api.MessageId;
41+
import org.apache.pulsar.client.api.MessageIdAdv;
4142
import org.apache.pulsar.client.api.PulsarClient;
4243
import org.apache.pulsar.client.api.PulsarClientException;
4344
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
4445
import org.apache.pulsar.client.api.SubscriptionMode;
4546
import org.apache.pulsar.client.api.SubscriptionType;
4647
import org.apache.pulsar.client.api.TopicMessageId;
4748
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
49+
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4850
import org.apache.pulsar.common.util.Codec;
4951
import org.apache.pulsar.common.util.DateFormatter;
5052
import org.apache.pulsar.websocket.data.ConsumerCommand;
@@ -293,8 +295,8 @@ private void checkResumeReceive() {
293295

294296
private void handleAck(ConsumerCommand command) throws IOException {
295297
// We should have received an ack
296-
TopicMessageId msgId = TopicMessageId.create(topic.toString(),
297-
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
298+
TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
299+
(MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
298300
if (log.isDebugEnabled()) {
299301
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
300302
subscription, msgId, getRemote().getInetSocketAddress().toString());

0 commit comments

Comments
 (0)