Skip to content

Commit 3629ba3

Browse files
grishaflhotari
authored andcommitted
[fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817)
(cherry picked from commit 1fa9e35)
1 parent 879cc01 commit 3629ba3

3 files changed

Lines changed: 93 additions & 15 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -449,14 +449,24 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
449449
return bkf;
450450
}
451451

452+
/**
453+
* Extract the partition key and the payload size for a non-batch message.
454+
*
455+
* @return a pair of (partitionKey, payloadSize), or null if the message has no partition key.
456+
*/
452457
protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
453-
ByteBuf headersAndPayload = m.getHeadersAndPayload();
454458
if (msgMetadata.hasPartitionKey()) {
455-
int size = headersAndPayload.readableBytes();
456-
if (msgMetadata.hasUncompressedSize()) {
457-
size = msgMetadata.getUncompressedSize();
459+
int payloadSize;
460+
if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
461+
payloadSize = 0;
462+
} else if (msgMetadata.hasUncompressedSize()) {
463+
payloadSize = msgMetadata.getUncompressedSize();
464+
} else {
465+
ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
466+
Commands.skipMessageMetadata(headersAndPayload);
467+
payloadSize = headersAndPayload.readableBytes();
458468
}
459-
return Pair.of(msgMetadata.getPartitionKey(), size);
469+
return Pair.of(msgMetadata.getPartitionKey(), payloadSize);
460470
} else {
461471
return null;
462472
}

pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.compaction;
2020

21-
import io.netty.buffer.ByteBuf;
2221
import java.io.IOException;
2322
import java.util.List;
2423
import java.util.Map;
@@ -139,17 +138,12 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
139138
}
140139

141140
protected MessageCompactionData extractMessageCompactionData(RawMessage m, MessageMetadata metadata) {
142-
ByteBuf headersAndPayload = m.getHeadersAndPayload();
143-
if (metadata.hasPartitionKey()) {
144-
int size = headersAndPayload.readableBytes();
145-
if (metadata.hasUncompressedSize()) {
146-
size = metadata.getUncompressedSize();
147-
}
148-
return new MessageCompactionData(m.getMessageId(), metadata.getPartitionKey(),
149-
size, metadata.getEventTime());
150-
} else {
141+
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
142+
if (keyAndSize == null) {
151143
return null;
152144
}
145+
return new MessageCompactionData(m.getMessageId(), keyAndSize.getLeft(),
146+
keyAndSize.getRight(), metadata.getEventTime());
153147
}
154148

155149
private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,80 @@ public void testBatchMessageWithNullValue() throws Exception {
640640
assertEquals(messages.get(2).getKey(), "key5");
641641
}
642642

643+
/**
644+
* Write raw non-batch entries directly to the managed ledger without
645+
* uncompressedSize, as seen with some non-Java clients. Verifies that
646+
* null-value tombstones remove keys during compaction.
647+
*/
648+
@Test
649+
public void testNonBatchedMessageWithNullValue() throws Exception {
650+
String topic = "persistent://my-tenant/my-ns/non-batched-message-with-null-value";
651+
652+
admin.topics().createNonPartitionedTopic(topic);
653+
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
654+
.receiverQueueSize(1).readCompacted(true).subscribe().close();
655+
656+
PersistentTopic persistentTopic =
657+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
658+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
659+
660+
long seqId = 0;
661+
662+
// key1: value then null-value tombstone
663+
ml.addEntry(buildNonBatchEntry("key1", "my-message-1".getBytes(), seqId++));
664+
ml.addEntry(buildNonBatchEntry("key1", null, seqId++));
665+
666+
// key2: value only (should survive)
667+
ml.addEntry(buildNonBatchEntry("key2", "my-message-3".getBytes(), seqId++));
668+
669+
// key3: value then null-value tombstone
670+
ml.addEntry(buildNonBatchEntry("key3", "my-message-4".getBytes(), seqId++));
671+
ml.addEntry(buildNonBatchEntry("key3", null, seqId++));
672+
673+
// key4: value only (should survive)
674+
ml.addEntry(buildNonBatchEntry("key4", "my-message-6".getBytes(), seqId++));
675+
676+
compact(topic);
677+
678+
List<Message<byte[]>> messages = new ArrayList<>();
679+
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
680+
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) {
681+
while (true) {
682+
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
683+
if (message == null) {
684+
break;
685+
}
686+
messages.add(message);
687+
}
688+
}
689+
690+
assertEquals(messages.size(), 2);
691+
assertEquals(messages.get(0).getKey(), "key2");
692+
assertEquals(messages.get(1).getKey(), "key4");
693+
}
694+
695+
private byte[] buildNonBatchEntry(String key, byte[] payload, long sequenceId) {
696+
org.apache.pulsar.common.api.proto.MessageMetadata metadata =
697+
new org.apache.pulsar.common.api.proto.MessageMetadata();
698+
metadata.setPartitionKey(key);
699+
metadata.setPublishTime(System.currentTimeMillis());
700+
metadata.setProducerName("test-non-batch");
701+
metadata.setSequenceId(sequenceId);
702+
if (payload == null) {
703+
metadata.setNullValue(true);
704+
}
705+
ByteBuf payloadBuf = io.netty.buffer.Unpooled.wrappedBuffer(
706+
payload != null ? payload : new byte[0]);
707+
ByteBuf entry = org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(
708+
org.apache.pulsar.common.protocol.Commands.ChecksumType.Crc32c,
709+
metadata, payloadBuf);
710+
byte[] bytes = new byte[entry.readableBytes()];
711+
entry.readBytes(bytes);
712+
entry.release();
713+
payloadBuf.release();
714+
return bytes;
715+
}
716+
643717
@Test
644718
public void testWholeBatchCompactedOut() throws Exception {
645719
String topic = "persistent://my-tenant/my-ns/whole-batch-compacted-out";

0 commit comments

Comments
 (0)