Skip to content

Commit 878a412

Browse files
authored
[improve][client] Deprecate MessageIdUtils.getOffset and MessageIdUtils.getMessageId (#22747)
### Motivation After discussing [here](#22698 (comment)), the pulsar client shouldn't expose the `offset` term to users. ### Modifications - Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` - For connectors, use `FunctionCommon.getOffset` and `FunctionCommon.getMessageId`
1 parent b5bc390 commit 878a412

4 files changed

Lines changed: 16 additions & 8 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.pulsar.client.impl.MessageIdImpl;
2323

2424
public class MessageIdUtils {
25+
@Deprecated
2526
public static final long getOffset(MessageId messageId) {
2627
MessageIdImpl msgId = (MessageIdImpl) messageId;
2728
long ledgerId = msgId.getLedgerId();
@@ -34,6 +35,7 @@ public static final long getOffset(MessageId messageId) {
3435
return offset;
3536
}
3637

38+
@Deprecated
3739
public static final MessageId getMessageId(long offset) {
3840
// Demultiplex ledgerId and entryId from offset
3941
long ledgerId = offset >>> 28;

pulsar-io/kafka-connect-adaptor/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@
4646
<scope>compile</scope>
4747
</dependency>
4848

49+
<dependency>
50+
<groupId>org.apache.pulsar</groupId>
51+
<artifactId>pulsar-functions-utils</artifactId>
52+
<version>${project.version}</version>
53+
</dependency>
54+
4955
<dependency>
5056
<groupId>com.fasterxml.jackson.core</groupId>
5157
<artifactId>jackson-databind</artifactId>

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.kafka.connect.sink.SinkTaskContext;
4141
import org.apache.kafka.connect.storage.OffsetBackingStore;
4242
import org.apache.pulsar.client.api.PulsarClientException;
43-
import org.apache.pulsar.client.util.MessageIdUtils;
43+
import org.apache.pulsar.functions.utils.FunctionCommon;
4444
import org.apache.pulsar.io.core.SinkContext;
4545

4646
@Slf4j
@@ -150,7 +150,7 @@ private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
150150
try {
151151
ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
152152
topicPartition.partition(),
153-
MessageIdUtils.getMessageId(offset));
153+
FunctionCommon.getMessageId(offset));
154154
} catch (PulsarClientException e) {
155155
log.error("Failed to seek topic {} partition {} offset {}",
156156
topicPartition.topic(), topicPartition.partition(), offset, e);

pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@
6464
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
6565
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
6666
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
67-
import org.apache.pulsar.client.util.MessageIdUtils;
6867
import org.apache.pulsar.common.schema.KeyValue;
6968
import org.apache.pulsar.common.schema.SchemaInfo;
7069
import org.apache.pulsar.common.schema.SchemaType;
7170
import org.apache.pulsar.functions.api.Record;
7271
import org.apache.pulsar.functions.source.PulsarRecord;
72+
import org.apache.pulsar.functions.utils.FunctionCommon;
7373
import org.apache.pulsar.io.core.SinkContext;
7474
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
7575
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
@@ -303,8 +303,8 @@ public void seekPauseResumeTest() throws Exception {
303303
assertEquals(status.get(), 1);
304304

305305
final TopicPartition tp = new TopicPartition("fake-topic", 0);
306-
assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
307-
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
306+
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
307+
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));
308308

309309
sink.taskContext.offset(tp, 0);
310310
verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
@@ -347,12 +347,12 @@ public void seekPauseResumeWithSanitizeTest() throws Exception {
347347
assertEquals(status.get(), 1);
348348

349349
final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
350-
assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
351-
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
350+
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
351+
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));
352352

353353
sink.taskContext.offset(tp, 0);
354354
verify(context, times(1)).seek(pulsarTopicName,
355-
tp.partition(), MessageIdUtils.getMessageId(0));
355+
tp.partition(), FunctionCommon.getMessageId(0));
356356
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
357357

358358
sink.taskContext.pause(tp);

0 commit comments

Comments
 (0)