Skip to content

Commit ff73cd8

Browse files
committed
Fix tests
1 parent 34fa858 commit ff73cd8

3 files changed

Lines changed: 43 additions & 7 deletions

File tree

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import java.util.BitSet;
22+
2123
/**
2224
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
2325
*
@@ -49,13 +51,13 @@ static TopicMessageId create(String topic, MessageId messageId) {
4951
/**
5052
* The simplest implementation of a TopicMessageId interface.
5153
*/
52-
class Impl implements TopicMessageId {
54+
class Impl implements MessageIdAdv, TopicMessageId {
5355
private final String topic;
54-
private final MessageId messageId;
56+
private final MessageIdAdv messageId;
5557

5658
public Impl(String topic, MessageId messageId) {
5759
this.topic = topic;
58-
this.messageId = messageId;
60+
this.messageId = (MessageIdAdv) messageId;
5961
}
6062

6163
@Override
@@ -68,6 +70,41 @@ public String getOwnerTopic() {
6870
return topic;
6971
}
7072

73+
@Override
74+
public long getLedgerId() {
75+
return messageId.getLedgerId();
76+
}
77+
78+
@Override
79+
public long getEntryId() {
80+
return messageId.getEntryId();
81+
}
82+
83+
@Override
84+
public int getPartitionIndex() {
85+
return messageId.getPartitionIndex();
86+
}
87+
88+
@Override
89+
public int getBatchIndex() {
90+
return messageId.getBatchIndex();
91+
}
92+
93+
@Override
94+
public int getBatchSize() {
95+
return messageId.getBatchSize();
96+
}
97+
98+
@Override
99+
public BitSet getAckSet() {
100+
return messageId.getAckSet();
101+
}
102+
103+
@Override
104+
public MessageIdAdv getFirstChunkMessageId() {
105+
return messageId.getFirstChunkMessageId();
106+
}
107+
71108
@Override
72109
public int compareTo(MessageId o) {
73110
return messageId.compareTo(o);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2115,7 +2115,7 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
21152115
ClientCnx cnx = cnx();
21162116

21172117
MessageIdAdv originSeekMessageId = seekMessageId;
2118-
seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
2118+
seekMessageId = (MessageIdAdv) seekId;
21192119
duringSeek.set(true);
21202120
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
21212121

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType ackTyp
182182
Map<String, Long> properties) {
183183
MessageIdAdv msgIdAdv = (MessageIdAdv) msgId;
184184
if (MessageIdAdvUtils.isBatch(msgIdAdv)) {
185-
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
186-
return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId), ackType, properties, batchMessageId);
185+
return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId), ackType, properties, msgIdAdv);
187186
} else {
188187
return addAcknowledgment(msgIdAdv, ackType, properties, null);
189188
}
@@ -229,7 +228,7 @@ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
229228
} else {
230229
consumer.onAcknowledgeCumulative(msgId, null);
231230
}
232-
if (batchMessageId == null || MessageIdAdvUtils.acknowledge(msgId, false)) {
231+
if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
233232
return doCumulativeAck(msgId, properties, null);
234233
} else if (batchIndexAckEnabled) {
235234
return doCumulativeBatchIndexAck(batchMessageId, properties);

0 commit comments

Comments
 (0)