diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3aa365323ec7c..8124b2be4f6a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -27,6 +27,7 @@ import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; +import static org.apache.pulsar.common.api.proto.ProtocolVersion.v22; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; @@ -2283,7 +2284,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) .thenApply(lastPosition -> { int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - Position markDeletePosition = null; + Position markDeletePosition = PositionFactory.EARLIEST; if (consumer.getSubscription() instanceof PersistentSubscription) { markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() .getMarkDeletedPosition(); @@ -2344,8 +2345,7 @@ private void getLargestBatchIndexWhenPossible( } else { // if readCompacted is false, we need to return MessageId.earliest writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } return; } @@ -2404,8 +2404,7 @@ public String toString() { writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } }); }); @@ -2415,6 +2414,10 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent int partitionIndex, Position markDeletePosition) { persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { if (entry != null) { + if (getRemoteEndpointProtocolVersion() >= v22.getValue()) { + sendGetLastMessageIdResponseWithBuffer(requestId, partitionIndex, entry, markDeletePosition); + return; + } try { // in this case, all the data has been compacted, so return the last position // in the compacted ledger to the client @@ -2453,6 +2456,29 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent }); } + private void sendGetLastMessageIdResponseWithBuffer(long requestId, int partitionIndex, Entry entry, + Position markDeletePosition) { + try { + final var buffer = entry.getDataBuffer().retain(); + final ByteBufPair response; + try { + response = Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(), + entry.getEntryId(), partitionIndex, markDeletePosition.getLedgerId(), + markDeletePosition.getEntryId(), buffer); + } catch (Throwable throwable) { + buffer.release(); + entry.release(); + log.error("Unexpected exception when getLastMessageId for compacted consumer (request id: {})", + requestId, throwable); + writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, throwable.getMessage())); + return; + } + ctx.writeAndFlush(response, ctx.voidPromise()); + } finally { + entry.release(); + } + } + private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { int batchSize = metadata.getNumMessagesInBatch(); if (batchSize <= 1){ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 32f75d71dc332..6a27b79edf776 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; @@ -54,6 +55,13 @@ public class RawReaderImpl implements RawReader { public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { + this(client, topic, subscription, consumerFuture, createTopicIfDoesNotExist, retryOnRecoverableErrors, null); + } + + public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors, + PayloadToMessageIdConverter payloadToMessageIdConverter) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); @@ -62,6 +70,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setReadCompacted(true); consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); + consumerConfiguration.setPayloadToMessageIdConverter(payloadToMessageIdConverter); consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist, retryOnRecoverableErrors); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index d37298757db9d..729cda8f1f1e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -56,8 +56,11 @@ public Compactor(ServiceConfiguration conf, } public CompletableFuture compact(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false).thenComposeAsync( - this::compactAndCloseReader, scheduler); + return createRawReader(topic).thenComposeAsync(this::compactAndCloseReader, scheduler); + } + + protected CompletableFuture createRawReader(String topic) { + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false); } private CompletableFuture compactAndCloseReader(RawReader reader) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java new file mode 100644 index 0000000000000..2dce6c18c03be --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceFactory.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.client.impl.RawReaderImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.protocol.Commands; + +public class CustomCompactionServiceFactory extends PulsarCompactionServiceFactory { + + private static final String RETAINED_MESSAGE_INDEXES = "retained.message.indexes"; + + @Override + protected Compactor newCompactor() throws PulsarServerException { + return new CustomCompactor(getPulsarService()); + } + + public static MessageId convertPayloadToMessageId(PayloadToMessageIdConverter.LastEntry lastEntry) { + final var buf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); + try { + final var metadata = Commands.parseMessageMetadata(buf); + final int batchSize = metadata.getNumMessagesInBatch(); + final var property = metadata.getPropertiesList().stream() + .filter(__ -> __.getKey().equals(RETAINED_MESSAGE_INDEXES)).findAny().orElse(null); + final int batchIndex; + if (property == null) { + batchIndex = batchSize - 1; + } else { + final var indexes = Arrays.stream(property.getValue().split(",")).map(Integer::valueOf).toList(); + batchIndex = indexes.get(indexes.size() - 1); + } + return new BatchMessageIdImpl(lastEntry.getLedgerId(), lastEntry.getEntryId(), + lastEntry.getPartitionIndex(), batchIndex); + } finally { + buf.release(); + } + } + + public static class PayloadProcessor implements MessagePayloadProcessor { + + @Override + public void process(MessagePayload payload, MessagePayloadContext context, Schema schema, + Consumer> messageConsumer) throws Exception { + final var property = context.getProperty(RETAINED_MESSAGE_INDEXES); + if (property == null) { + MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer); + return; + } + final var numMessages = context.getNumMessages(); + final var indexes = Arrays.stream(property.split(",")).map(Integer::valueOf).collect(Collectors.toSet()); + for (int i = 0; i < numMessages; i++) { + final var msg = context.getMessageAt(i, numMessages, payload, true, schema); + if (indexes.contains(i)) { + messageConsumer.accept(msg); + } + } + } + } + + private static class CustomCompactor extends PublishingOrderCompactor { + + public CustomCompactor(PulsarService pulsarService) throws PulsarServerException { + super(pulsarService.getConfiguration(), pulsarService.getClient(), pulsarService.getBookKeeperClient(), + pulsarService.getCompactorExecutor()); + } + + @Override + protected CompletableFuture createRawReader(String topic) { + final var future = new CompletableFuture>(); + final var reader = new RawReaderImpl((PulsarClientImpl) pulsar, topic, COMPACTION_SUBSCRIPTION, future, + false, false, CustomCompactionServiceFactory::convertPayloadToMessageId); + return future.thenApply(__ -> reader); + } + + // This is a simple implementation that assumes all messages are not compressed and have partition keys + @Override + protected Optional rebatchMessage(String topic, RawMessage msg, MessageMetadata metadata, + BiPredicate filter, boolean retainNullKey) + throws IOException { + final var payload = msg.getHeadersAndPayload(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(payload); + } else { + Commands.skipMessageMetadata(payload); + } + + final var batchSize = metadata.getNumMessagesInBatch(); + final var singleMessageMetadata = new SingleMessageMetadata(); + final var retainedMessageIndexes = new ArrayList(); + final var batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity()); + // The difference from the built-in compactor's behavior is that the compactedOut field is no longer set. + // Instead, the retained messages' indexes are serialized into a property. + for (int i = 0; i < batchSize; i++) { + final var singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, + singleMessageMetadata, 0, batchSize); + final var id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), + msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i); + if (singleMessageMetadata.isCompactedOut()) { + retainedMessageIndexes.add(Integer.toString(i)); + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER, + batchBuffer); + } else if (filter.test(singleMessageMetadata.getPartitionKey(), id) + && singleMessagePayload.readableBytes() > 0) { + retainedMessageIndexes.add(Integer.toString(i)); + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, + singleMessagePayload, batchBuffer); + } else { + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER, + batchBuffer); + } + singleMessagePayload.release(); + } + if (retainedMessageIndexes.isEmpty()) { + return Optional.empty(); + } + metadata.addProperty().setKey(RETAINED_MESSAGE_INDEXES) + .setValue(String.join(",", retainedMessageIndexes)); + final var metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, + metadata, batchBuffer); + try { + return Optional.of(new RawMessageImpl(msg.getMessageIdData(), metadataAndPayload)); + } finally { + metadataAndPayload.release(); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java new file mode 100644 index 0000000000000..a077bf93e56c9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CustomCompactionServiceTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class CustomCompactionServiceTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE); + conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE); + conf.setCompactionServiceFactoryClassName(CustomCompactionServiceFactory.class.getName()); + } + + @Test + public void testGetLastMessageIdAfterCompaction() throws Exception { + final var topic = BrokerTestUtil.newUniqueName("tp"); + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(Integer.MAX_VALUE) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create(); + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.newMessage().key("k1").value("v0").sendAsync(); + producer.newMessage().key("k1").value(null).sendAsync(); + producer.flush(); + + triggerCompactionAndWait(topic); + + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).topic(topic) + .startMessageId(MessageId.earliest).readCompacted(true) + .messagePayloadProcessor(new CustomCompactionServiceFactory.PayloadProcessor()) + .payloadToMessageIdConverter(CustomCompactionServiceFactory::convertPayloadToMessageId) + .create(); + final var messages = new ArrayList>(); + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(3, TimeUnit.SECONDS); + assertNotNull(msg); + messages.add(msg); + log.info("Read {} => {} {{}}", msg.getKey(), msg.getValue(), msg.getMessageId()); + } + Assert.assertEquals(messages.size(), 1); + final var msg = messages.get(0); + Assert.assertEquals(msg.getKey(), "k0"); + Assert.assertEquals(msg.getValue(), "v1"); + Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getBatchIndex(), 1); + final var lastMsgId = reader.getLastMessageIds().get(0); + log.info("Last msg id: {}", lastMsgId); + Assert.assertEquals(msg.getMessageId(), lastMsgId); + + // Trigger the RawReader#getLastMessageId function and ensure no exception will be thrown + admin.namespaces().unload("public/default"); + triggerCompactionAndWait(topic); + Assert.assertEquals(reader.getLastMessageIds().get(0), lastMsgId); + } + + private void triggerCompactionAndWait(String topicName) throws Exception { + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + persistentTopic.triggerCompaction(); + Awaitility.await().untilAsserted(() -> { + Position lastConfirmPos = persistentTopic.getManagedLedger().getLastConfirmedEntry(); + Position markDeletePos = persistentTopic + .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition(); + assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId()); + assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId()); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 252ee939aace3..a99949607f702 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -29,6 +29,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -39,6 +41,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -57,6 +60,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker-impl") public class GetLastMessageIdCompactedTest extends ProducerConsumerBase { @@ -516,4 +520,65 @@ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception assertNotEquals(message, null); } } + + @DataProvider(name = "encryptionAndCompression") + public Object[][] encryptionAndCompression(){ + return new Object[][]{ + { true, CompressionType.NONE }, + { true, CompressionType.LZ4 }, + { false, CompressionType.NONE }, + { false, CompressionType.LZ4 }, + }; + } + + @Test(dataProvider = "encryptionAndCompression", timeOut = 30000) + public void testGetLastMessageIdForEncryptedMessage(boolean encryption, CompressionType compressionType) + throws Exception { + final var topic = BrokerTestUtil.newUniqueName("tp"); + final var ecdsaPublickeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + final var producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic) + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(Integer.MAX_VALUE) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .compressionType(compressionType); + if (encryption) { + producerBuilder.addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader(ecdsaPublickeyFile); + } + @Cleanup final var producer = producerBuilder.create(); + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.newMessage().key("k1").value("v0").sendAsync(); + producer.newMessage().key("k1").value(null).sendAsync(); + producer.flush(); + triggerCompactionAndWait(topic); + + final var consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .readCompacted(true); + if (encryption) { + consumerBuilder.defaultCryptoKeyReader(ecdsaPrivateKeyFile); + } + @Cleanup final var consumer = consumerBuilder.subscribe(); + final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0); + if (encryption) { + // Compaction does not work for encrypted messages + assertEquals(msgId.getBatchIndex(), 3); + } else { + assertEquals(msgId.getBatchIndex(), 1); + } + + final var readerBuilder = pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest) + .topic(topic).readCompacted(true); + if (encryption) { + readerBuilder.defaultCryptoKeyReader(ecdsaPrivateKeyFile); + } + @Cleanup final var reader = readerBuilder.create(); + MessageIdAdv readMsgId = (MessageIdAdv) MessageId.earliest; + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(); + log.info("Read key: {}, value: {}", msg.getKey(), Optional.ofNullable(msg.getValue()).orElse("(null)")); + readMsgId = (MessageIdAdv) msg.getMessageId(); + } + assertEquals(readMsgId, msgId); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 142c474114912..74d33b5015fbf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -950,4 +950,26 @@ ConsumerBuilder topicConfiguration(String topicName, */ ConsumerBuilder topicConfiguration(Pattern topicsPattern, java.util.function.Consumer> builderConsumer); + + /** + * Configures a custom `PayloadToMessageIdConverter` to handle the parsing of the last entry's buffer when + * {@link ConsumerBuilder#readCompacted(boolean)} is set to `true`. + * + * When compaction is enabled, the `GetLastMessageId` response may include the buffer of the last entry from the + * compaction service. In such cases, the last message's message ID must be extracted from the buffer, as the entry + * may contain messages that have been compacted out and will not be delivered to the consumer. + * + * If the broker's topic compaction service uses the built-in implementation, users do not need to configure this + * explicitly, as the default conversion function handles the parsing correctly. However, if the broker is + * configured with a custom topic compaction service, you must provide a `converter` with an appropriate function to + * parse the buffer correctly based on the behavior of the custom compaction service. + * + * If the provided `converter` throws an exception during parsing, the corresponding result of + * {@link Consumer#getLastMessageIdsAsync()} will fail with that exception. + * + * @param converter The custom `PayloadToMessageIdConverter` to parse the last entry's buffer. + * @return The updated `ConsumerBuilder` instance. + */ + ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java new file mode 100644 index 0000000000000..31a187be1b157 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PayloadToMessageIdConverter.java @@ -0,0 +1,28 @@ +package org.apache.pulsar.client.api; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface PayloadToMessageIdConverter { + + MessageId convert(LastEntry lastEntry) throws IOException; + + interface LastEntry { + + long getLedgerId(); + + long getEntryId(); + + int getPartitionIndex(); + + /** + * @return the buffer that can be parsed to the `MessageMetadata` defined in `PulsarApi.proto` + */ + ByteBuffer getMetadataBuffer(); + + /** + * @return the uncompressed and unencrypted payload buffer of the last entry + */ + ByteBuffer getPayloadBuffer(); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index 4a313feba43d5..c6057ac082b3d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -376,4 +376,13 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit); + /** + * @see ConsumerBuilder#messagePayloadProcessor + */ + ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); + + /** + * @see ConsumerBuilder#payloadToMessageIdConverter + */ + ReaderBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 0563fa7e66667..e92a253375515 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -568,7 +568,7 @@ protected void handleSuccess(CommandSuccess success) { } @Override - protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) { + protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success, ByteBuf buf) { checkArgument(state == State.Ready); if (log.isDebugEnabled()) { @@ -576,10 +576,11 @@ protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse suc ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture requestFuture = - (CompletableFuture) pendingRequests.remove(requestId); + CompletableFuture> requestFuture = + (CompletableFuture>) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(new CommandGetLastMessageIdResponse().copyFrom(success)); + requestFuture.complete(Pair.of(new CommandGetLastMessageIdResponse().copyFrom(success), + buf.retainedDuplicate())); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); @@ -1062,7 +1063,8 @@ private CompletableFuture sendRequestAndHandleTimeout(ByteBuf requestMess return future; } - public CompletableFuture sendGetLastMessageId(ByteBuf request, long requestId) { + public CompletableFuture> sendGetLastMessageId( + ByteBuf request, long requestId) { return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId, true); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ce602a0ec9f36..74f8d677748e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageListenerExecutor; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.RedeliveryBackoff; @@ -634,4 +635,10 @@ public ConsumerBuilder topicConfiguration(Pattern topicsPattern, builderConsumer.accept(topicConfiguration(topicsPattern)); return this; } + + @Override + public ConsumerBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter) { + conf.setPayloadToMessageIdConverter(converter); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index dfc217cf574f4..56667a74cf211 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.scurrilous.circe.checksum.Crc32cIntChecksum; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -84,6 +85,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; @@ -113,6 +115,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; +import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse; import org.apache.pulsar.common.api.proto.CommandMessage; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CompressionType; @@ -137,6 +140,7 @@ import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1416,6 +1420,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) { ackSet.add(cmdMessage.getAckSetAt(i)); } + } int redeliveryCount = cmdMessage.getRedeliveryCount(); MessageIdData messageId = cmdMessage.getMessageId(); @@ -2000,7 +2005,7 @@ public static DecryptResult discard() { private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata, - ByteBuf payload, ClientCnx currentCnx) { + ByteBuf payload, @Nullable ClientCnx currentCnx) { if (msgMetadata.getEncryptionKeysCount() == 0) { return DecryptResult.success(payload.retain()); @@ -2025,7 +2030,7 @@ private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeli return handleCryptoFailure(payload, messageId, currentCnx, redeliveryCount, batchSize, false); } - private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx, + private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, @Nullable ClientCnx currentCnx, int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) { switch (conf.getCryptoFailureAction()) { @@ -2077,7 +2082,7 @@ private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData message } private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx, boolean checkMaxMessageSize) { + @Nullable ClientCnx currentCnx, boolean checkMaxMessageSize) { CompressionType compressionType = msgMetadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); @@ -2129,15 +2134,18 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC stats.incrementNumReceiveFailed(); } - private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, + private void discardCorruptedMessage(MessageIdData messageId, @Nullable ClientCnx currentCnx, ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); discardMessage(messageId, currentCnx, validationError, 1); } - private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError, - int batchMessages) { + private void discardMessage(MessageIdData messageId, @Nullable ClientCnx currentCnx, + ValidationError validationError, int batchMessages) { + if (currentCnx == null) { + return; + } ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual, validationError, Collections.emptyMap(), -1); currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise()); @@ -2846,7 +2854,8 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, log.debug("[{}][{}] Get topic last message Id", topic, subscription); } - cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> { + cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(pair -> { + final CommandGetLastMessageIdResponse cmd = pair.getKey(); MessageIdData lastMessageId = cmd.getLastMessageId(); MessageIdImpl markDeletePosition = null; if (cmd.hasConsumerMarkDeletePosition()) { @@ -2858,13 +2867,15 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); } - MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 - ? new MessageIdImpl(lastMessageId.getLedgerId(), - lastMessageId.getEntryId(), lastMessageId.getPartition()) - : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), lastMessageId.getBatchIndex()); - - future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); + final ByteBuf buf = pair.getValue(); + try { + final MessageId lastMsgId = getLastMessageIdFromResponse(lastMessageId, buf); + future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); + } catch (IOException e) { + future.completeExceptionally(e); + } finally { + buf.release(); + } }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( @@ -2896,6 +2907,102 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } } + public static MessageId convertBufferToMessageId(PayloadToMessageIdConverter.LastEntry lastEntry) + throws IOException { + final ByteBuf metadataBuf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer()); + final ByteBuf buf = Unpooled.wrappedBuffer(lastEntry.getPayloadBuffer()); + try { + final MessageMetadata metadata = Commands.parseMessageMetadata(metadataBuf); + final int batchSize = metadata.getNumMessagesInBatch(); + int batchIndex = -1; + for (int i = 0; i < batchSize; i++) { + final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + ByteBuf payload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadata, + i, batchSize); + try { + if (singleMessageMetadata.isCompactedOut()) { + continue; + } + batchIndex = i; + } finally { + payload.release(); + } + } + return new BatchMessageIdImpl(lastEntry.getLedgerId(), lastEntry.getEntryId(), + lastEntry.getPartitionIndex(), batchIndex); + } finally { + metadataBuf.release(); + buf.release(); + } + } + + private MessageId getLastMessageIdFromResponse(MessageIdData lastMessageId, ByteBuf buf) throws IOException { + if (buf.readableBytes() <= 0) { + if (lastMessageId.getBatchIndex() <= 0) { + return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition()); + } else { + return new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + } + } + checkArgument(conf.isReadCompacted()); + + int startReaderIndex = buf.readerIndex(); + final MessageMetadata messageMetadata = Commands.parseMessageMetadata(buf); + final ByteBuf metadataBuf = buf.slice(startReaderIndex, buf.readerIndex() - startReaderIndex); + int batchSize = messageMetadata.getNumMessagesInBatch(); + if (batchSize <= 1) { + return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartition()); + } + + // Parse the correct batch index from buffer + if (!verifyChecksum(buf, lastMessageId)) { + throw new IOException("checksum mismatch in the last message's buffer"); + } + DecryptResult decryptResult = decryptPayloadIfNeeded(lastMessageId, 0, messageMetadata, buf, null); + if (decryptResult.shouldDiscard() || !decryptResult.success) { + throw new IOException("Failed to decrypt last message's buffer"); + } + final ByteBuf uncompressedBuf = uncompressPayloadIfNeeded(lastMessageId, messageMetadata, + decryptResult.payload, null, false); + decryptResult.payload.release(); + if (uncompressedBuf == null) { + throw new RuntimeException("Failed to uncompress last message's buffer"); + } + try { + return conf.getPayloadToMessageIdConverter().convert(new PayloadToMessageIdConverter.LastEntry() { + @Override + public long getLedgerId() { + return lastMessageId.getLedgerId(); + } + + @Override + public long getEntryId() { + return lastMessageId.getEntryId(); + } + + @Override + public int getPartitionIndex() { + return lastMessageId.getPartition(); + } + + @Override + public ByteBuffer getMetadataBuffer() { + return metadataBuf.nioBuffer(); + } + + @Override + public ByteBuffer getPayloadBuffer() { + return uncompressedBuf.nioBuffer(); + } + }); + } finally { + uncompressedBuf.release(); + } + } + private boolean isMessageUndecryptable(MessageMetadata msgMetadata) { return (msgMetadata.getEncryptionKeysCount() > 0 && conf.getCryptoKeyReader() == null && conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index d0ab90068ed31..c4a350b9e498c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -34,6 +34,8 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -280,4 +282,15 @@ public ReaderBuilder expireTimeOfIncompleteChunkedMessage(long duration, Time return this; } + @Override + public ReaderBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) { + conf.setPayloadProcessor(payloadProcessor); + return this; + } + + @Override + public ReaderBuilder payloadToMessageIdConverter(PayloadToMessageIdConverter converter) { + conf.setPayloadToMessageIdConverter(converter); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 8760d69447a64..c4ad632138c0f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -134,6 +134,12 @@ public void reachedEndOfTopic(Consumer consumer) { .ranges(readerConfiguration.getKeyHashRanges()) ); } + if (readerConfiguration.getPayloadProcessor() != null) { + consumerConfiguration.setPayloadProcessor(readerConfiguration.getPayloadProcessor()); + } + if (readerConfiguration.getPayloadToMessageIdConverter() != null) { + consumerConfiguration.setPayloadToMessageIdConverter(readerConfiguration.getPayloadToMessageIdConverter()); + } ConsumerInterceptors consumerInterceptors = ReaderInterceptorUtil.convertToConsumerInterceptors( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 4be8c4ed73e90..beb8796dadfa0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -46,11 +46,13 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageListenerExecutor; import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; @Data @NoArgsConstructor @@ -414,6 +416,8 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; private List topicConfigurations = new ArrayList<>(); + private PayloadToMessageIdConverter payloadToMessageIdConverter = + ConsumerImpl::convertBufferToMessageId; public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { return topicConfigurations.stream() @@ -459,4 +463,10 @@ public ConsumerConfigurationData clone() { public boolean isReplicateSubscriptionState() { return replicateSubscriptionState != null && replicateSubscriptionState; } + + public void setPayloadToMessageIdConverter(PayloadToMessageIdConverter converter) { + if (converter != null) { + this.payloadToMessageIdConverter = converter; + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index cd5aa4c12f5c3..6b52174b2f2ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.PayloadToMessageIdConverter; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.ReaderInterceptor; import org.apache.pulsar.client.api.ReaderListener; @@ -164,6 +166,12 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; + @JsonIgnore + private PayloadToMessageIdConverter payloadToMessageIdConverter; + + @JsonIgnore + private MessagePayloadProcessor payloadProcessor; + @JsonIgnore public String getTopicName() { if (topicNames.size() > 1) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 58d58a3acef98..e50a8f6af1928 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -2100,4 +2100,19 @@ public static ProducerAccessMode convertProducerAccessMode( public static boolean peerSupportsBrokerMetadata(int peerVersion) { return peerVersion >= ProtocolVersion.v16.getValue(); } + + public static ByteBufPair newGetLastMessageIdResponse(long requestId, long lastPositionLedgerId, + long lastPositionEntryId, int partitionIndex, + long markDeleteLedgerId, long markDeleteEntryId, + ByteBuf lastEntryBuffer) { + BaseCommand cmd = localCmd(Type.GET_LAST_MESSAGE_ID_RESPONSE); + CommandGetLastMessageIdResponse response = cmd.setGetLastMessageIdResponse() + .setRequestId(requestId); + response.setLastMessageId().setLedgerId(lastPositionLedgerId).setEntryId(lastPositionEntryId) + .setPartition(partitionIndex).setBatchIndex(-1); + if (markDeleteLedgerId >= 0) { + response.setConsumerMarkDeletePosition().setLedgerId(markDeleteLedgerId).setEntryId(markDeleteEntryId); + } + return serializeCommandMessageWithSize(cmd, lastEntryBuffer); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index c05b1d796dfdd..a993dd5036b02 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -322,7 +322,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case GET_LAST_MESSAGE_ID_RESPONSE: checkArgument(cmd.hasGetLastMessageIdResponse()); - handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse()); + handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse(), buffer); break; case ACTIVE_CONSUMER_CHANGE: @@ -628,7 +628,7 @@ protected void handleTopicMigrated(CommandTopicMigrated commandMigratedTopic) { protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) { throw new UnsupportedOperationException(); } - protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) { + protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success, ByteBuf buf) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index eacec33169e34..851104743e172 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -265,6 +265,7 @@ enum ProtocolVersion { v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version + v22 = 22; // Send the last entry's payload buffer to the consumer if the consumer's read_compacted field is true } message CommandConnect {