From 91dcd7e720dc00b7a2cb19487b3544395f083ee2 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 10 Apr 2026 14:27:42 -0500 Subject: [PATCH 1/4] Enable kafka sink integration tests as a github workflow action Signed-off-by: Taylor Gray --- .github/workflows/kafka-plugin-integration-tests.yml | 3 ++- .gitignore | 5 +++++ .../dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java | 4 ++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/kafka-plugin-integration-tests.yml b/.github/workflows/kafka-plugin-integration-tests.yml index 25a1481339..4fe8a1bc6a 100644 --- a/.github/workflows/kafka-plugin-integration-tests.yml +++ b/.github/workflows/kafka-plugin-integration-tests.yml @@ -66,7 +66,8 @@ jobs: -Dtests.kafka.bootstrap_servers=localhost:9092 \ -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \ -Dtests.kafka.kms_key=alias/DataPrepperTesting \ - --tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT + --tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT \ + --tests '*kafka.sink*' - name: Upload Unit Test Results if: always() diff --git a/.gitignore b/.gitignore index bb53f3e3b0..cda6680ebd 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,11 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml # Eclipse/IDE compiled output **/bin/ +*.project +*.classpath +*.factorypath +**/.settings/** +*.prefs # Development tools .DS_Store diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java index 69dc2c9d5e..5b01370171 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java @@ -60,6 +60,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + +/** + * Enabled as a github action workflow + */ public class KafkaSinkAvroTypeIT { private static final int TEST_ID = 123456; From 9bf9084618e59d71ee4d7d0370ebce892b98c5e3 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 13 Apr 2026 10:57:36 -0500 Subject: [PATCH 2/4] Fix flaky kafka source integ tests Signed-off-by: Taylor Gray --- .../dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 39fda47d30..a7e421ba53 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -131,6 +131,7 @@ public void setup() throws Throwable { sourceConfig = mock(KafkaSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); + timer = mock(Timer.class); buffer = mock(Buffer.class); encryptionConfig = mock(EncryptionConfig.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); From ce1acb23fd31707df3982a727e9f23e1ea27b455 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 13 Apr 2026 12:11:35 -0500 Subject: [PATCH 3/4] Fix kafka sink tests Signed-off-by: Taylor Gray --- .../kafka-plugin-integration-tests.yml | 2 +- .../kafka/sink/KafkaSinkJsonTypeIT.java | 161 ++++++------------ .../kafka/sink/KafkaSinkPlainTextTypeIT.java | 156 ++++++----------- 3 files changed, 100 insertions(+), 219 deletions(-) diff --git a/.github/workflows/kafka-plugin-integration-tests.yml b/.github/workflows/kafka-plugin-integration-tests.yml index 4fe8a1bc6a..4c604456e7 100644 --- a/.github/workflows/kafka-plugin-integration-tests.yml +++ b/.github/workflows/kafka-plugin-integration-tests.yml @@ -67,7 +67,7 @@ jobs: -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \ -Dtests.kafka.kms_key=alias/DataPrepperTesting \ --tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT \ - --tests '*kafka.sink*' + --tests KafkaSinkJsonTypeIT --tests KafkaSinkPlainTextTypeIT - name: Upload Unit Test Results if: always() diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java index 51b74ae316..11349a461a 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java @@ -14,14 +14,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.json.JsonDeserializer; -import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -34,13 +31,13 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -50,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -58,45 +56,22 @@ public class KafkaSinkJsonTypeIT { private static final int TEST_ID = 123456; - @Mock - private KafkaSinkConfig kafkaSinkConfig; - @Mock + private KafkaSinkConfig kafkaSinkConfig; private TopicProducerConfig topicConfig; - private KafkaSink kafkaSink; - private String bootstrapServers; private String testTopic; - private PluginSetting pluginSetting; - - @Mock private PluginFactory pluginFactory; - - @Mock private PluginMetrics pluginMetrics; - private SinkContext sinkContext; - - @Mock private DlqProvider dlqProvider; - - @Mock private DlqWriter dlqWriter; - - @Mock private ExpressionEvaluator evaluator; - - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - - private PlainTextAuthConfig plainTextAuthConfig; - private AuthConfig.SaslAuthConfig saslAuthConfig; - private AuthConfig authConfig; - - private static final Properties props = new Properties(); - + private EncryptionConfig encryptionConfig; + private Properties props; public KafkaSink createObjectUnderTest() { return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier); @@ -104,9 +79,9 @@ public KafkaSink createObjectUnderTest() { @BeforeEach public void setup() { - plainTextAuthConfig = mock(PlainTextAuthConfig.class); - saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); - authConfig = mock(AuthConfig.class); + props = new Properties(); + encryptionConfig = mock(EncryptionConfig.class); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); evaluator = mock(ExpressionEvaluator.class); dlqWriter = mock(DlqWriter.class); @@ -114,6 +89,8 @@ public void setup() { sinkContext = mock(SinkContext.class); pluginFactory = mock(PluginFactory.class); pluginSetting = mock(PluginSetting.class); + pluginMetrics = mock(PluginMetrics.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); when(pluginSetting.getName()).thenReturn("name"); when(pluginSetting.getPipelineName()).thenReturn("pipelinename"); @@ -124,34 +101,39 @@ public void setup() { when(kafkaSinkConfig.getSchemaConfig()).thenReturn(null); when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); + when(kafkaSinkConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(kafkaSinkConfig.getAuthConfig()).thenReturn(null); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); + when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON); + when(topicConfig.isCreateTopic()).thenReturn(false); + when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); + bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } - @Test - public void TestPollRecordsJsonSASLPlainText() throws Exception { - configureJasConfForSASLPlainText(); + @AfterEach + public void tearDown() { + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.deleteTopics(Collections.singleton(testTopic)) + .all().whenComplete((v, throwable) -> {}); + } + } + @Test + public void TestPollRecordsJson() throws Exception { final int numRecords = 1; - when(topicConfig.isCreateTopic()).thenReturn(false); - when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); - when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = topicConfig.getName(); - - createTopic(created, topicName); + createTopic(created, testTopic); final List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { final Map eventData = new HashMap<>(); eventData.put("name", "testName"); @@ -165,9 +147,7 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception { Thread.sleep(4000); - consumeTestMessages(records); - - deleteTopic(created, topicName); + consumeAndVerifyMessages(records); } private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException { @@ -181,76 +161,37 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (created.get() != true) { + while (!created.get()) { Thread.sleep(1000); } } - private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException { - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } - } - - private void configureJasConfForSASLPlainText() { - String username = System.getProperty("tests.kafka.authconfig.username"); - String password = System.getProperty("tests.kafka.authconfig.password"); - when(plainTextAuthConfig.getUsername()).thenReturn(username); - when(plainTextAuthConfig.getPassword()).thenReturn(password); - when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); - when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); - - String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; - props.put(SaslConfigs.SASL_JAAS_CONFIG, jasConf); - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString()); - } - - private void consumeTestMessages(List> recList) { + private void consumeAndVerifyMessages(List> expectedRecords) { final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class); - - KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName())); + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { + kafkaConsumer.subscribe(Collections.singletonList(testTopic)); - pollRecords(recList, kafkaConsumer); - } - - private void pollRecords(List> recList, KafkaConsumer kafkaConsumer) { - int recListCounter = 0; - boolean isPollNext = true; - while (isPollNext) { - ConsumerRecords records = kafkaConsumer.poll(1000); - if (!records.isEmpty() && records.count() > 0) { + List consumed = new ArrayList<>(); + int maxRetries = 15; + while (consumed.size() < expectedRecords.size() && maxRetries-- > 0) { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { - Record recordEvent = recList.get(recListCounter); - String inputJsonStr = recordEvent.getData().toJsonString(); - - JsonNode recValue = record.value(); - String ss = recValue.asText(); - - assertThat(ss, CoreMatchers.containsString(inputJsonStr)); - if (recListCounter + 1 == recList.size()) { - isPollNext = false; - } - recListCounter++; - break; + consumed.add(record.value()); } } + + assertThat(consumed.size(), equalTo(expectedRecords.size())); + for (int i = 0; i < expectedRecords.size(); i++) { + Event expectedEvent = expectedRecords.get(i).getData(); + JsonNode actual = consumed.get(i); + assertThat(actual.get("name").asText(), equalTo(expectedEvent.get("name", String.class))); + assertThat(actual.get("id").asText(), equalTo(expectedEvent.get("id", String.class))); + } } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java index ea855b0ec3..ee233dbe3b 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java @@ -13,13 +13,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; -import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -32,13 +29,13 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -48,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -56,45 +54,22 @@ public class KafkaSinkPlainTextTypeIT { private static final int TEST_ID = 123456; - @Mock - private KafkaSinkConfig kafkaSinkConfig; - @Mock + private KafkaSinkConfig kafkaSinkConfig; private TopicProducerConfig topicConfig; - private KafkaSink kafkaSink; - private String bootstrapServers; private String testTopic; - private PluginSetting pluginSetting; - - @Mock private PluginFactory pluginFactory; - - @Mock private PluginMetrics pluginMetrics; - private SinkContext sinkContext; - - @Mock private DlqProvider dlqProvider; - - @Mock private DlqWriter dlqWriter; - - @Mock private ExpressionEvaluator evaluator; - - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - - private PlainTextAuthConfig plainTextAuthConfig; - private AuthConfig.SaslAuthConfig saslAuthConfig; - private AuthConfig authConfig; - - private static final Properties props = new Properties(); - + private EncryptionConfig encryptionConfig; + private Properties props; public KafkaSink createObjectUnderTest() { return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier); @@ -102,9 +77,9 @@ public KafkaSink createObjectUnderTest() { @BeforeEach public void setup() { - plainTextAuthConfig = mock(PlainTextAuthConfig.class); - saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); - authConfig = mock(AuthConfig.class); + props = new Properties(); + encryptionConfig = mock(EncryptionConfig.class); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); evaluator = mock(ExpressionEvaluator.class); dlqWriter = mock(DlqWriter.class); @@ -112,6 +87,8 @@ public void setup() { sinkContext = mock(SinkContext.class); pluginFactory = mock(PluginFactory.class); pluginSetting = mock(PluginSetting.class); + pluginMetrics = mock(PluginMetrics.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); when(pluginSetting.getName()).thenReturn("name"); when(pluginSetting.getPipelineName()).thenReturn("pipelinename"); @@ -122,34 +99,39 @@ public void setup() { when(kafkaSinkConfig.getSchemaConfig()).thenReturn(null); when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT.toString()); when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}"); + when(kafkaSinkConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(kafkaSinkConfig.getAuthConfig()).thenReturn(null); testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5); topicConfig = mock(TopicProducerConfig.class); when(topicConfig.getName()).thenReturn(testTopic); + when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); + when(topicConfig.isCreateTopic()).thenReturn(false); + when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); + bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } + @AfterEach + public void tearDown() { + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.deleteTopics(Collections.singleton(testTopic)) + .all().whenComplete((v, throwable) -> {}); + } + } + @Test public void TestPollRecordsPlainText() throws Exception { - - configureJasConfForSASLPlainText(); - final int numRecords = 1; - when(topicConfig.isCreateTopic()).thenReturn(false); - when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); - when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig); kafkaSink = createObjectUnderTest(); AtomicBoolean created = new AtomicBoolean(false); - final String topicName = topicConfig.getName(); - - createTopic(created, topicName); + createTopic(created, testTopic); final List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { final Map eventData = new HashMap<>(); eventData.put("name", "testName"); @@ -163,23 +145,7 @@ public void TestPollRecordsPlainText() throws Exception { Thread.sleep(4000); - consumeTestMessages(records); - - deleteTopic(created, topicName); - } - - private void configureJasConfForSASLPlainText() { - String username = System.getProperty("tests.kafka.authconfig.username"); - String password = System.getProperty("tests.kafka.authconfig.password"); - when(plainTextAuthConfig.getUsername()).thenReturn(username); - when(plainTextAuthConfig.getPassword()).thenReturn(password); - when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); - when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); - - String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; - props.put(SaslConfigs.SASL_JAAS_CONFIG, jasConf); - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString()); + consumeAndVerifyMessages(records); } private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException { @@ -193,61 +159,35 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (created.get() != true) { - Thread.sleep(1000); - } - } - - private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException { - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { + while (!created.get()) { Thread.sleep(1000); } } - private void consumeTestMessages(List> recList) { + private void consumeAndVerifyMessages(List> expectedRecords) { final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5); - props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - - KafkaConsumer kafkaConsumer = new KafkaConsumer(props); - - kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName())); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - pollRecords(recList, kafkaConsumer); - } + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props)) { + kafkaConsumer.subscribe(Collections.singletonList(testTopic)); - private void pollRecords(List> recList, KafkaConsumer kafkaConsumer) { - int recListCounter = 0; - boolean isPollNext = true; - while (isPollNext) { - ConsumerRecords records = kafkaConsumer.poll(1000); - if (!records.isEmpty() && records.count() > 0) { + List consumed = new ArrayList<>(); + int maxRetries = 15; + while (consumed.size() < expectedRecords.size() && maxRetries-- > 0) { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { - Record recordEvent = recList.get(recListCounter); - String inputJsonStr = recordEvent.getData().toJsonString(); - - String recValue = record.value(); - assertThat(recValue, CoreMatchers.containsString(inputJsonStr)); - if (recListCounter + 1 == recList.size()) { - isPollNext = false; - } - recListCounter++; - break; + consumed.add(record.value()); } } + + assertThat(consumed.size(), equalTo(expectedRecords.size())); + for (int i = 0; i < expectedRecords.size(); i++) { + String expectedJson = expectedRecords.get(i).getData().toJsonString(); + assertThat(consumed.get(i), equalTo(expectedJson)); + } } } -} \ No newline at end of file +} From a39a0eee96e36c512971618908557f32ebc69b89 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 13 Apr 2026 16:46:40 -0500 Subject: [PATCH 4/4] Address PR comment to use awaitility Signed-off-by: Taylor Gray --- .../plugins/kafka/sink/KafkaSinkAvroTypeIT.java | 3 --- .../plugins/kafka/sink/KafkaSinkJsonTypeIT.java | 14 +++++--------- .../kafka/sink/KafkaSinkPlainTextTypeIT.java | 14 +++++--------- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java index 5b01370171..ce91d29b8b 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java @@ -61,9 +61,6 @@ import static org.mockito.Mockito.when; -/** - * Enabled as a github action workflow - */ public class KafkaSinkAvroTypeIT { private static final int TEST_ID = 123456; diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java index 11349a461a..4d39bba096 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -145,8 +146,6 @@ public void TestPollRecordsJson() throws Exception { kafkaSink.doInitialize(); kafkaSink.doOutput(records); - Thread.sleep(4000); - consumeAndVerifyMessages(records); } @@ -161,9 +160,7 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (!created.get()) { - Thread.sleep(1000); - } + await().atMost(Duration.ofSeconds(30)).until(created::get); } private void consumeAndVerifyMessages(List> expectedRecords) { @@ -177,15 +174,14 @@ private void consumeAndVerifyMessages(List> expectedRecords) { kafkaConsumer.subscribe(Collections.singletonList(testTopic)); List consumed = new ArrayList<>(); - int maxRetries = 15; - while (consumed.size() < expectedRecords.size() && maxRetries-- > 0) { + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { consumed.add(record.value()); } - } + assertThat(consumed.size(), equalTo(expectedRecords.size())); + }); - assertThat(consumed.size(), equalTo(expectedRecords.size())); for (int i = 0; i < expectedRecords.size(); i++) { Event expectedEvent = expectedRecords.get(i).getData(); JsonNode actual = consumed.get(i); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java index ee233dbe3b..f905b32e9a 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkPlainTextTypeIT.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -143,8 +144,6 @@ public void TestPollRecordsPlainText() throws Exception { kafkaSink.doInitialize(); kafkaSink.doOutput(records); - Thread.sleep(4000); - consumeAndVerifyMessages(records); } @@ -159,9 +158,7 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru } created.set(true); } - while (!created.get()) { - Thread.sleep(1000); - } + await().atMost(Duration.ofSeconds(30)).until(created::get); } private void consumeAndVerifyMessages(List> expectedRecords) { @@ -175,15 +172,14 @@ private void consumeAndVerifyMessages(List> expectedRecords) { kafkaConsumer.subscribe(Collections.singletonList(testTopic)); List consumed = new ArrayList<>(); - int maxRetries = 15; - while (consumed.size() < expectedRecords.size() && maxRetries-- > 0) { + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord record : records) { consumed.add(record.value()); } - } + assertThat(consumed.size(), equalTo(expectedRecords.size())); + }); - assertThat(consumed.size(), equalTo(expectedRecords.size())); for (int i = 0; i < expectedRecords.size(); i++) { String expectedJson = expectedRecords.get(i).getData().toJsonString(); assertThat(consumed.get(i), equalTo(expectedJson));