Enable zSTD Compression for Kafka Buffer - Json type #5778

Merged
san81 merged 19 commits into opensearch-project:main from jeffreyAaron:main
Jun 20, 2025

Conversation

@jeffreyAaron jeffreyAaron commented Jun 13, 2025

Contributor

Description

Implements zSTD compression and decompression for the KafkaBuffer class. Most of the changes are in KafkaCustomConsumer and KafkaCustomProducer.

Issues Resolved

Resolves #5777

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
@jeffreyAaron jeffreyAaron marked this pull request as ready for review June 13, 2025 18:38
@jeffreyAaron jeffreyAaron marked this pull request as draft June 13, 2025 19:04
@jeffreyAaron jeffreyAaron marked this pull request as ready for review June 13, 2025 19:35

@san81 san81 left a comment

Collaborator

Nice work. Just left a few comments. Please take a look

Comment thread build.gradle Outdated
implementation platform('com.fasterxml.jackson:jackson-bom:2.17.2')
implementation platform('org.eclipse.jetty:jetty-bom:9.4.53.v20231009')
implementation platform('io.micrometer:micrometer-bom:1.10.5')
implementation 'com.github.luben:zstd-jni:1.5.7-3'

Collaborator

Do you need this in the global build.gradle file as well?

Contributor Author

Not needed, I'll remove it.

this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
consumers.forEach(this.executorService::submit);

this.serdeFormat = kafkaBufferConfig.getSerdeFormat();

Collaborator

I am not seeing any references to this newly introduced member. Not sure if we need this here?

Contributor Author

Doesn't seem necessary, I will remove it.

- InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
+ InputStream byteInputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
+ InputStream inputStream;
+ if (compressionEnabled) {

Collaborator

Kafka supports multiple compression algorithms: lz4, gzip, snappy, zstd. Do you think just using one boolean parameter is sufficient for configuration? There are also parameters for tuning the algorithms.

Contributor Author

I think the compression stage should be invisible to the user, and the flag should be removed altogether. The output data is not modified in any way, so apart from performance it makes no difference on the user's end. I will remove the config flag in a future commit on this PR.

Collaborator

@KarstenSchnitter that's a valid point. But for the Kafka buffer we will not be using Kafka's own compression. We will compress outside of Kafka so that we can compress and then encrypt, in that order. We are starting with ZSTD and may add other options in the future.
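
To illustrate why the order matters, here is a minimal sketch, not the code from this PR. It assumes the JDK's `Deflater` as a stand-in for zstd (so no extra dependency is needed) and AES-GCM with a throwaway key as a stand-in for the buffer's encryption. The class and method names are hypothetical. Encrypted bytes look random and barely compress, so compressing first gives the smaller record.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.zip.Deflater;

// Hypothetical sketch: compares compress-then-encrypt with encrypt-then-compress.
final class CompressThenEncryptSketch {

    // Stand-in for zstd: DEFLATE from the JDK (assumption made to avoid a dependency).
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int written = deflater.deflate(chunk);
            out.write(chunk, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Throwaway AES key; a real deployment would obtain its key elsewhere (for example from KMS).
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // AES-GCM encryption. The IV is discarded here because the sketch only compares
    // sizes; real code must keep the IV alongside the ciphertext to decrypt later.
    static byte[] encrypt(byte[] input, SecretKey key) {
        try {
            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            return cipher.doFinal(input);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Repetitive JSON lines, similar in spirit to log events passing through a buffer.
    static byte[] sampleJson() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("{\"level\":\"INFO\",\"message\":\"request handled\",\"id\":")
              .append(i).append("}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = sampleJson();
        SecretKey key = newKey();
        int compressThenEncrypt = encrypt(compress(payload), key).length;
        int encryptThenCompress = compress(encrypt(payload, key)).length;
        System.out.println("raw bytes:             " + payload.length);
        System.out.println("compress then encrypt: " + compressThenEncrypt);
        System.out.println("encrypt then compress: " + encryptThenCompress);
    }
}
```

Kafka's built-in `compression.type` runs inside the producer, after the buffer has already encrypted the payload, which is the "encrypt then compress" case in this sketch.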

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Comment on lines +74 to +79
public KafkaCustomConsumerFactory(SerializationFactory serializationFactory, AwsCredentialsSupplier awsCredentialsSupplier, boolean isCompressionEnabled) {
this.serializationFactory = serializationFactory;
this.awsCredentialsSupplier = awsCredentialsSupplier;
}


Collaborator

We don't need this method anymore - right?

Contributor Author

Not needed, will remove.

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>

@kkondaka kkondaka left a comment

Collaborator

Looks good except for the default config. Please test with KMS keys too.

}

public CompressionConfig getCompressionConfig() {
if (Objects.isNull(compressionConfig)) {

Collaborator

I don't think we want to change default behavior. We should only enable compression when compression config is explicitly provided in the YAML
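
One way to keep the default behavior unchanged is to treat a missing config as "no compression" instead of substituting a default that turns it on. The following is a minimal sketch under that assumption. `BufferConfig`, `CompressionType`, and `isCompressionEnabled` are hypothetical names for illustration, not the classes in this PR.

```java
import java.util.Optional;

// Hypothetical sketch: compression is active only when the YAML sets it explicitly.
final class CompressionDefaultSketch {

    enum CompressionType { NONE, ZSTD }

    static final class BufferConfig {
        // Stays null when the pipeline YAML has no compression entry.
        private final CompressionType compressionType;

        BufferConfig(CompressionType compressionType) {
            this.compressionType = compressionType;
        }

        // Expose absence explicitly instead of inventing a default value.
        Optional<CompressionType> getCompressionType() {
            return Optional.ofNullable(compressionType);
        }

        // True only for an explicit, non-NONE setting.
        boolean isCompressionEnabled() {
            return getCompressionType()
                    .filter(type -> type != CompressionType.NONE)
                    .isPresent();
        }
    }

    public static void main(String[] args) {
        System.out.println(new BufferConfig(null).isCompressionEnabled());
        System.out.println(new BufferConfig(CompressionType.ZSTD).isCompressionEnabled());
    }
}
```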

this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
}

public KafkaCustomConsumer(final KafkaConsumer consumer,

Collaborator

Usually we do it the other way around when there are multiple constructors: the constructor with fewer arguments invokes the fuller one, passing null as the compression config.

constructor (...existing args..., compression-arg) {
   init all the members
}

constructor(..existing args...) {
    this(existing args, null);
}

Contributor Author

Got it!
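
The constructor pattern suggested in this thread could look roughly like the sketch below. The `Consumer` class, its fields, and the use of a `String` for the compression config are all hypothetical simplifications. The real KafkaCustomConsumer takes many more arguments.

```java
// Hypothetical sketch of constructor chaining: the shorter constructor delegates
// to the fuller one, so all field initialization lives in a single place.
final class ConstructorChainingSketch {

    static final class Consumer {
        private final String topic;
        // Simplified stand-in for a compression config object; null means "not configured".
        private final String compressionConfig;

        // Full constructor: the only place where members are initialized.
        Consumer(String topic, String compressionConfig) {
            this.topic = topic;
            this.compressionConfig = compressionConfig;
        }

        // Existing signature is preserved and forwards null for the new argument.
        Consumer(String topic) {
            this(topic, null);
        }

        String getTopic() {
            return topic;
        }

        boolean isCompressionConfigured() {
            return compressionConfig != null;
        }
    }

    public static void main(String[] args) {
        Consumer legacy = new Consumer("buffer-topic");
        Consumer compressed = new Consumer("buffer-topic", "zstd");
        System.out.println(legacy.isCompressionConfigured());
        System.out.println(compressed.isCompressionConfigured());
    }
}
```

Existing callers keep compiling against the shorter signature, and any new member only has to be wired up in the full constructor.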

- Also address review comments
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
san81 previously approved these changes Jun 19, 2025

@san81 san81 left a comment

Collaborator

Essential addition to the Kafka buffer. Thanks for adding this.

@dlvenable

Member

@jeffreyAaron, did you try the existing configuration? It should tell Kafka to use zstd compression.

buffer:
  kafka:
    producer_properties:
      compression_type: zstd

@dlvenable dlvenable left a comment

Member

Thanks @jeffreyAaron for the contribution. I see the goal is to support compress then encrypt. I have some code changes to recommend.

import java.util.Map;
import java.util.stream.Collectors;

public enum CompressionType {

Member

Contributor Author

This is a good idea. I'll implement these changes.

private String customMetricPrefix;

@JsonProperty("compression")
private CompressionConfig compressionConfig;

Member

Rather than a new config, we should use the existing one (producer_properties/compression_type).

If encryption is not enabled, then let Kafka compress.

If encryption is enabled, then compress manually and disable compression in Kafka.

Contributor Author

Implemented this logic in KafkaBuffer.java in 947f031
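
The rule discussed in this thread could be sketched as follows. This is an illustration only, not the code in 947f031. The class and method names are hypothetical, and the only real identifier is Kafka's `compression.type` producer property, whose value `none` disables producer-side compression.

```java
import java.util.Properties;

// Hypothetical sketch: decide who compresses, the buffer or the Kafka producer.
final class CompressionRoutingSketch {

    // Real Kafka producer config key.
    static final String COMPRESSION_TYPE = "compression.type";

    /**
     * Returns true when the buffer should compress the payload itself (before
     * encrypting). In that case Kafka's own compression is switched off so the
     * data is not compressed a second time after encryption.
     */
    static boolean shouldCompressManually(Properties producerProps, boolean encryptionEnabled) {
        String requested = producerProps.getProperty(COMPRESSION_TYPE, "none");
        boolean compressionRequested = !"none".equals(requested);
        if (compressionRequested && encryptionEnabled) {
            producerProps.setProperty(COMPRESSION_TYPE, "none");
            return true;
        }
        // No encryption (or no compression requested): leave the producer settings alone.
        return false;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(COMPRESSION_TYPE, "zstd");
        boolean manual = shouldCompressManually(props, true);
        System.out.println("manual compression: " + manual);
        System.out.println("kafka compression.type: " + props.getProperty(COMPRESSION_TYPE));
    }
}
```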

- Extract Compression Engine classes
- Implement logic to default to Kafka compression if encryption at rest is

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
@jeffreyAaron

Contributor Author

@dlvenable Requested changes are implemented. Could you please take a look when you have time?

@san81 san81 requested a review from dlvenable June 20, 2025 19:44
@jeffreyAaron

Contributor Author

> @jeffreyAaron , Did you try the existing configuration? It should tell Kafka to use the zstd compression.
>
> buffer:
>   kafka:
>     producer_properties:
>       compression_type: zstd

Yes, this works, and there is a noticeable improvement with compression enabled.

947f031 should ensure that compression is done before encryption.

@dlvenable dlvenable left a comment

Member

Thank you @jeffreyAaron ! This looks good to me.

@san81 san81 merged commit 691a737 into opensearch-project:main Jun 20, 2025
5 of 9 checks passed
san81 pushed a commit that referenced this pull request Jun 23, 2025
* Fix import style and integration test fixes

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
JonahCalvo pushed a commit to JonahCalvo/os-data-prepper that referenced this pull request Jul 17, 2025
…ect#5778)

* Implement Kafka buffering compression

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
JonahCalvo pushed a commit to JonahCalvo/os-data-prepper that referenced this pull request Jul 17, 2025
…roject#5811)

* Fix import style and integration test fixes

Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>