Skip to content

Commit 8d44ee7

Browse files
committed
Fix bugs that caused test to fail
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
1 parent 3e6d9cd commit 8d44ee7

3 files changed

Lines changed: 4 additions & 11 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
8383
final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
8484
final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
8585
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false);
86-
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
86+
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier, kafkaBufferConfig.getCompressionEnabled());
8787
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
8888
this.shutdownInProgress = new AtomicBoolean(false);
8989
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.fasterxml.jackson.core.JsonParser;
1010
import com.fasterxml.jackson.databind.JsonNode;
1111
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import com.github.luben.zstd.Zstd;
1213
import com.github.luben.zstd.ZstdBufferDecompressingStream;
1314
import com.github.luben.zstd.ZstdInputStream;
1415
import com.google.common.annotations.VisibleForTesting;
@@ -562,6 +563,7 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
562563
InputStream byteInputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
563564
InputStream inputStream;
564565
if (compressionEnabled) {
566+
inputStream = new ZstdInputStream(byteInputStream);
565567
} else {
566568
inputStream = byteInputStream;
567569
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,7 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
9696
final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
9797
return new KafkaCustomProducer(producer,
9898
kafkaProducerConfig, dlqSink,
99-
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService);
100-
}
101-
102-
public KafkaCustomProducer createCompressedProducer(final KafkaProducerConfig kafkaProducerConfig,
103-
final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
104-
final DLQSink dlqSink,
105-
final boolean topicNameInMetrics) {
106-
KafkaCustomProducer kafkaCustomProducer = createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics);
107-
kafkaCustomProducer.setCompressionEnabled(compressionEnabled);
108-
return kafkaCustomProducer;
99+
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService, compressionEnabled);
109100
}
110101

111102
private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {

0 commit comments

Comments
 (0)