Skip to content

Commit b4686e1

Browse files
jeffreyAaronJonah Calvo
authored andcommitted
Enable zSTD Compression for Kafka Buffer - Json type (opensearch-project#5778)
* Implement Kafka buffering compression Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com> Signed-off-by: Jonah Calvo <caljonah@amazon.com>
1 parent 8feadaa commit b4686e1

26 files changed

Lines changed: 146 additions & 52 deletions

File tree

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java renamed to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/CompressionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.dataprepper.plugins.sink.s3.compression;
6+
package org.opensearch.dataprepper.model.codec;
77

88
import java.io.IOException;
99
import java.io.OutputStream;

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/CompressionOption.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.codec;
77

88
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import org.opensearch.dataprepper.model.codec.CompressionEngine;
910
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
1011

1112
import java.util.Arrays;
@@ -30,6 +31,12 @@ public enum CompressionOption {
3031
"snappy", new SnappyDecompressionEngine()
3132
);
3233

34+
private static final Map<String, CompressionEngine> COMPRESSION_ENGINE_MAP = Map.of(
35+
"none", new NoneCompressionEngine(),
36+
"gzip", new GZipCompressionEngine(),
37+
"snappy", new SnappyCompressionEngine()
38+
);
39+
3340
private final String option;
3441

3542
CompressionOption(final String option) {
@@ -46,6 +53,10 @@ public static CompressionOption fromFileName(final String fileName) {
4653
}
4754
}
4855

56+
public CompressionEngine getCompressionEngine() {
57+
return COMPRESSION_ENGINE_MAP.getOrDefault(this.option, new NoneCompressionEngine());
58+
}
59+
4960
public DecompressionEngine getDecompressionEngine() {
5061
return DECOMPRESSION_ENGINE_MAP.getOrDefault(this.option, new NoneDecompressionEngine());
5162
}

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java renamed to data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/GZipCompressionEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.dataprepper.plugins.sink.s3.compression;
6+
package org.opensearch.dataprepper.plugins.codec;
77

88
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
9+
import org.opensearch.dataprepper.model.codec.CompressionEngine;
910

1011
import java.io.IOException;
1112
import java.io.OutputStream;
1213

13-
class GZipCompressionEngine implements CompressionEngine {
14+
public class GZipCompressionEngine implements CompressionEngine {
1415
@Override
1516
public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
1617
return new GzipCompressorOutputStream(outputStream);

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java renamed to data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/NoneCompressionEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.dataprepper.plugins.sink.s3.compression;
6+
package org.opensearch.dataprepper.plugins.codec;
7+
8+
import org.opensearch.dataprepper.model.codec.CompressionEngine;
79

810
import java.io.OutputStream;
911

10-
class NoneCompressionEngine implements CompressionEngine {
12+
public class NoneCompressionEngine implements CompressionEngine {
1113
@Override
1214
public OutputStream createOutputStream(final OutputStream outputStream) {
1315
return outputStream;

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngine.java renamed to data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/SnappyCompressionEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.dataprepper.plugins.sink.s3.compression;
6+
package org.opensearch.dataprepper.plugins.codec;
77

8+
import org.opensearch.dataprepper.model.codec.CompressionEngine;
89
import org.xerial.snappy.SnappyOutputStream;
910

1011
import java.io.IOException;
1112
import java.io.OutputStream;
1213

13-
class SnappyCompressionEngine implements CompressionEngine {
14+
public class SnappyCompressionEngine implements CompressionEngine {
1415
@Override
1516
public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
1617
return new SnappyOutputStream(outputStream);

data-prepper-plugins/kafka-plugins/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ dependencies {
5353
implementation 'commons-collections:commons-collections:3.2.2'
5454
implementation 'software.amazon.awssdk:s3'
5555
implementation 'software.amazon.awssdk:apache-client'
56+
implementation 'com.github.luben:zstd-jni:1.5.7-3'
5657

5758
testImplementation 'org.yaml:snakeyaml:2.2'
5859
testImplementation testLibs.spring.test

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.core.JsonParseException;
99
import com.fasterxml.jackson.databind.ObjectMapper;
1010
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
11+
import com.github.luben.zstd.Zstd;
1112
import com.google.protobuf.ByteString;
1213
import org.apache.commons.lang3.RandomStringUtils;
1314
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -118,7 +119,8 @@ void setUp() {
118119
final Map<String, Object> bufferConfigMap = Map.of(
119120
"topics", List.of(topicConfigMap),
120121
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
121-
"encryption", Map.of("type", "none")
122+
"encryption", Map.of("type", "none"),
123+
"compression", Map.of("type", "zstd")
122124
);
123125
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);
124126

@@ -320,7 +322,7 @@ void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutExc
320322

321323
final byte[] innerData = bufferData.getData().toByteArray();
322324

323-
final Map<String, Object> actualEventData = objectMapper.readValue(innerData, Map.class);
325+
final Map<String, Object> actualEventData = objectMapper.readValue(decompress(innerData), Map.class);
324326
assertThat(actualEventData, notNullValue());
325327
assertThat(actualEventData, hasKey("message"));
326328
assertThat(actualEventData.get("message"), equalTo(record.getData().get("message", String.class)));
@@ -360,7 +362,7 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep
360362

361363
final byte[] innerData = bufferData.getData().toByteArray();
362364

363-
assertThat(innerData, equalTo(writtenBytes));
365+
assertThat(decompress(innerData), equalTo(writtenBytes));
364366
}
365367

366368
@Nested
@@ -446,8 +448,8 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
446448

447449
byte[] innerData = bufferData.getData().toByteArray();
448450

449-
assertThat(innerData, notNullValue());
450-
assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class));
451+
assertThat(decompress(innerData), notNullValue());
452+
assertThrows(JsonParseException.class, () -> objectMapper.readValue(decompress(innerData), Map.class));
451453

452454
final byte[] deserializedBytes = decryptCipher.doFinal(innerData);
453455

@@ -492,10 +494,10 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
492494

493495
final byte[] innerData = bufferData.getData().toByteArray();
494496

495-
assertThat(innerData, notNullValue());
496-
assertThat(innerData, not(equalTo(writtenBytes)));
497+
assertThat(decompress(innerData), notNullValue());
498+
assertThat(decompress(innerData), not(equalTo(writtenBytes)));
497499

498-
final byte[] decryptedBytes = decryptCipher.doFinal(innerData);
500+
final byte[] decryptedBytes = decryptCipher.doFinal(decompress(innerData));
499501

500502
assertThat(decryptedBytes, equalTo(writtenBytes));
501503
}
@@ -553,4 +555,8 @@ private Record<Event> createRecord() {
553555
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
554556
return new Record<>(event);
555557
}
558+
559+
private byte[] decompress(byte[] input) {
560+
return Zstd.decompress(input, (int) Zstd.getFrameContentSize(input));
561+
}
556562
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,8 @@ public boolean IsEncryptionAtRestSettingValid() {
257257
}
258258
return true;
259259
}
260+
261+
public boolean encryptionAtRestEnabled() {
262+
return encryptionId != null || encryptionKey != null || kmsConfig != null;
263+
}
260264
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
2323
import org.apache.kafka.common.errors.RecordTooLargeException;
2424
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
25+
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
2526
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
2627
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
2728
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
2829
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
2930
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
3031
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
3132
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
33+
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
3234
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
3335
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
3436
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
@@ -40,10 +42,7 @@
4042
import org.slf4j.MDC;
4143

4244
import java.time.Duration;
43-
import java.util.Collection;
44-
import java.util.List;
45-
import java.util.Map;
46-
import java.util.Optional;
45+
import java.util.*;
4746
import java.util.concurrent.ExecutorService;
4847
import java.util.concurrent.Executors;
4948
import java.util.concurrent.TimeUnit;
@@ -77,22 +76,29 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
7776
final CircuitBreaker circuitBreaker,
7877
final EncryptionSupplier encryptionSupplier) {
7978
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName());
79+
80+
CompressionOption manualCompressionConfig = CompressionOption.NONE;
81+
if (kafkaBufferConfig.getTopic().encryptionAtRestEnabled()) {
82+
// If encryption is enabled, disable Kafka built-in compression and do it manually.
83+
manualCompressionConfig = CompressionOption.fromOptionValue(kafkaBufferConfig.getKafkaProducerProperties().getCompressionType());
84+
kafkaBufferConfig.getKafkaProducerProperties().setCompressionType(CompressionOption.NONE.name().toLowerCase());
85+
}
86+
8087
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory(), encryptionSupplier);
8188
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
8289
this.byteDecoder = byteDecoder;
8390
final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
8491
final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
85-
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false);
92+
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false, manualCompressionConfig);
8693
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
8794
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
8895
this.shutdownInProgress = new AtomicBoolean(false);
8996
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
9097
this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
91-
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
98+
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker, manualCompressionConfig);
9299
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
93100
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
94101
consumers.forEach(this.executorService::submit);
95-
96102
this.drainTimeout = kafkaBufferConfig.getDrainTimeout();
97103
}
98104

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
2222
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
2323

24+
2425
import java.time.Duration;
2526
import java.util.List;
2627
import java.util.Objects;
@@ -57,7 +58,6 @@ class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
5758
@JsonProperty("custom_metric_prefix")
5859
private String customMetricPrefix;
5960

60-
6161
public List<String> getBootstrapServers() {
6262
if (Objects.nonNull(bootstrapServers)) {
6363
return bootstrapServers;
@@ -148,4 +148,5 @@ public Duration getDrainTimeout() {
148148
public Optional<String> getCustomMetricPrefix() {
149149
return Optional.ofNullable(customMetricPrefix);
150150
}
151+
151152
}

0 commit comments

Comments
 (0)