Skip to content

Commit 1220951

Browse files
[improve][client][PIP-389] Add a producer config to improve compression performance (#23525)
PIP: #23526

### Motivation

This PIP improves compression performance by letting the producer skip compression for small messages. It adds a new producer configuration, `compressMinMsgBodySize`, which lets the user set the minimum message body size, in bytes, that will be compressed (default in this commit: 4 * 1024). If the message body size is less than `compressMinMsgBodySize`, the message is not compressed, even when a compression type is configured on the producer.
1 parent 1d53d36 commit 1220951

8 files changed

Lines changed: 115 additions & 32 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public ByteBuf toByteBuf() {
167167
}
168168
}
169169

170-
ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
170+
ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(false));
171171
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
172172
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
173173
messageMetadata, encryptedPayload);

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1257,11 +1257,12 @@ public void testExamineMessageMetadata() throws Exception {
12571257

12581258
admin.topics().createPartitionedTopic(topicName, 2);
12591259
@Cleanup
1260-
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
1260+
ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING)
12611261
.producerName("testExamineMessageMetadataProducer")
12621262
.compressionType(CompressionType.LZ4)
12631263
.topic(topicName + "-partition-0")
12641264
.create();
1265+
producer.getConfiguration().setCompressMinMsgBodySize(1);
12651266

12661267
producer.newMessage()
12671268
.keyBytes("partition123".getBytes())

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21-
import static org.testng.Assert.assertEquals;
2221
import static org.testng.Assert.assertFalse;
2322
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertEquals;
2424

2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
28+
2829
import lombok.Cleanup;
2930
import lombok.extern.slf4j.Slf4j;
3031
import org.apache.pulsar.broker.BrokerTestUtil;
3132
import org.apache.pulsar.broker.service.ServerCnx;
3233
import org.apache.pulsar.client.api.BatcherBuilder;
34+
import org.apache.pulsar.client.api.CompressionType;
3335
import org.apache.pulsar.client.api.Consumer;
3436
import org.apache.pulsar.client.api.Message;
3537
import org.apache.pulsar.client.api.MessageId;
3638
import org.apache.pulsar.client.api.Producer;
3739
import org.apache.pulsar.client.api.ProducerConsumerBase;
40+
import org.apache.pulsar.client.api.PulsarClientException;
3841
import org.apache.pulsar.client.api.SubscriptionType;
3942
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
4043
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -230,4 +233,47 @@ public void testRetentionPolicyByProducingMessages() throws Exception {
230233
assertEquals(internalStats.ledgers.size(), 1);
231234
});
232235
}
236+
237+
238+
@Test
239+
public void testProducerCompressionMinMsgBodySize() throws PulsarClientException {
240+
byte[] msg1022 = new byte[1022];
241+
byte[] msg1025 = new byte[1025];
242+
final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
243+
@Cleanup
244+
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
245+
.topic(topicName)
246+
.producerName("producer")
247+
.compressionType(CompressionType.LZ4)
248+
.create();
249+
@Cleanup
250+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
251+
.topic(topicName)
252+
.subscriptionName("sub")
253+
.subscribe();
254+
255+
producer.conf.setCompressMinMsgBodySize(1024);
256+
producer.conf.setCompressionType(CompressionType.LZ4);
257+
// disable batch
258+
producer.conf.setBatchingEnabled(false);
259+
producer.newMessage().value(msg1022).send();
260+
MessageImpl<byte[]> message = (MessageImpl<byte[]>) consumer.receive();
261+
CompressionType compressionType = message.getCompressionType();
262+
assertEquals(compressionType, CompressionType.NONE);
263+
producer.newMessage().value(msg1025).send();
264+
message = (MessageImpl<byte[]>) consumer.receive();
265+
compressionType = message.getCompressionType();
266+
assertEquals(compressionType, CompressionType.LZ4);
267+
268+
// enable batch
269+
producer.conf.setBatchingEnabled(true);
270+
producer.newMessage().value(msg1022).send();
271+
message = (MessageImpl<byte[]>) consumer.receive();
272+
compressionType = message.getCompressionType();
273+
assertEquals(compressionType, CompressionType.NONE);
274+
producer.newMessage().value(msg1025).send();
275+
message = (MessageImpl<byte[]>) consumer.receive();
276+
compressionType = message.getCompressionType();
277+
assertEquals(compressionType, CompressionType.LZ4);
278+
}
233279
}

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType comp
124124
.compressionType(compressionType)
125125
.enableBatching(false)
126126
.create();
127+
producer.getConfiguration().setCompressMinMsgBodySize(1);
127128
producer.getConnectionHandler().setMaxMessageSize(maxMessageSize);
128129
MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer);
129130
/**

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
142142
}
143143

144144
protected ByteBuf getCompressedBatchMetadataAndPayload() {
145+
return getCompressedBatchMetadataAndPayload(true);
146+
}
147+
148+
protected ByteBuf getCompressedBatchMetadataAndPayload(boolean clientOperation) {
145149
int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
146150
int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();
147151

@@ -169,11 +173,23 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
169173
}
170174

171175
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
172-
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
173-
batchedMessageMetadataAndPayload.release();
174-
if (compressionType != CompressionType.NONE) {
175-
messageMetadata.setCompression(compressionType);
176-
messageMetadata.setUncompressedSize(uncompressedSize);
176+
ByteBuf compressedPayload;
177+
if (clientOperation && producer != null){
178+
if (compressionType != CompressionType.NONE
179+
&& uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
180+
compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
181+
messageMetadata.setCompression(compressionType);
182+
messageMetadata.setUncompressedSize(uncompressedSize);
183+
} else {
184+
compressedPayload = batchedMessageMetadataAndPayload;
185+
}
186+
} else {
187+
compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
188+
batchedMessageMetadataAndPayload.release();
189+
if (compressionType != CompressionType.NONE) {
190+
messageMetadata.setCompression(compressionType);
191+
messageMetadata.setUncompressedSize(uncompressedSize);
192+
}
177193
}
178194

179195
// Update the current max batch size using the uncompressed size, which is what we need in any case to
@@ -252,7 +268,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
252268
if (messages.size() == 1) {
253269
messageMetadata.clear();
254270
messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
255-
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
271+
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
272+
getCompressedBatchMetadataAndPayload());
256273
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
257274
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
258275
1, null, messageMetadata, encryptedPayload);
@@ -284,7 +301,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
284301
lowestSequenceId = -1L;
285302
return op;
286303
}
287-
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
304+
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
305+
getCompressedBatchMetadataAndPayload());
288306
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
289307
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
290308
encryptedPayload.release();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.stream.Collectors;
4040
import lombok.Getter;
41+
import org.apache.pulsar.client.api.CompressionType;
4142
import org.apache.pulsar.client.api.Message;
4243
import org.apache.pulsar.client.api.MessageId;
4344
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -780,6 +781,10 @@ int getUncompressedSize() {
780781
return uncompressedSize;
781782
}
782783

784+
CompressionType getCompressionType() {
785+
return CompressionType.valueOf(msgMetadata.getCompression().name());
786+
}
787+
783788
SchemaState getSchemaState() {
784789
return schemaState;
785790
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,8 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
486486
* @param payload
487487
* @return a new payload
488488
*/
489-
private ByteBuf applyCompression(ByteBuf payload) {
489+
@VisibleForTesting
490+
public ByteBuf applyCompression(ByteBuf payload) {
490491
ByteBuf compressedPayload = compressor.encode(payload);
491492
payload.release();
492493
return compressedPayload;
@@ -540,22 +541,29 @@ public void sendAsync(Message<?> message, SendCallback callback) {
540541
boolean compressed = false;
541542
// Batch will be compressed when closed
542543
// If a message has a delayed delivery time, we'll always send it individually
543-
if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
544-
compressedPayload = applyCompression(payload);
545-
compressed = true;
544+
if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
545+
if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
546546

547-
// validate msg-size (For batching this will be check at the batch completion size)
548-
int compressedSize = compressedPayload.readableBytes();
549-
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
550-
compressedPayload.release();
551-
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
552-
PulsarClientException.InvalidMessageException invalidMessageException =
553-
new PulsarClientException.InvalidMessageException(
554-
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
555-
+ " %d bytes",
556-
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
557-
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
558-
return;
547+
} else {
548+
compressedPayload = applyCompression(payload);
549+
compressed = true;
550+
551+
// validate msg-size (For batching this will be check at the batch completion size)
552+
int compressedSize = compressedPayload.readableBytes();
553+
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
554+
compressedPayload.release();
555+
String compressedStr = conf.getCompressionType() != CompressionType.NONE
556+
? ("compressed (" + conf.getCompressionType() + ")")
557+
: "uncompressed";
558+
PulsarClientException.InvalidMessageException invalidMessageException =
559+
new PulsarClientException.InvalidMessageException(
560+
format("The producer %s of the topic %s sends a %s message with %d bytes that "
561+
+ "exceeds %d bytes",
562+
producerName, topic, compressedStr, compressedSize,
563+
getMaxMessageSize()));
564+
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
565+
return;
566+
}
559567
}
560568
}
561569

@@ -577,7 +585,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
577585

578586
// Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
579587
// into chunks.
580-
updateMessageMetadata(msgMetadata, uncompressedSize);
588+
updateMessageMetadata(msgMetadata, uncompressedSize, compressed);
581589

582590
// send in chunks
583591
int totalChunks;
@@ -673,7 +681,9 @@ public void sendAsync(Message<?> message, SendCallback callback) {
673681
* @param uncompressedSize
674682
* @return the sequence id
675683
*/
676-
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
684+
@SuppressWarnings("checkstyle:Indentation")
685+
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize,
686+
boolean isCompressed) {
677687
if (!msgMetadata.hasPublishTime()) {
678688
msgMetadata.setPublishTime(client.getClientClock().millis());
679689

@@ -683,9 +693,9 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int
683693

684694
// The field "uncompressedSize" is zero means the compression info were not set yet.
685695
if (msgMetadata.getUncompressedSize() <= 0) {
686-
if (conf.getCompressionType() != CompressionType.NONE) {
687-
msgMetadata
688-
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
696+
if (conf.getCompressionType() != CompressionType.NONE && isCompressed) {
697+
msgMetadata.setCompression(
698+
CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
689699
}
690700
msgMetadata.setUncompressedSize(uncompressedSize);
691701
}
@@ -777,7 +787,7 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
777787
} else {
778788
// in this case compression has not been applied by the caller
779789
// but we have to compress the payload if compression is configured
780-
if (!compressed) {
790+
if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) {
781791
chunkPayload = applyCompression(chunkPayload);
782792
}
783793
ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
189189
)
190190
private CompressionType compressionType = CompressionType.NONE;
191191

192+
private int compressMinMsgBodySize = 4 * 1024; // 4kb
193+
192194
// Cannot use Optional<Long> since it's not serializable
193195
private Long initialSequenceId = null;
194196

0 commit comments

Comments
 (0)