Skip to content

Commit 4f96146

Browse files
aloyszhang and Aloys Zhang authored
[improve][broker] Reducing the parse of MessageMetadata in compaction (#23285)
Co-authored-by: Aloys Zhang <aloyszhang@apache.org>
1 parent 13c19b5 commit 4f96146

4 files changed

Lines changed: 46 additions & 30 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,16 @@ public static boolean isReadableBatch(MessageMetadata metadata) {
5252
return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
5353
}
5454

55-
public static List<MessageCompactionData> extractMessageCompactionData(RawMessage msg)
55+
public static List<MessageCompactionData> extractMessageCompactionData(RawMessage msg, MessageMetadata metadata)
5656
throws IOException {
5757
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
5858

5959
ByteBuf payload = msg.getHeadersAndPayload();
60-
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
60+
if (metadata == null) {
61+
metadata = Commands.parseMessageMetadata(payload);
62+
} else {
63+
Commands.skipMessageMetadata(payload);
64+
}
6165
int batchSize = metadata.getNumMessagesInBatch();
6266

6367
CompressionType compressionType = metadata.getCompression();
@@ -91,15 +95,24 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
9195
RawMessage msg)
9296
throws IOException {
9397
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
94-
for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
98+
for (MessageCompactionData mcd : extractMessageCompactionData(msg, null)) {
99+
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize()));
100+
}
101+
return idsAndKeysAndSize;
102+
}
103+
104+
public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSize(
105+
RawMessage msg, MessageMetadata metadata) throws IOException {
106+
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
107+
for (MessageCompactionData mcd : extractMessageCompactionData(msg, metadata)) {
95108
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize()));
96109
}
97110
return idsAndKeysAndSize;
98111
}
99112

100113
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
101114
BiPredicate<String, MessageId> filter) throws IOException {
102-
return rebatchMessage(msg, filter, true);
115+
return rebatchMessage(msg, null, filter, true);
103116
}
104117

105118
/**
@@ -109,6 +122,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
109122
* NOTE: this method does not alter the reference count of the RawMessage argument.
110123
*/
111124
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
125+
MessageMetadata metadata,
112126
BiPredicate<String, MessageId> filter,
113127
boolean retainNullKey)
114128
throws IOException {
@@ -123,7 +137,11 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
123137
payload.readerIndex(readerIndex);
124138
brokerMeta = payload.readSlice(brokerEntryMetadataSize + Short.BYTES + Integer.BYTES);
125139
}
126-
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
140+
if (metadata == null) {
141+
metadata = Commands.parseMessageMetadata(payload);
142+
} else {
143+
Commands.skipMessageMetadata(payload);
144+
}
127145
ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
128146

129147
CompressionType compressionType = metadata.getCompression();

pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public AbstractTwoPhaseCompactor(ServiceConfiguration conf,
7777
protected abstract Map<String, MessageId> toLatestMessageIdForKey(Map<String, T> latestForKey);
7878

7979
protected abstract boolean compactMessage(String topic, Map<String, T> latestForKey,
80-
RawMessage m, MessageId id);
80+
RawMessage m, MessageMetadata metadata, MessageId id);
8181

8282

8383
protected abstract boolean compactBatchMessage(String topic, Map<String, T> latestForKey,
@@ -147,7 +147,7 @@ private void phaseOneLoop(RawReader reader,
147147
} else if (RawBatchConverter.isReadableBatch(metadata)) {
148148
deletedMessage = compactBatchMessage(reader.getTopic(), latestForKey, m, metadata, id);
149149
} else {
150-
deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, id);
150+
deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, metadata, id);
151151
}
152152
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
153153
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
@@ -239,15 +239,15 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
239239
} else if (RawBatchConverter.isReadableBatch(metadata)) {
240240
try {
241241
messageToAdd = rebatchMessage(reader.getTopic(),
242-
m, (key, subid) -> subid.equals(latestForKey.get(key)),
242+
m, metadata, (key, subid) -> subid.equals(latestForKey.get(key)),
243243
topicCompactionRetainNullKey);
244244
} catch (IOException ioe) {
245245
log.info("Error decoding batch for message {}. Whole batch will be included in output",
246246
id, ioe);
247247
messageToAdd = Optional.of(m);
248248
}
249249
} else {
250-
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
250+
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
251251
MessageId msg;
252252
if (keyAndSize == null) {
253253
messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty();
@@ -392,9 +392,8 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
392392
return bkf;
393393
}
394394

395-
protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
395+
protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
396396
ByteBuf headersAndPayload = m.getHeadersAndPayload();
397-
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
398397
if (msgMetadata.hasPartitionKey()) {
399398
int size = headersAndPayload.readableBytes();
400399
if (msgMetadata.hasUncompressedSize()) {
@@ -408,13 +407,14 @@ protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
408407

409408

410409
protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
410+
MessageMetadata metadata,
411411
BiPredicate<String, MessageId> filter,
412412
boolean retainNullKey)
413413
throws IOException {
414414
if (log.isDebugEnabled()) {
415415
log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic);
416416
}
417-
return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
417+
return RawBatchConverter.rebatchMessage(msg, metadata, filter, retainNullKey);
418418
}
419419

420420
protected static class PhaseOneResult<T> {

pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.pulsar.client.api.RawMessage;
3535
import org.apache.pulsar.client.impl.RawBatchConverter;
3636
import org.apache.pulsar.common.api.proto.MessageMetadata;
37-
import org.apache.pulsar.common.protocol.Commands;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
4039

@@ -61,10 +60,10 @@ protected Map<String, MessageId> toLatestMessageIdForKey(
6160

6261
@Override
6362
protected boolean compactMessage(String topic, Map<String, Pair<MessageId, Long>> latestForKey,
64-
RawMessage m, MessageId id) {
63+
RawMessage m, MessageMetadata metadata, MessageId id) {
6564
boolean deletedMessage = false;
6665
boolean replaceMessage = false;
67-
MessageCompactionData mcd = extractMessageCompactionData(m);
66+
MessageCompactionData mcd = extractMessageCompactionData(m, metadata);
6867

6968
if (mcd != null) {
7069
boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
@@ -100,7 +99,7 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
10099
int numMessagesInBatch = metadata.getNumMessagesInBatch();
101100
int deleteCnt = 0;
102101

103-
for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m)) {
102+
for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m, metadata)) {
104103
if (mcd.key() == null) {
105104
if (!topicCompactionRetainNullKey) {
106105
// record delete null-key message event
@@ -139,23 +138,22 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
139138
return deletedMessage;
140139
}
141140

142-
protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
141+
protected MessageCompactionData extractMessageCompactionData(RawMessage m, MessageMetadata metadata) {
143142
ByteBuf headersAndPayload = m.getHeadersAndPayload();
144-
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
145-
if (msgMetadata.hasPartitionKey()) {
143+
if (metadata.hasPartitionKey()) {
146144
int size = headersAndPayload.readableBytes();
147-
if (msgMetadata.hasUncompressedSize()) {
148-
size = msgMetadata.getUncompressedSize();
145+
if (metadata.hasUncompressedSize()) {
146+
size = metadata.getUncompressedSize();
149147
}
150-
return new MessageCompactionData(m.getMessageId(), msgMetadata.getPartitionKey(),
151-
size, msgMetadata.getEventTime());
148+
return new MessageCompactionData(m.getMessageId(), metadata.getPartitionKey(),
149+
size, metadata.getEventTime());
152150
} else {
153151
return null;
154152
}
155153
}
156154

157-
private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg)
155+
private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
158156
throws IOException {
159-
return RawBatchConverter.extractMessageCompactionData(msg);
157+
return RawBatchConverter.extractMessageCompactionData(msg, metadata);
160158
}
161159
}

pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ protected Map<String, MessageId> toLatestMessageIdForKey(Map<String, MessageId>
5353

5454
@Override
5555
protected boolean compactMessage(String topic, Map<String, MessageId> latestForKey,
56-
RawMessage m, MessageId id) {
56+
RawMessage m, MessageMetadata metadata, MessageId id) {
5757
boolean deletedMessage = false;
5858
boolean replaceMessage = false;
59-
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
59+
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
6060
if (keyAndSize != null) {
6161
if (keyAndSize.getRight() > 0) {
6262
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
@@ -84,7 +84,7 @@ protected boolean compactBatchMessage(String topic, Map<String, MessageId> lates
8484
int numMessagesInBatch = metadata.getNumMessagesInBatch();
8585
int deleteCnt = 0;
8686
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(
87-
m)) {
87+
m, metadata)) {
8888
if (e != null) {
8989
if (e.getMiddle() == null) {
9090
if (!topicCompactionRetainNullKey) {
@@ -119,9 +119,9 @@ protected boolean compactBatchMessage(String topic, Map<String, MessageId> lates
119119
}
120120

121121
protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(
122-
RawMessage msg)
122+
RawMessage msg, MessageMetadata metadata)
123123
throws IOException {
124-
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
124+
return RawBatchConverter.extractIdsAndKeysAndSize(msg, metadata);
125125
}
126126

127127
}

0 commit comments

Comments
 (0)