Enable zSTD Compression for Kafka Buffer - Json type #5778
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
san81
left a comment
Nice work. Just left a few comments. Please take a look
implementation platform('com.fasterxml.jackson:jackson-bom:2.17.2')
implementation platform('org.eclipse.jetty:jetty-bom:9.4.53.v20231009')
implementation platform('io.micrometer:micrometer-bom:1.10.5')
implementation 'com.github.luben:zstd-jni:1.5.7-3'
do you need this in the global build.gradle file as well?
Not needed, I'll remove it.
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
consumers.forEach(this.executorService::submit);

this.serdeFormat = kafkaBufferConfig.getSerdeFormat();
I am not seeing any references to this newly introduced member. Not sure if we need this here?
Doesn't seem necessary, I will remove it.
InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
InputStream byteInputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
InputStream inputStream;
if (compressionEnabled) {
Kafka supports multiple compression algorithms: lz4, gzip, snappy, zstd. Do you think just using one boolean parameter is sufficient for configuration? There are also parameters for tuning the algorithms.
I think that the compression stage should be invisible to the user and the flag should be removed altogether. The output data is not modified in any way, so it makes no difference (other than performance) on the user's end. I will remove the config flag in a future commit on this PR.
@KarstenSchnitter that's a valid point. But we will not be using Kafka's compression in the case of the Kafka buffer; we will compress outside Kafka so that we can do compression and encryption in that order. We are starting with ZSTD and may add other options in the future.
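To illustrate why the order matters: encrypted bytes look random and barely compress, so compression has to run before encryption on the producer side and after decryption on the consumer side. The following is a minimal sketch under stated assumptions, not the code from this PR. `CompressThenEncrypt`, `seal`, and `open` are hypothetical names, GZIP from the JDK stands in for zstd (the PR itself uses zstd-jni), and a locally generated AES key stands in for whatever key the buffer's encryption actually uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical helper, not the PR's code. GZIP stands in for zstd so only the JDK is needed.
class CompressThenEncrypt {
    private static final int IV_BYTES = 12;   // recommended IV size for AES-GCM
    private static final int TAG_BITS = 128;  // GCM authentication tag length

    static byte[] compress(byte[] plain) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(plain);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        }
    }

    // Producer side: compress first, then encrypt. The random IV is prepended to the ciphertext.
    static byte[] seal(byte[] plain, SecretKey key) throws IOException, GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        new SecureRandom().nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] encrypted = cipher.doFinal(compress(plain));
        byte[] sealed = new byte[IV_BYTES + encrypted.length];
        System.arraycopy(iv, 0, sealed, 0, IV_BYTES);
        System.arraycopy(encrypted, 0, sealed, IV_BYTES, encrypted.length);
        return sealed;
    }

    // Consumer side: reverse the order, decrypt first and then decompress.
    static byte[] open(byte[] sealed, SecretKey key) throws IOException, GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, sealed, 0, IV_BYTES));
        return decompress(cipher.doFinal(sealed, IV_BYTES, sealed.length - IV_BYTES));
    }

    public static void main(String[] args) throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        SecretKey key = generator.generateKey();
        byte[] event = "{\"message\":\"hello\"}".repeat(100).getBytes(StandardCharsets.UTF_8);
        byte[] sealed = seal(event, key);
        System.out.println(event.length + " bytes in, " + sealed.length + " bytes sealed");
        System.out.println("round trip ok: " + Arrays.equals(event, open(sealed, key)));
    }
}
```

Swapping the two steps in `seal` would still round-trip, but the compressor would then see ciphertext and gain almost nothing.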
public KafkaCustomConsumerFactory(SerializationFactory serializationFactory, AwsCredentialsSupplier awsCredentialsSupplier, boolean isCompressionEnabled) {
    this.serializationFactory = serializationFactory;
    this.awsCredentialsSupplier = awsCredentialsSupplier;
}
We don't need this method anymore - right?
Not needed, will remove.
kkondaka
left a comment
Looks good except for the default config. Please test with KMS keys too.
}

public CompressionConfig getCompressionConfig() {
    if (Objects.isNull(compressionConfig)) {
I don't think we want to change the default behavior. We should only enable compression when a compression config is explicitly provided in the YAML.
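One way to keep the default unchanged, sketched with hypothetical names (`BufferConfigSketch`, `compressionType`) rather than the PR's actual classes: leave the field null when the YAML omits it, and let callers check for presence instead of substituting a default inside the getter.

```java
import java.util.Optional;

// Hypothetical config holder; in the real plugin the field would be bound from YAML.
class BufferConfigSketch {
    // Stays null when the pipeline YAML has no compression setting.
    private final String compressionType;

    BufferConfigSketch(String compressionType) {
        this.compressionType = compressionType;
    }

    // No fallback value here: an absent setting stays absent.
    Optional<String> getCompressionType() {
        return Optional.ofNullable(compressionType);
    }

    // Compression is on only when the user asked for it explicitly.
    boolean isCompressionEnabled() {
        return compressionType != null;
    }

    public static void main(String[] args) {
        System.out.println(new BufferConfigSketch(null).isCompressionEnabled());
        System.out.println(new BufferConfigSketch("zstd").isCompressionEnabled());
    }
}
```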
    this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
}

public KafkaCustomConsumer(final KafkaConsumer consumer,
Usually we do it the other way around when multiple constructors are present: the constructor with the existing arguments invokes the one that also takes the compression config, passing null for it.
constructor (...existing args..., compression-arg) {
init all the members
}
constructor(..existing args...) {
this(existing args, null);
}
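The pseudocode above corresponds to ordinary Java constructor chaining. A minimal sketch follows; `ConsumerSketch` and its fields are hypothetical stand-ins, not the real `KafkaCustomConsumer` signature.

```java
// Hypothetical stand-in for the consumer class; names are illustrative only.
class ConsumerSketch {
    private final String topic;
    private final String compressionType; // null means "no compression"

    // Full constructor: the only place where members are initialized.
    ConsumerSketch(String topic, String compressionType) {
        this.topic = topic;
        this.compressionType = compressionType;
    }

    // Existing signature kept for current callers; it delegates and passes null.
    ConsumerSketch(String topic) {
        this(topic, null);
    }

    String getTopic() {
        return topic;
    }

    boolean isCompressionEnabled() {
        return compressionType != null;
    }

    public static void main(String[] args) {
        System.out.println(new ConsumerSketch("events").isCompressionEnabled());
        System.out.println(new ConsumerSketch("events", "zstd").isCompressionEnabled());
    }
}
```

Keeping all initialization in the widest constructor means a new member only has to be wired up in one place.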
- Also address review comments Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
san81
left a comment
Essential addition to the Kafka buffer. Thanks for adding this.
@jeffreyAaron, did you try the existing configuration? It should tell Kafka to use zstd compression.
dlvenable
left a comment
Thanks @jeffreyAaron for the contribution. I see the goal is to support compress then encrypt. I have some code changes to recommend.
import java.util.Map;
import java.util.stream.Collectors;

public enum CompressionType {
We should extract this code from the s3 sink.
It has the interfaces that we need.
The CompressionEngine interface can be moved here:
The CompressionType can be consolidated with this: https://github.com/opensearch-project/data-prepper/blob/09a6497b29b4f1b3dbadb2a7b0708b76e4855c28/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/CompressionOption.java
Add a method like this:
public CompressionEngine getCompressionEngine() {
return ...
}
And all the CompressionEngine implementations can be put in here: https://github.com/opensearch-project/data-prepper/tree/09a6497b29b4f1b3dbadb2a7b0708b76e4855c28/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec
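A rough sketch of what the consolidated enum could look like. Everything here is an assumption for illustration: the interface shape (`createOutputStream`), the enum name `CompressionOptionSketch`, and the lookup method `fromOptionValue` are hypothetical, and only JDK-backed engines are shown. A zstd constant would wrap zstd-jni's `ZstdOutputStream` in the same way.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the consolidated CompressionOption enum.
enum CompressionOptionSketch {
    NONE("none", out -> out),             // pass-through engine
    GZIP("gzip", GZIPOutputStream::new);  // wraps the target stream in a GZIP stream

    private final String option;
    private final CompressionEngine engine;

    CompressionOptionSketch(String option, CompressionEngine engine) {
        this.option = option;
        this.engine = engine;
    }

    // The accessor suggested in the review: each option hands out its own engine.
    public CompressionEngine getCompressionEngine() {
        return engine;
    }

    // Looks up an option by the string used in configuration.
    static CompressionOptionSketch fromOptionValue(String value) {
        return Arrays.stream(values())
                .filter(candidate -> candidate.option.equals(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown compression: " + value));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = fromOptionValue("gzip").getCompressionEngine().createOutputStream(sink)) {
            out.write("hello".getBytes());
        }
        System.out.println("compressed bytes: " + sink.size());
    }
}

// Assumed shape of the engine interface that the review proposes to extract from the s3 sink.
interface CompressionEngine {
    OutputStream createOutputStream(OutputStream outputStream) throws IOException;
}
```

With this layout, callers such as the Kafka buffer never switch on the compression type themselves; they ask the option for its engine and wrap their stream.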
This is a good idea. I'll implement these changes.
private String customMetricPrefix;

@JsonProperty("compression")
private CompressionConfig compressionConfig;
Rather than a new config, we should use the existing one (producer_properties/compression_type).
If encryption is not enabled, then let Kafka compress.
If encryption is enabled, then compress manually and disable compression in Kafka.
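The decision described above could be sketched as follows. This is an illustration under assumptions, not the logic in `KafkaBuffer.java`: `CompressionPlacement` and its methods are hypothetical, while `compression.type` is the standard Kafka producer property that `producer_properties/compression_type` is presumed to map to.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing where compression should happen for a given configuration.
class CompressionPlacement {
    static final String COMPRESSION_TYPE = "compression.type"; // Kafka producer config key

    private static String normalize(String configuredType) {
        return configuredType == null ? "none" : configuredType;
    }

    // Properties handed to the Kafka producer.
    static Map<String, Object> producerProperties(String configuredType, boolean encryptionEnabled) {
        Map<String, Object> properties = new HashMap<>();
        if (encryptionEnabled) {
            // The buffer compresses before encrypting, so Kafka must not compress again.
            properties.put(COMPRESSION_TYPE, "none");
        } else {
            // No encryption: let Kafka apply the configured compression itself.
            properties.put(COMPRESSION_TYPE, normalize(configuredType));
        }
        return properties;
    }

    // True when the buffer itself has to compress records before encrypting them.
    static boolean compressInBuffer(String configuredType, boolean encryptionEnabled) {
        return encryptionEnabled && !"none".equals(normalize(configuredType));
    }

    public static void main(String[] args) {
        System.out.println(producerProperties("zstd", true) + " manual=" + compressInBuffer("zstd", true));
        System.out.println(producerProperties("zstd", false) + " manual=" + compressInBuffer("zstd", false));
    }
}
```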
Implemented this logic in KafkaBuffer.java in 947f031
- Extract Compression Engine classes - Implement logic to default to Kafka compression if encryption at rest is Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
@dlvenable Requested changes are implemented. Could you please take a look when you have time?
Yes, this seems to work, with a noticeable improvement when compression is enabled. 947f031 should ensure that compression is done before encryption.
dlvenable
left a comment
Thank you @jeffreyAaron ! This looks good to me.
…ect#5778) * Implement Kafka buffering compression Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com> Signed-off-by: Jonah Calvo <caljonah@amazon.com>
…roject#5811) * Fix import style and integration test fixes Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com> Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Description
Implements zSTD compression and decompression for the KafkaBuffer class. Most changes are implemented in the CustomKafkaConsumer and CustomKafkaProducer.
Issues Resolved
Resolves #5777
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.