Skip to content

Commit 3d36eee

Browse files
committed
Fix equality checks
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
1 parent d675ad7 commit 3d36eee

5 files changed

Lines changed: 24 additions & 10 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@
4141
import org.slf4j.MDC;
4242

4343
import java.time.Duration;
44-
import java.util.*;
44+
import java.util.Collection;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.Optional;
4548
import java.util.concurrent.ExecutorService;
4649
import java.util.concurrent.Executors;
4750
import java.util.concurrent.TimeUnit;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,24 @@
2222
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2323
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
2424
import org.opensearch.dataprepper.model.buffer.Buffer;
25+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
2526
import org.opensearch.dataprepper.model.event.Event;
2627
import org.opensearch.dataprepper.model.record.Record;
27-
import org.opensearch.dataprepper.model.codec.ByteDecoder;
2828
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
2929
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;
3030
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
3131
import org.opensearch.dataprepper.plugins.kafka.common.PlaintextKafkaDataConfig;
3232
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
3333
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
3434
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
35-
import org.opensearch.dataprepper.plugins.kafka.configuration.*;
35+
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
36+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
37+
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
38+
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
39+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
40+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
41+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
42+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
3643
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
3744
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
3845
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
2020
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
2121
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
22-
import org.opensearch.dataprepper.plugins.kafka.configuration.*;
22+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
23+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
24+
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
25+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
2326
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
2427
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
2528
import org.opensearch.dataprepper.plugins.kafka.service.TopicService;

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
146146
final MockedConstruction<KafkaCustomProducerFactory> producerFactoryMock =
147147
mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
148148
producerFactory = mock;
149-
when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean())).thenReturn(producer);
149+
when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean(), any())).thenReturn(producer);
150150
});
151151
final MockedConstruction<KafkaCustomConsumerFactory> consumerFactoryMock =
152152
mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> {

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.opensearch.dataprepper.model.event.JacksonEvent;
2929
import org.opensearch.dataprepper.model.record.Record;
3030
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
31-
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
3231
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
32+
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
3333
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
3434
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
3535
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
@@ -38,17 +38,18 @@
3838
import java.lang.reflect.InvocationTargetException;
3939
import java.lang.reflect.Method;
4040
import java.util.UUID;
41+
import java.util.concurrent.Future;
4142

43+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
4244
import static org.junit.jupiter.api.Assertions.assertEquals;
43-
import static org.junit.jupiter.api.Assertions.assertTrue;
4445
import static org.junit.jupiter.api.Assertions.assertThrows;
46+
import static org.junit.jupiter.api.Assertions.assertTrue;
4547
import static org.mockito.ArgumentMatchers.any;
4648
import static org.mockito.Mockito.mock;
4749
import static org.mockito.Mockito.spy;
4850
import static org.mockito.Mockito.verify;
4951
import static org.mockito.Mockito.verifyNoInteractions;
5052
import static org.mockito.Mockito.when;
51-
import java.util.concurrent.Future;
5253

5354

5455
@ExtendWith(MockitoExtension.class)
@@ -112,7 +113,7 @@ public void produceRawDataTest() throws Exception {
112113
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
113114
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
114115
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
115-
assertEquals(recordArgumentCaptor.getValue().value(), byteData);
116+
assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData);
116117
assertEquals(recordArgumentCaptor.getValue().key(), key);
117118
verifyNoInteractions(numberOfRecordSendErrors);
118119
}
@@ -133,7 +134,7 @@ public void produceRawData_sendError() throws Exception {
133134
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
134135
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
135136
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
136-
assertEquals(recordArgumentCaptor.getValue().value(), byteData);
137+
assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData);
137138
assertEquals(recordArgumentCaptor.getValue().key(), key);
138139
verify(numberOfRawDataSendErrors).increment();
139140
}

0 commit comments

Comments
 (0)