Skip to content

Commit 9d41cc9

Browse files
authored
Add ZSTD compression engine for Kafka Buffer (#5844)
* Add ZSTD compression engine Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
1 parent 54f8e29 commit 9d41cc9

7 files changed

Lines changed: 72 additions & 2 deletions

File tree

data-prepper-plugins/common/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies {
1313
implementation libs.commons.io
1414
implementation 'software.amazon.awssdk:s3'
1515
implementation 'software.amazon.awssdk:acm'
16+
implementation 'com.github.luben:zstd-jni:1.5.7-3'
1617
implementation libs.commons.compress
1718
implementation libs.commons.lang3
1819
implementation libs.bouncycastle.bcprov

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/CompressionOption.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public enum CompressionOption {
1717
NONE("none"),
1818
GZIP("gzip"),
1919
SNAPPY("snappy"),
20+
ZSTD("zstd"),
2021
AUTOMATIC("automatic");
2122

2223
private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values())
@@ -28,13 +29,15 @@ public enum CompressionOption {
2829
private static final Map<String, DecompressionEngine> DECOMPRESSION_ENGINE_MAP = Map.of(
2930
"none", new NoneDecompressionEngine(),
3031
"gzip", new GZipDecompressionEngine(),
31-
"snappy", new SnappyDecompressionEngine()
32+
"snappy", new SnappyDecompressionEngine(),
33+
"zstd", new ZstdDecompressionEngine()
3234
);
3335

3436
private static final Map<String, CompressionEngine> COMPRESSION_ENGINE_MAP = Map.of(
3537
"none", new NoneCompressionEngine(),
3638
"gzip", new GZipCompressionEngine(),
37-
"snappy", new SnappyCompressionEngine()
39+
"snappy", new SnappyCompressionEngine(),
40+
"zstd", new ZstdCompressionEngine()
3841
);
3942

4043
private final String option;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.opensearch.dataprepper.plugins.codec;
2+
3+
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
4+
import org.opensearch.dataprepper.model.codec.CompressionEngine;
5+
6+
import java.io.IOException;
7+
import java.io.OutputStream;
8+
9+
public class ZstdCompressionEngine implements CompressionEngine {
10+
@Override
11+
public OutputStream createOutputStream(OutputStream outputStream) throws IOException {
12+
return new ZstdCompressorOutputStream(outputStream);
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.opensearch.dataprepper.plugins.codec;
2+
3+
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
4+
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
5+
6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
9+
public class ZstdDecompressionEngine implements DecompressionEngine {
10+
@Override
11+
public InputStream createInputStream(InputStream responseInputStream) throws IOException {
12+
return new ZstdCompressorInputStream(responseInputStream);
13+
}
14+
}

data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/codec/CompressionOptionTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,28 @@ void testFromFileName_default() {
3939
}
4040

4141

42+
43+
@ParameterizedTest
44+
@EnumSource(value = CompressionOption.class, names = {"AUTOMATIC"}, mode = EnumSource.Mode.EXCLUDE)
45+
void testCompressionDecompression(final CompressionOption option) throws Exception {
46+
String testData = "This is test data for compression and decompression";
47+
byte[] originalBytes = testData.getBytes();
48+
49+
java.io.ByteArrayOutputStream compressedOutput = new java.io.ByteArrayOutputStream();
50+
java.io.OutputStream compressor = option.getCompressionEngine().createOutputStream(compressedOutput);
51+
compressor.write(originalBytes);
52+
compressor.close();
53+
54+
byte[] compressedBytes = compressedOutput.toByteArray();
55+
java.io.InputStream decompressor = option.getDecompressionEngine().createInputStream(new java.io.ByteArrayInputStream(compressedBytes));
56+
byte[] decompressedBytes = decompressor.readAllBytes();
57+
58+
assertThat(decompressedBytes, is(originalBytes));
59+
}
60+
61+
62+
63+
64+
65+
4266
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public void produceRawData(final byte[] bytes, final String key) throws Exceptio
121121
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
122122
OutputStream compressedOutputStream = compressionConfig.getCompressionEngine().createOutputStream(byteArrayOutputStream);
123123
compressedOutputStream.write(bytes);
124+
compressedOutputStream.close();
124125
send(topicName, key, byteArrayOutputStream.toByteArray()).get();
125126

126127
topicMetrics.update(producer);
@@ -175,6 +176,7 @@ private void publishJsonMessageAsBytes(Record<Event> record, String key) throws
175176
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
176177
OutputStream compressedOutputStream = compressionConfig.getCompressionEngine().createOutputStream(byteArrayOutputStream);
177178
compressedOutputStream.write(bytes);
179+
compressedOutputStream.close();
178180

179181
send(topicName, key, byteArrayOutputStream.toByteArray());
180182
}

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2727
import org.opensearch.dataprepper.model.record.Record;
2828
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
29+
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
2930
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
3031
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
3132
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
@@ -513,4 +514,15 @@ void test_customCompressionOption_with_encryption_disabled() {
513514

514515
assertThat(kafkaBuffer.getCustomCompressionOption().name(), equalTo("NONE"));
515516
}
517+
518+
@Test
519+
void test_compressionOption_supports_all_kafka_compression_types() {
520+
String[] kafkaCompressionTypes = {"none", "gzip", "snappy", "zstd"};
521+
522+
for (String compressionType : kafkaCompressionTypes) {
523+
CompressionOption option = CompressionOption.fromOptionValue(compressionType);
524+
assertThat("CompressionOption should support Kafka compression type to ensure that KafkaBuffer compresses data properly when encryption is enabled: " + compressionType,
525+
option, org.hamcrest.Matchers.notNullValue());
526+
}
527+
}
516528
}

0 commit comments

Comments
 (0)