Skip to content

Commit 240d03b

Browse files
[fix][client]Add setCompressMinMsgBodySize method to ProducerBuilder for compression configuration flexibility (#24164)
PIP: #23526 #### Motivation Currently, the `ProducerBuilder` API lacks the ability to directly configure the `compressMinMsgBodySize` property during producer initialization. Users are forced to modify the internal `ProducerConfigurationData` post-creation, which: 1. **Violates encapsulation** by exposing internal configuration fields. 2. **Introduces inconsistency** with other compression settings (e.g., `compressionType`) that are already configurable via the builder. 3. **Creates usability hurdles**, requiring workarounds like: ```java Producer<byte[]> producer = client.newProducer().topic("test").create(); producer.conf.setCompressMinMsgBodySize(1024); // Insecure and error-prone ``` This change aligns the builder API with standard compression configuration patterns and improves developer ergonomics. #### Modifications 1. **API Enhancement**: - Added `compressMinMsgBodySize(int)` method to the `ProducerBuilder` interface. - Propagate the configured value to `ProducerConfigurationData` during producer initialization. 2. **Behavior Preservation**: - Default value remains unchanged if the method is not invoked. - Fully backward-compatible with existing code. 3. **Testing & Documentation**: - Added unit tests to validate compression threshold behavior via the new builder method. - Updated Javadoc and configuration examples to reflect the new API.
1 parent 610b72e commit 240d03b

3 files changed

Lines changed: 20 additions & 2 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,15 +245,14 @@ public void testProducerCompressionMinMsgBodySize() throws PulsarClientException
245245
.topic(topicName)
246246
.producerName("producer")
247247
.compressionType(CompressionType.LZ4)
248+
.compressionMinMsgBodySize(1024)
248249
.create();
249250
@Cleanup
250251
Consumer<byte[]> consumer = pulsarClient.newConsumer()
251252
.topic(topicName)
252253
.subscriptionName("sub")
253254
.subscribe();
254255

255-
producer.conf.setCompressMinMsgBodySize(1024);
256-
producer.conf.setCompressionType(CompressionType.LZ4);
257256
// disable batch
258257
producer.conf.setBatchingEnabled(false);
259258
producer.newMessage().value(msg1024).send();

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,19 @@ public interface ProducerBuilder<T> extends Cloneable {
293293
*/
294294
ProducerBuilder<T> compressionType(CompressionType compressionType);
295295

296+
/**
297+
* Sets the minimum uncompressed message body size required to enable compression.
298+
* <p>
299+
* When a message's body size exceeds this threshold (in bytes), compression will be applied
300+
* using the configured {@link #compressionType(CompressionType)}. Messages smaller than this
301+
* threshold will not be compressed.
302+
* <p>
303+
* Default: 4 KB
304+
*
305+
* @param compressionMinMsgBodySize the minimum uncompressed message body size required to enable compression
306+
*/
307+
ProducerBuilder<T> compressionMinMsgBodySize(int compressionMinMsgBodySize);
308+
296309
/**
297310
* Set a custom message routing policy by passing an implementation of MessageRouter.
298311
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ public ProducerBuilder<T> compressionType(@NonNull CompressionType compressionTy
174174
return this;
175175
}
176176

177+
@Override
178+
public ProducerBuilder<T> compressionMinMsgBodySize(int compressionMinMsgBodySize) {
179+
conf.setCompressMinMsgBodySize(compressionMinMsgBodySize);
180+
return this;
181+
}
182+
177183
@Override
178184
public ProducerBuilder<T> hashingScheme(@NonNull HashingScheme hashingScheme) {
179185
conf.setHashingScheme(hashingScheme);

0 commit comments

Comments
 (0)