From 16566ac025b4b6d6f0d5855456f6e6fbd6d8fe0c Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Mon, 23 Jun 2025 14:29:05 -0700 Subject: [PATCH 1/7] Add tests to check compression Signed-off-by: Jeffrey Aaron Jeyasingh --- .../plugins/kafka/buffer/KafkaBufferIT.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 1ad76c614a..fb2aefea49 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -59,6 +59,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; @@ -74,6 +75,7 @@ public class KafkaBufferIT { private PluginSetting pluginSetting; private KafkaBufferConfig kafkaBufferConfig; + private KafkaBufferConfig kafkaBufferCompressionConfig; @Mock private AcknowledgementSetManager acknowledgementSetManager; @Mock @@ -112,7 +114,15 @@ void setUp() { topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class); + final Map topicConfigMapCompression = Map.of( + "name", topicName, + "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6), + "create_topic", true, + "encryption_key", "6fib8P/ML7Lh7lUEHCFYCt+bschigjNwmEZUctkP5dw=" // sample key + ); + bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers"); + bootstrapServersCommaDelimited = "localhost:9092"; LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited); @@ -124,6 +134,15 @@ void setUp() { ); kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class); + final Map bufferCompressionConfigMap = Map.of( + "topics", List.of(topicConfigMapCompression), + "bootstrap_servers", List.of(bootstrapServersCommaDelimited), + "encryption", Map.of("type", "none"), + "producer_properties", Map.of("compression_type", "zstd") + ); + + kafkaBufferCompressionConfig = objectMapper.convertValue(bufferCompressionConfigMap, KafkaBufferConfig.class); + byteDecoder = null; } @@ -135,6 +154,10 @@ private KafkaBuffer createObjectUnderTest() { return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null, encryptionSupplier); } + private KafkaBuffer createObjectUnderTestWithCompression() { + return new KafkaBuffer(pluginSetting, kafkaBufferCompressionConfig, acknowledgementSetManager, null, null, null, encryptionSupplier); + } + @Test void write_and_read() throws TimeoutException { KafkaBuffer objectUnderTest = createObjectUnderTest(); @@ -162,6 +185,33 @@ void write_and_read() throws TimeoutException { assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0)); } + @Test + void write_and_read_compression() throws TimeoutException { + KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression(); + + Record record = createRecord(); + objectUnderTest.write(record, 1_000); + + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not. + //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata())); + assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); + assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0)); + assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(1)); + objectUnderTest.checkpoint(readResult.getValue()); + assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0)); + assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0)); + } + @Test void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldException, IllegalAccessException { KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties(); @@ -288,6 +338,32 @@ void writeBytes_and_read() throws Exception { assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap)); } + @Test + void writeBytes_and_read_with_compression() throws Exception { + byteDecoder = new JsonDecoder(); + + final KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression(); + + final Map inputDataMap = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final byte[] bytes = objectMapper.writeValueAsBytes(inputDataMap); + final String key = UUID.randomUUID().toString(); + objectUnderTest.writeBytes(bytes, key, 1_000); + + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not. + //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata())); + assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap)); + } + @Test void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutException, IOException { final KafkaBuffer objectUnderTest = createObjectUnderTest(); From 135a574b2bdf438502d8503260d5b58fb0f9895d Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Mon, 23 Jun 2025 14:52:41 -0700 Subject: [PATCH 2/7] Fix style Signed-off-by: Jeffrey Aaron Jeyasingh --- .../dataprepper/plugins/kafka/buffer/KafkaBufferIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index fb2aefea49..b1c564fca4 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -59,7 +59,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; From b656122ff3bdc41f3129e2a593cced11405e18e2 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Mon, 23 Jun 2025 16:43:27 -0700 Subject: [PATCH 3/7] Fixed Encrypted tests Signed-off-by: Jeffrey Aaron Jeyasingh --- .../plugins/kafka/buffer/KafkaBufferIT.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index b1c564fca4..b6b3bff9a0 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -448,6 +448,31 @@ class Encrypted { @BeforeEach void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException { + random = new Random(); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + acknowledgementSet = mock(AcknowledgementSet.class); + lenient().doAnswer((a) -> null).when(acknowledgementSet).complete(); + lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); + objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + + when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + + topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5); + + Map topicConfigMap = new java.util.HashMap<>(Map.of( + "name", topicName, + "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6), + "create_topic", true + )); + + topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class); + + final Map bufferConfigMap = new java.util.HashMap<>(Map.of( + "topics", List.of(topicConfigMap), + "bootstrap_servers", List.of(bootstrapServersCommaDelimited), + "encryption", Map.of("type", "none") + )); + final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES"); aesKeyGenerator.init(256); final SecretKey secretKey = aesKeyGenerator.generateKey(); @@ -459,11 +484,11 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded()); aesKey = new String(base64Bytes); - final Map topicConfigMap = objectMapper.convertValue(topicConfig, Map.class); topicConfigMap.put("encryption_key", aesKey); - final Map bufferConfigMap = objectMapper.convertValue(kafkaBufferConfig, Map.class); bufferConfigMap.put("topics", List.of(topicConfigMap)); kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class); + + byteDecoder = null; } @Test From 6d37671505a1869ddb7b3b1f4e7d840aad3a96d3 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Wed, 2 Jul 2025 12:16:51 -0700 Subject: [PATCH 4/7] Remove line used for debugging Signed-off-by: Jeffrey Aaron Jeyasingh --- .../dataprepper/plugins/kafka/buffer/KafkaBufferIT.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index b6b3bff9a0..7b7e5f5c1e 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -56,14 +56,10 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; @@ -121,7 +117,6 @@ void setUp() { ); bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers"); - bootstrapServersCommaDelimited = "localhost:9092"; LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited); From edc852317b2647868c5e496a7e3e9d53bb2b1853 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Wed, 2 Jul 2025 12:18:50 -0700 Subject: [PATCH 5/7] Fix imports This reverts commit 6d37671505a1869ddb7b3b1f4e7d840aad3a96d3. Signed-off-by: Jeffrey Aaron Jeyasingh --- .../dataprepper/plugins/kafka/buffer/KafkaBufferIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 7b7e5f5c1e..578f7c8760 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -56,10 +56,14 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; From d675ad734361a6ba1521b3b8a39e64121595a23e Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Wed, 2 Jul 2025 15:37:57 -0700 Subject: [PATCH 6/7] Remove unused import Signed-off-by: Jeffrey Aaron Jeyasingh --- .../opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index 9915d62831..1a768ca612 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -30,7 +30,6 @@ import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory; -import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; From 3d36eeef084773c73818abc5a92069e93b4767d6 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Jeyasingh Date: Wed, 2 Jul 2025 16:13:22 -0700 Subject: [PATCH 7/7] Fix equality checks Signed-off-by: Jeffrey Aaron Jeyasingh --- .../dataprepper/plugins/kafka/buffer/KafkaBuffer.java | 5 ++++- .../kafka/consumer/KafkaCustomConsumerFactory.java | 11 +++++++++-- .../kafka/producer/KafkaCustomProducerFactory.java | 5 ++++- .../plugins/kafka/buffer/KafkaBufferTest.java | 2 +- .../kafka/producer/KafkaCustomProducerTest.java | 11 ++++++----- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index 1a768ca612..b4d128fd74 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -41,7 +41,10 @@ import org.slf4j.MDC; import java.time.Duration; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index d8d217e2a9..bd6d23608e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -22,9 +22,9 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.breaker.CircuitBreaker; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter; @@ -32,7 +32,14 @@ import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext; import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; -import org.opensearch.dataprepper.plugins.kafka.configuration.*; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java index 20b105ac25..b1fba3521e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -19,7 +19,10 @@ import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext; import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; -import org.opensearch.dataprepper.plugins.kafka.configuration.*; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.service.TopicService; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 93ccb6d5cb..3076a18e14 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -146,7 +146,7 @@ public KafkaBuffer createObjectUnderTest(final List consume final MockedConstruction producerFactoryMock = mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> { producerFactory = mock; - when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean())).thenReturn(producer); + when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean(), any())).thenReturn(producer); }); final MockedConstruction consumerFactoryMock = mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> { diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 63d937e2ca..55a33a4b49 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -28,8 +28,8 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics; @@ -38,17 +38,18 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.UUID; +import java.util.concurrent.Future; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import java.util.concurrent.Future; @ExtendWith(MockitoExtension.class) @@ -112,7 +113,7 @@ public void produceRawDataTest() throws Exception { final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class); verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class)); assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName()); - assertEquals(recordArgumentCaptor.getValue().value(), byteData); + assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData); assertEquals(recordArgumentCaptor.getValue().key(), key); verifyNoInteractions(numberOfRecordSendErrors); } @@ -133,7 +134,7 @@ public void produceRawData_sendError() throws Exception { final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class); verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class)); assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName()); - assertEquals(recordArgumentCaptor.getValue().value(), byteData); + assertArrayEquals((byte[]) recordArgumentCaptor.getValue().value(), byteData); assertEquals(recordArgumentCaptor.getValue().key(), key); verify(numberOfRawDataSendErrors).increment(); }