Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v22;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
Expand Down Expand Up @@ -2283,7 +2284,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
.thenApply(lastPosition -> {
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
Position markDeletePosition = PositionFactory.EARLIEST;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
Expand Down Expand Up @@ -2344,8 +2345,7 @@ private void getLargestBatchIndexWhenPossible(
} else {
// if readCompacted is false, we need to return MessageId.earliest
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
}
return;
}
Expand Down Expand Up @@ -2404,8 +2404,7 @@ public String toString() {

writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
}
});
});
Expand All @@ -2415,6 +2414,10 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
int partitionIndex, Position markDeletePosition) {
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
if (entry != null) {
if (getRemoteEndpointProtocolVersion() >= v22.getValue()) {
sendGetLastMessageIdResponseWithBuffer(requestId, partitionIndex, entry, markDeletePosition);
return;
}
try {
// in this case, all the data has been compacted, so return the last position
// in the compacted ledger to the client
Expand Down Expand Up @@ -2453,6 +2456,29 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
});
}

private void sendGetLastMessageIdResponseWithBuffer(long requestId, int partitionIndex, Entry entry,
Position markDeletePosition) {
try {
final var buffer = entry.getDataBuffer().retain();
final ByteBufPair response;
try {
response = Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(),
entry.getEntryId(), partitionIndex, markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId(), buffer);
} catch (Throwable throwable) {
buffer.release();
entry.release();
log.error("Unexpected exception when getLastMessageId for compacted consumer (request id: {})",
requestId, throwable);
writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, throwable.getMessage()));
return;
}
ctx.writeAndFlush(response, ctx.voidPromise());
} finally {
entry.release();
}
}

private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
int batchSize = metadata.getNumMessagesInBatch();
if (batchSize <= 1){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PayloadToMessageIdConverter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
Expand All @@ -54,6 +55,13 @@ public class RawReaderImpl implements RawReader {
public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
this(client, topic, subscription, consumerFuture, createTopicIfDoesNotExist, retryOnRecoverableErrors, null);
}

public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors,
PayloadToMessageIdConverter payloadToMessageIdConverter) {
consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
Expand All @@ -62,6 +70,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);
consumerConfiguration.setPayloadToMessageIdConverter(payloadToMessageIdConverter);

consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist,
retryOnRecoverableErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ public Compactor(ServiceConfiguration conf,
}

public CompletableFuture<Long> compact(String topic) {
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false).thenComposeAsync(
this::compactAndCloseReader, scheduler);
return createRawReader(topic).thenComposeAsync(this::compactAndCloseReader, scheduler);
}

protected CompletableFuture<RawReader> createRawReader(String topic) {
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false);
}

private CompletableFuture<Long> compactAndCloseReader(RawReader reader) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;

import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessagePayload;
import org.apache.pulsar.client.api.MessagePayloadContext;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PayloadToMessageIdConverter;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.RawReaderImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

public class CustomCompactionServiceFactory extends PulsarCompactionServiceFactory {

private static final String RETAINED_MESSAGE_INDEXES = "retained.message.indexes";

@Override
protected Compactor newCompactor() throws PulsarServerException {
return new CustomCompactor(getPulsarService());
}

public static MessageId convertPayloadToMessageId(PayloadToMessageIdConverter.LastEntry lastEntry) {
final var buf = Unpooled.wrappedBuffer(lastEntry.getMetadataBuffer());
try {
final var metadata = Commands.parseMessageMetadata(buf);
final int batchSize = metadata.getNumMessagesInBatch();
final var property = metadata.getPropertiesList().stream()
.filter(__ -> __.getKey().equals(RETAINED_MESSAGE_INDEXES)).findAny().orElse(null);
final int batchIndex;
if (property == null) {
batchIndex = batchSize - 1;
} else {
final var indexes = Arrays.stream(property.getValue().split(",")).map(Integer::valueOf).toList();
batchIndex = indexes.get(indexes.size() - 1);
}
return new BatchMessageIdImpl(lastEntry.getLedgerId(), lastEntry.getEntryId(),
lastEntry.getPartitionIndex(), batchIndex);
} finally {
buf.release();
}
}

public static class PayloadProcessor implements MessagePayloadProcessor {

@Override
public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
Consumer<Message<T>> messageConsumer) throws Exception {
final var property = context.getProperty(RETAINED_MESSAGE_INDEXES);
if (property == null) {
MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer);
return;
}
final var numMessages = context.getNumMessages();
final var indexes = Arrays.stream(property.split(",")).map(Integer::valueOf).collect(Collectors.toSet());
for (int i = 0; i < numMessages; i++) {
final var msg = context.getMessageAt(i, numMessages, payload, true, schema);
if (indexes.contains(i)) {
messageConsumer.accept(msg);
}
}
}
}

private static class CustomCompactor extends PublishingOrderCompactor {

public CustomCompactor(PulsarService pulsarService) throws PulsarServerException {
super(pulsarService.getConfiguration(), pulsarService.getClient(), pulsarService.getBookKeeperClient(),
pulsarService.getCompactorExecutor());
}

@Override
protected CompletableFuture<RawReader> createRawReader(String topic) {
final var future = new CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>();
final var reader = new RawReaderImpl((PulsarClientImpl) pulsar, topic, COMPACTION_SUBSCRIPTION, future,
false, false, CustomCompactionServiceFactory::convertPayloadToMessageId);
return future.thenApply(__ -> reader);
}

// This is a simple implementation that assumes all messages are not compressed and have partition keys
@Override
protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, MessageMetadata metadata,
BiPredicate<String, MessageId> filter, boolean retainNullKey)
throws IOException {
final var payload = msg.getHeadersAndPayload();
if (metadata == null) {
metadata = Commands.parseMessageMetadata(payload);
} else {
Commands.skipMessageMetadata(payload);
}

final var batchSize = metadata.getNumMessagesInBatch();
final var singleMessageMetadata = new SingleMessageMetadata();
final var retainedMessageIndexes = new ArrayList<String>();
final var batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
// The difference from the built-in compactor's behavior is that the compactedOut field is no longer set.
// Instead, the retained messages' indexes are serialized into a property.
for (int i = 0; i < batchSize; i++) {
final var singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload,
singleMessageMetadata, 0, batchSize);
final var id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i);
if (singleMessageMetadata.isCompactedOut()) {
retainedMessageIndexes.add(Integer.toString(i));
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER,
batchBuffer);
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
retainedMessageIndexes.add(Integer.toString(i));
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, Unpooled.EMPTY_BUFFER,
batchBuffer);
}
singleMessagePayload.release();
}
if (retainedMessageIndexes.isEmpty()) {
return Optional.empty();
}
metadata.addProperty().setKey(RETAINED_MESSAGE_INDEXES)
.setValue(String.join(",", retainedMessageIndexes));
final var metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
metadata, batchBuffer);
try {
return Optional.of(new RawMessageImpl(msg.getMessageIdData(), metadataAndPayload));
} finally {
metadataAndPayload.release();
}
}
}
}
Loading
Loading