Skip to content

Commit 5e7cc30

Browse files
jeffreyAaronJonah Calvo
authored andcommitted
Fix style and build errors from opensearch-project#5778 (opensearch-project#5811)
* Fix import style and integration test fixes Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com> Signed-off-by: Jonah Calvo <caljonah@amazon.com>
1 parent b4686e1 commit 5e7cc30

6 files changed

Lines changed: 32 additions & 22 deletions

File tree

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.fasterxml.jackson.core.JsonParseException;
99
import com.fasterxml.jackson.databind.ObjectMapper;
1010
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
11-
import com.github.luben.zstd.Zstd;
1211
import com.google.protobuf.ByteString;
1312
import org.apache.commons.lang3.RandomStringUtils;
1413
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -119,8 +118,7 @@ void setUp() {
119118
final Map<String, Object> bufferConfigMap = Map.of(
120119
"topics", List.of(topicConfigMap),
121120
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
122-
"encryption", Map.of("type", "none"),
123-
"compression", Map.of("type", "zstd")
121+
"encryption", Map.of("type", "none")
124122
);
125123
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);
126124

@@ -322,7 +320,7 @@ void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutExc
322320

323321
final byte[] innerData = bufferData.getData().toByteArray();
324322

325-
final Map<String, Object> actualEventData = objectMapper.readValue(decompress(innerData), Map.class);
323+
final Map<String, Object> actualEventData = objectMapper.readValue(innerData, Map.class);
326324
assertThat(actualEventData, notNullValue());
327325
assertThat(actualEventData, hasKey("message"));
328326
assertThat(actualEventData.get("message"), equalTo(record.getData().get("message", String.class)));
@@ -362,7 +360,7 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep
362360

363361
final byte[] innerData = bufferData.getData().toByteArray();
364362

365-
assertThat(decompress(innerData), equalTo(writtenBytes));
363+
assertThat(innerData, equalTo(writtenBytes));
366364
}
367365

368366
@Nested
@@ -448,8 +446,8 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
448446

449447
byte[] innerData = bufferData.getData().toByteArray();
450448

451-
assertThat(decompress(innerData), notNullValue());
452-
assertThrows(JsonParseException.class, () -> objectMapper.readValue(decompress(innerData), Map.class));
449+
assertThat(innerData, notNullValue());
450+
assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class));
453451

454452
final byte[] deserializedBytes = decryptCipher.doFinal(innerData);
455453

@@ -494,10 +492,10 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
494492

495493
final byte[] innerData = bufferData.getData().toByteArray();
496494

497-
assertThat(decompress(innerData), notNullValue());
498-
assertThat(decompress(innerData), not(equalTo(writtenBytes)));
495+
assertThat(innerData, notNullValue());
496+
assertThat(innerData, not(equalTo(writtenBytes)));
499497

500-
final byte[] decryptedBytes = decryptCipher.doFinal(decompress(innerData));
498+
final byte[] decryptedBytes = decryptCipher.doFinal(innerData);
501499

502500
assertThat(decryptedBytes, equalTo(writtenBytes));
503501
}
@@ -556,7 +554,4 @@ private Record<Event> createRecord() {
556554
return new Record<>(event);
557555
}
558556

559-
private byte[] decompress(byte[] input) {
560-
return Zstd.decompress(input, (int) Zstd.getFrameContentSize(input));
561-
}
562557
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
3131
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
3232
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
33-
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
3433
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
3534
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
3635
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
@@ -42,7 +41,10 @@
4241
import org.slf4j.MDC;
4342

4443
import java.time.Duration;
45-
import java.util.*;
44+
import java.util.Collection;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.Optional;
4648
import java.util.concurrent.ExecutorService;
4749
import java.util.concurrent.Executors;
4850
import java.util.concurrent.TimeUnit;
@@ -80,8 +82,10 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
8082
CompressionOption manualCompressionConfig = CompressionOption.NONE;
8183
if (kafkaBufferConfig.getTopic().encryptionAtRestEnabled()) {
8284
// If encryption is enabled, disable Kafka built-in compression and do it manually.
83-
manualCompressionConfig = CompressionOption.fromOptionValue(kafkaBufferConfig.getKafkaProducerProperties().getCompressionType());
84-
kafkaBufferConfig.getKafkaProducerProperties().setCompressionType(CompressionOption.NONE.name().toLowerCase());
85+
if (kafkaBufferConfig.getKafkaProducerProperties() != null) {
86+
manualCompressionConfig = CompressionOption.fromOptionValue(kafkaBufferConfig.getKafkaProducerProperties().getCompressionType());
87+
kafkaBufferConfig.getKafkaProducerProperties().setCompressionType(CompressionOption.NONE.name().toLowerCase());
88+
}
8589
}
8690

8791
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory(), encryptionSupplier);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@
3232
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
3333
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
3434
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
35-
import org.opensearch.dataprepper.plugins.kafka.configuration.*;
35+
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
36+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
37+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
38+
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
39+
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
40+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
41+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
42+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
3643
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
3744
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
3845
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
2020
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
2121
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
22-
import org.opensearch.dataprepper.plugins.kafka.configuration.*;
22+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
23+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
24+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
25+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
2326
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
2427
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
2528
import org.opensearch.dataprepper.plugins.kafka.service.TopicService;

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
146146
final MockedConstruction<KafkaCustomProducerFactory> producerFactoryMock =
147147
mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
148148
producerFactory = mock;
149-
when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean())).thenReturn(producer);
149+
when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean(), any())).thenReturn(producer);
150150
});
151151
final MockedConstruction<KafkaCustomConsumerFactory> consumerFactoryMock =
152152
mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> {

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.junit.jupiter.api.Assertions.assertEquals;
4343
import static org.junit.jupiter.api.Assertions.assertTrue;
4444
import static org.junit.jupiter.api.Assertions.assertThrows;
45+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
4546
import static org.mockito.ArgumentMatchers.any;
4647
import static org.mockito.Mockito.mock;
4748
import static org.mockito.Mockito.spy;
@@ -112,7 +113,7 @@ public void produceRawDataTest() throws Exception {
112113
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
113114
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
114115
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
115-
assertEquals(recordArgumentCaptor.getValue().value(), byteData);
116+
assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData);
116117
assertEquals(recordArgumentCaptor.getValue().key(), key);
117118
verifyNoInteractions(numberOfRecordSendErrors);
118119
}
@@ -133,7 +134,7 @@ public void produceRawData_sendError() throws Exception {
133134
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
134135
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
135136
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
136-
assertEquals(recordArgumentCaptor.getValue().value(), byteData);
137+
assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData);
137138
assertEquals(recordArgumentCaptor.getValue().key(), key);
138139
verify(numberOfRawDataSendErrors).increment();
139140
}

0 commit comments

Comments
 (0)