Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ jobs:
-Dtests.kafka.bootstrap_servers=localhost:9092 \
-Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \
-Dtests.kafka.kms_key=alias/DataPrepperTesting \
--tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT
--tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT \
--tests KafkaSinkJsonTypeIT --tests KafkaSinkPlainTextTypeIT

- name: Upload Unit Test Results
if: always()
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml

# Eclipse/IDE compiled output
**/bin/
*.project
*.classpath
*.factorypath
**/.settings/**
*.prefs

# Development tools
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class KafkaSinkAvroTypeIT {
private static final int TEST_ID = 123456;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -34,13 +31,13 @@
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -50,6 +47,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -58,62 +57,41 @@

public class KafkaSinkJsonTypeIT {
private static final int TEST_ID = 123456;
@Mock
private KafkaSinkConfig kafkaSinkConfig;

@Mock
private KafkaSinkConfig kafkaSinkConfig;
private TopicProducerConfig topicConfig;

private KafkaSink kafkaSink;

private String bootstrapServers;
private String testTopic;

private PluginSetting pluginSetting;

@Mock
private PluginFactory pluginFactory;

@Mock
private PluginMetrics pluginMetrics;

private SinkContext sinkContext;

@Mock
private DlqProvider dlqProvider;

@Mock
private DlqWriter dlqWriter;

@Mock
private ExpressionEvaluator evaluator;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private PlainTextAuthConfig plainTextAuthConfig;
private AuthConfig.SaslAuthConfig saslAuthConfig;
private AuthConfig authConfig;

private static final Properties props = new Properties();

private EncryptionConfig encryptionConfig;
private Properties props;

public KafkaSink createObjectUnderTest() {
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier);
}

@BeforeEach
public void setup() {
plainTextAuthConfig = mock(PlainTextAuthConfig.class);
saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
authConfig = mock(AuthConfig.class);
props = new Properties();
encryptionConfig = mock(EncryptionConfig.class);
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);

evaluator = mock(ExpressionEvaluator.class);
dlqWriter = mock(DlqWriter.class);
dlqProvider = mock(DlqProvider.class);
sinkContext = mock(SinkContext.class);
pluginFactory = mock(PluginFactory.class);
pluginSetting = mock(PluginSetting.class);
pluginMetrics = mock(PluginMetrics.class);
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
when(pluginSetting.getName()).thenReturn("name");
when(pluginSetting.getPipelineName()).thenReturn("pipelinename");

Expand All @@ -124,34 +102,39 @@ public void setup() {
when(kafkaSinkConfig.getSchemaConfig()).thenReturn(null);
when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON.toString());
when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}");
when(kafkaSinkConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
when(kafkaSinkConfig.getAuthConfig()).thenReturn(null);

testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);

topicConfig = mock(TopicProducerConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON);
when(topicConfig.isCreateTopic()).thenReturn(false);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need this for the GitHub Actions to pass. Unless there is some code in this test that creates the topic.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topic is created in the test code separately, so the sink plugin does not create the topic, just the test.

when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);

bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

@Test
public void TestPollRecordsJsonSASLPlainText() throws Exception {
configureJasConfForSASLPlainText();
@AfterEach
public void tearDown() {
try (AdminClient adminClient = AdminClient.create(props)) {
adminClient.deleteTopics(Collections.singleton(testTopic))
.all().whenComplete((v, throwable) -> {});
}
}

@Test
public void TestPollRecordsJson() throws Exception {
final int numRecords = 1;
when(topicConfig.isCreateTopic()).thenReturn(false);
when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);
when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig);
kafkaSink = createObjectUnderTest();


AtomicBoolean created = new AtomicBoolean(false);
final String topicName = topicConfig.getName();

createTopic(created, topicName);
createTopic(created, testTopic);

final List<Record<Event>> records = new ArrayList<>();

for (int i = 0; i < numRecords; i++) {
final Map<String, String> eventData = new HashMap<>();
eventData.put("name", "testName");
Expand All @@ -163,11 +146,7 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception {
kafkaSink.doInitialize();
kafkaSink.doOutput(records);

Thread.sleep(4000);

consumeTestMessages(records);

deleteTopic(created, topicName);
consumeAndVerifyMessages(records);
}

private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException {
Expand All @@ -181,76 +160,34 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru
}
created.set(true);
}
while (created.get() != true) {
Thread.sleep(1000);
}
}

private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException {
try (AdminClient adminClient = AdminClient.create(props)) {
try {
adminClient.deleteTopics(Collections.singleton(topicName))
.all().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
created.set(false);
}
while (created.get() != false) {
Thread.sleep(1000);
}
}

private void configureJasConfForSASLPlainText() {
String username = System.getProperty("tests.kafka.authconfig.username");
String password = System.getProperty("tests.kafka.authconfig.password");
when(plainTextAuthConfig.getUsername()).thenReturn(username);
when(plainTextAuthConfig.getPassword()).thenReturn(password);
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);

String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
props.put(SaslConfigs.SASL_JAAS_CONFIG, jasConf);
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString());
await().atMost(Duration.ofSeconds(30)).until(created::get);
}

private void consumeTestMessages(List<Record<Event>> recList) {
private void consumeAndVerifyMessages(List<Record<Event>> expectedRecords) {
final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);

KafkaConsumer<String, JsonNode> kafkaConsumer = new KafkaConsumer<>(props);

kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName()));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

pollRecords(recList, kafkaConsumer);
}
try (KafkaConsumer<String, JsonNode> kafkaConsumer = new KafkaConsumer<>(props)) {
kafkaConsumer.subscribe(Collections.singletonList(testTopic));

private void pollRecords(List<Record<Event>> recList, KafkaConsumer<String, JsonNode> kafkaConsumer) {
int recListCounter = 0;
boolean isPollNext = true;
while (isPollNext) {
ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(1000);
if (!records.isEmpty() && records.count() > 0) {
List<JsonNode> consumed = new ArrayList<>();
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, JsonNode> record : records) {
Record<Event> recordEvent = recList.get(recListCounter);
String inputJsonStr = recordEvent.getData().toJsonString();

JsonNode recValue = record.value();
String ss = recValue.asText();

assertThat(ss, CoreMatchers.containsString(inputJsonStr));
if (recListCounter + 1 == recList.size()) {
isPollNext = false;
}
recListCounter++;
break;
consumed.add(record.value());
}
assertThat(consumed.size(), equalTo(expectedRecords.size()));
});

for (int i = 0; i < expectedRecords.size(); i++) {
Event expectedEvent = expectedRecords.get(i).getData();
JsonNode actual = consumed.get(i);
assertThat(actual.get("name").asText(), equalTo(expectedEvent.get("name", String.class)));
assertThat(actual.get("id").asText(), equalTo(expectedEvent.get("id", String.class)));
}
}
}
}
}
Loading
Loading