diff --git a/.github/workflows/kafka-plugin-integration-tests.yml b/.github/workflows/kafka-plugin-integration-tests.yml index 25a1481339..4c604456e7 100644 --- a/.github/workflows/kafka-plugin-integration-tests.yml +++ b/.github/workflows/kafka-plugin-integration-tests.yml @@ -66,7 +66,8 @@ jobs: -Dtests.kafka.bootstrap_servers=localhost:9092 \ -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \ -Dtests.kafka.kms_key=alias/DataPrepperTesting \ - --tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT + --tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT \ + --tests KafkaSinkJsonTypeIT --tests KafkaSinkPlainTextTypeIT - name: Upload Unit Test Results if: always() diff --git a/.gitignore b/.gitignore index bb53f3e3b0..cda6680ebd 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,11 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml # Eclipse/IDE compiled output **/bin/ +*.project +*.classpath +*.factorypath +**/.settings/** +*.prefs # Development tools .DS_Store diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java index 69dc2c9d5e..ce91d29b8b 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java @@ -60,6 +60,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + public class KafkaSinkAvroTypeIT { private static final int TEST_ID = 123456; diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java index 51b74ae316..4d39bba096 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java @@ -14,14 +14,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.json.JsonDeserializer; -import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -34,13 +31,13 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -50,6 +47,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -58,45 +57,22 @@ public class KafkaSinkJsonTypeIT { private static final int TEST_ID = 123456; - @Mock - private KafkaSinkConfig kafkaSinkConfig; - @Mock + private KafkaSinkConfig kafkaSinkConfig; private TopicProducerConfig topicConfig; - private KafkaSink kafkaSink; - private String bootstrapServers; private String testTopic; - private PluginSetting pluginSetting; - - @Mock private PluginFactory pluginFactory; - - @Mock private PluginMetrics pluginMetrics; - private SinkContext sinkContext; - - @Mock private DlqProvider dlqProvider; - - @Mock private DlqWriter dlqWriter; - - @Mock private ExpressionEvaluator evaluator; - - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - - private PlainTextAuthConfig plainTextAuthConfig; - private AuthConfig.SaslAuthConfig saslAuthConfig; - private AuthConfig authConfig; - - private static final Properties props = new Properties(); - + private EncryptionConfig encryptionConfig; + private Properties props; public KafkaSink createObjectUnderTest() { return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier); @@ -104,9 +80,9 @@ public KafkaSink createObjectUnderTest() { @BeforeEach public void setup() { - plainTextAuthConfig = mock(PlainTextAuthConfig.class); - saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); - authConfig = mock(AuthConfig.class); + props = new Properties(); + encryptionConfig = mock(EncryptionConfig.class); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); evaluator = mock(ExpressionEvaluator.class); dlqWriter = mock(DlqWriter.class); @@ -114,6 +90,8 @@ public void setup() { sinkContext = mock(SinkContext.class); pluginFactory = mock(PluginFactory.class); pluginSetting = mock(PluginSetting.class); + pluginMetrics = mock(PluginMetrics.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); when(pluginSetting.getName()).thenReturn("name"); when(pluginSetting.getPipelineName()).thenReturn("pipelinename"); @@ -124,34 +102,39 @@ public void setup() { when(kafkaSinkConfig.getSchemaConfig()).thenReturn(null); when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); + when(kafkaSinkConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(kafkaSinkConfig.getAuthConfig()).thenReturn(null); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); + when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON); + when(topicConfig.isCreateTopic()).thenReturn(false); + when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); + bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } - @Test - public void TestPollRecordsJsonSASLPlainText() throws Exception { - configureJasConfForSASLPlainText(); + @AfterEach + public void tearDown() { + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.deleteTopics(Collections.singleton(testTopic)) + .all().whenComplete((v, throwable) -> {}); + } + } + @Test + public void TestPollRecordsJson() throws Exception { final int numRecords = 1; - when(topicConfig.isCreateTopic()).thenReturn(false); - when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); - when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = topicConfig.getName(); - - createTopic(created, topicName); + createTopic(created, testTopic); final List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { final Map eventData = new HashMap<>(); eventData.put("name", "testName"); @@ -163,11 +146,7 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception { kafkaSink.doInitialize(); kafkaSink.doOutput(records); - Thread.sleep(4000); - - consumeTestMessages(records); - - deleteTopic(created, topicName); + consumeAndVerifyMessages(records); } private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException { @@ -181,76 +160,34 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (created.get() != true) { - Thread.sleep(1000); - } - } - - private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException { - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } - } - - private void configureJasConfForSASLPlainText() { - String username = System.getProperty("tests.kafka.authconfig.username"); - String password = System.getProperty("tests.kafka.authconfig.password"); - when(plainTextAuthConfig.getUsername()).thenReturn(username); - when(plainTextAuthConfig.getPassword()).thenReturn(password); - when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); - when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); - - String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; - props.put(SaslConfigs.SASL_JAAS_CONFIG, jasConf); - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString()); + await().atMost(Duration.ofSeconds(30)).until(created::get); } - private void consumeTestMessages(List> recList) { + private void consumeAndVerifyMessages(List> expectedRecords) { final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class); - - KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props); - - kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName())); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - pollRecords(recList, kafkaConsumer); - } + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { + kafkaConsumer.subscribe(Collections.singletonList(testTopic)); - private void pollRecords(List> recList, KafkaConsumer kafkaConsumer) { - int recListCounter = 0; - boolean isPollNext = true; - while (isPollNext) { - ConsumerRecords records = kafkaConsumer.poll(1000); - if (!records.isEmpty() && records.count() > 0) { + List consumed = new ArrayList<>(); + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { - Record recordEvent = recList.get(recListCounter); - String inputJsonStr = recordEvent.getData().toJsonString(); - - JsonNode recValue = record.value(); - String ss = recValue.asText(); - - assertThat(ss, CoreMatchers.containsString(inputJsonStr)); - if (recListCounter + 1 == recList.size()) { - isPollNext = false; - } - recListCounter++; - break; + consumed.add(record.value()); } + assertThat(consumed.size(), equalTo(expectedRecords.size())); + }); + + for (int i = 0; i < expectedRecords.size(); i++) { + Event expectedEvent = expectedRecords.get(i).getData(); + JsonNode actual = consumed.get(i); + assertThat(actual.get("name").asText(), equalTo(expectedEvent.get("name", String.class))); + assertThat(actual.get("id").asText(), equalTo(expectedEvent.get("id", String.class))); } } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java index ea855b0ec3..f905b32e9a 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java @@ -13,13 +13,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; -import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -32,13 +29,13 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -48,6 +45,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -56,45 +55,22 @@ public class KafkaSinkPlainTextTypeIT { private static final int TEST_ID = 123456; - @Mock - private KafkaSinkConfig kafkaSinkConfig; - @Mock + private KafkaSinkConfig kafkaSinkConfig; private TopicProducerConfig topicConfig; - private KafkaSink kafkaSink; - private String bootstrapServers; private String testTopic; - private PluginSetting pluginSetting; - - @Mock private PluginFactory pluginFactory; - - @Mock private PluginMetrics pluginMetrics; - private SinkContext sinkContext; - - @Mock private DlqProvider dlqProvider; - - @Mock private DlqWriter dlqWriter; - - @Mock private ExpressionEvaluator evaluator; - - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - - private PlainTextAuthConfig plainTextAuthConfig; - private AuthConfig.SaslAuthConfig saslAuthConfig; - private AuthConfig authConfig; - - private static final Properties props = new Properties(); - + private EncryptionConfig encryptionConfig; + private Properties props; public KafkaSink createObjectUnderTest() { return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier); @@ -102,9 +78,9 @@ public KafkaSink createObjectUnderTest() { @BeforeEach public void setup() { - plainTextAuthConfig = mock(PlainTextAuthConfig.class); - saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); - authConfig = mock(AuthConfig.class); + props = new Properties(); + encryptionConfig = mock(EncryptionConfig.class); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); evaluator = mock(ExpressionEvaluator.class); dlqWriter = mock(DlqWriter.class); @@ -112,6 +88,8 @@ public void setup() { sinkContext = mock(SinkContext.class); pluginFactory = mock(PluginFactory.class); pluginSetting = mock(PluginSetting.class); + pluginMetrics = mock(PluginMetrics.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); when(pluginSetting.getName()).thenReturn("name"); when(pluginSetting.getPipelineName()).thenReturn("pipelinename"); @@ -122,34 +100,39 @@ public void setup() { when(kafkaSinkConfig.getSchemaConfig()).thenReturn(null); when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); + when(kafkaSinkConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(kafkaSinkConfig.getAuthConfig()).thenReturn(null); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); + when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); + when(topicConfig.isCreateTopic()).thenReturn(false); + when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); + bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } + @AfterEach + public void tearDown() { + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.deleteTopics(Collections.singleton(testTopic)) + .all().whenComplete((v, throwable) -> {}); + } + } + @Test public void TestPollRecordsPlainText() throws Exception { - - configureJasConfForSASLPlainText(); - final int numRecords = 1; - when(topicConfig.isCreateTopic()).thenReturn(false); - when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); - when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); AtomicBoolean created = new AtomicBoolean(false); - final String topicName = topicConfig.getName(); - - createTopic(created, topicName); + createTopic(created, testTopic); final List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { final Map eventData = new HashMap<>(); eventData.put("name", "testName"); @@ -161,25 +144,7 @@ public void TestPollRecordsPlainText() throws Exception { kafkaSink.doInitialize(); kafkaSink.doOutput(records); - Thread.sleep(4000); - - consumeTestMessages(records); - - deleteTopic(created, topicName); - } - - private void configureJasConfForSASLPlainText() { - String username = System.getProperty("tests.kafka.authconfig.username"); - String password = System.getProperty("tests.kafka.authconfig.password"); - when(plainTextAuthConfig.getUsername()).thenReturn(username); - when(plainTextAuthConfig.getPassword()).thenReturn(password); - when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); - when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); - - String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; - props.put(SaslConfigs.SASL_JAAS_CONFIG, jasConf); - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString()); + consumeAndVerifyMessages(records); } private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException { @@ -193,61 +158,32 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (created.get() != true) { - Thread.sleep(1000); - } - } - - private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException { - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } + await().atMost(Duration.ofSeconds(30)).until(created::get); } - private void consumeTestMessages(List> recList) { + private void consumeAndVerifyMessages(List> expectedRecords) { final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); - props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - - KafkaConsumer kafkaConsumer = new KafkaConsumer(props); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName())); + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { + kafkaConsumer.subscribe(Collections.singletonList(testTopic)); - pollRecords(recList, kafkaConsumer); - } - - private void pollRecords(List> recList, KafkaConsumer kafkaConsumer) { - int recListCounter = 0; - boolean isPollNext = true; - while (isPollNext) { - ConsumerRecords records = kafkaConsumer.poll(1000); - if (!records.isEmpty() && records.count() > 0) { + List consumed = new ArrayList<>(); + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { - Record recordEvent = recList.get(recListCounter); - String inputJsonStr = recordEvent.getData().toJsonString(); - - String recValue = record.value(); - assertThat(recValue, CoreMatchers.containsString(inputJsonStr)); - if (recListCounter + 1 == recList.size()) { - isPollNext = false; - } - recListCounter++; - break; + consumed.add(record.value()); } + assertThat(consumed.size(), equalTo(expectedRecords.size())); + }); + + for (int i = 0; i < expectedRecords.size(); i++) { + String expectedJson = expectedRecords.get(i).getData().toJsonString(); + assertThat(consumed.get(i), equalTo(expectedJson)); } } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 39fda47d30..a7e421ba53 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -131,6 +131,7 @@ public void setup() throws Throwable { sourceConfig = mock(KafkaSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); + timer = mock(Timer.class); buffer = mock(Buffer.class); encryptionConfig = mock(EncryptionConfig.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);