diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevel.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevel.java new file mode 100644 index 0000000000..6566fa4a5f --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevel.java @@ -0,0 +1,33 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Map; +import java.util.Arrays; +import java.util.stream.Collectors; + +public enum IsolationLevel { + READ_UNCOMMITTED("read_uncommitted"), + READ_COMMITTED("read_committed"); + + private static final Map OPTIONS_MAP = Arrays.stream(IsolationLevel.values()) + .collect(Collectors.toMap( + value -> value.type, + value -> value + )); + + private final String type; + + IsolationLevel(final String type) { + this.type = type; + } + + @JsonCreator + public static IsolationLevel fromTypeValue(final String type) { + return OPTIONS_MAP.get(type.toLowerCase()); + } + + public String getType() { + return type; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaIsolationLevelConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaIsolationLevelConfig.java new file mode 100644 index 0000000000..82635b2fb5 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaIsolationLevelConfig.java @@ -0,0 +1,5 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +public interface KafkaIsolationLevelConfig { + IsolationLevel getIsolationLevel(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index 531f7cd58f..226bef6773 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -41,6 +41,8 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel; import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; @@ -174,6 +176,12 @@ public static void setConsumerTopicProperties(final Properties properties, final properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes()); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); + if (topicConfig instanceof KafkaIsolationLevelConfig) { + IsolationLevel isolationLevel = ((KafkaIsolationLevelConfig) topicConfig).getIsolationLevel(); + if (isolationLevel != null) { + properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.getType()); + } + } } private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java index 20c07dd5a9..e87d5b3e2b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java @@ -13,11 +13,13 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import java.time.Duration; -class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig { +class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig, KafkaIsolationLevelConfig { static final boolean DEFAULT_AUTO_COMMIT = false; static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); static final String DEFAULT_FETCH_MAX_BYTES = "50mb"; @@ -98,6 +100,9 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig @JsonProperty("fetch_min_bytes") private String fetchMinBytes = DEFAULT_FETCH_MIN_BYTES; + @JsonProperty("isolation_level") + private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED; + @Override public String getEncryptionId() { return null; @@ -212,4 +217,9 @@ public Integer getWorkers() { public Duration getHeartBeatInterval() { return heartBeatInterval; } + + @Override + public IsolationLevel getIsolationLevel() { + return isolationLevel; + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevelTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevelTest.java new file mode 100644 index 0000000000..7939dc823c --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/IsolationLevelTest.java @@ -0,0 +1,66 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.emptyString; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.hamcrest.MatcherAssert.assertThat; + +public class IsolationLevelTest { + + @ParameterizedTest + @EnumSource(IsolationLevel.class) + void fromTypeValue_should_return_expected_enum(final IsolationLevel value) { + assertThat(IsolationLevel.fromTypeValue(value.getType()), is(value)); + assertThat(value, instanceOf(IsolationLevel.class)); + } + + @ParameterizedTest + @ArgumentsSource(IsolationLevelToKnownName.class) + void fromTypeValue_returns_expected_value(final IsolationLevel isolationLevel, final String knownString) { + assertThat(IsolationLevel.fromTypeValue(knownString), equalTo(isolationLevel)); + } + + @ParameterizedTest + @EnumSource(IsolationLevel.class) + void getType_returns_non_empty_string_for_all_types(final IsolationLevel isolationLevel) { + assertThat(isolationLevel.getType(), notNullValue()); + assertThat(isolationLevel.getType(), not(emptyString())); + } + + @ParameterizedTest + @ArgumentsSource(IsolationLevelToKnownName.class) + void getType_returns_expected_string(final IsolationLevel isolationLevel, final String expectedString) { + assertThat(isolationLevel.getType(), equalTo(expectedString)); + } + + @Test + void fromTypeValue_returns_null_for_unknown_string() { + assertThat(IsolationLevel.fromTypeValue("unknown"), nullValue()); + assertThat(IsolationLevel.fromTypeValue("READ_COMMITED_WRONG_CASE"), nullValue()); + } + + static class IsolationLevelToKnownName implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(IsolationLevel.READ_UNCOMMITTED, "read_uncommitted"), + arguments(IsolationLevel.READ_COMMITTED, "read_committed") + ); + } + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java index 5cb86c507d..b469cbb35c 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java @@ -13,6 +13,7 @@ import org.mockito.Mock; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; @@ -94,4 +95,11 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException { setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL); assertEquals(EncryptionType.SSL, encryptionConfig.getType()); } + + @Test + void test_isolation_level_deserialization_from_yaml() { + assertThat(kafkaSourceConfig.getTopics(), notNullValue()); + SourceTopicConfig topic = (SourceTopicConfig) kafkaSourceConfig.getTopics().get(0); + assertEquals(IsolationLevel.READ_COMMITTED, topic.getIsolationLevel()); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java index b5ad09b853..96fed850f2 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -53,4 +54,19 @@ void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalA setField(SourceTopicConfig.class, objectUnderTest, "fetchMaxBytes", "0b"); assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes()); } + + @Test + void verify_default_isolation_level() { + SourceTopicConfig objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_UNCOMMITTED)); + } + + @Test + void verify_custom_isolation_level() throws NoSuchFieldException, IllegalAccessException { + SourceTopicConfig objectUnderTest = createObjectUnderTest(); + + setField(SourceTopicConfig.class, objectUnderTest, "isolationLevel", IsolationLevel.READ_COMMITTED); + assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_COMMITTED)); + } + } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index 707968b508..831bff57f9 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -22,6 +22,7 @@ log-pipeline: retry_backoff: PT100S consumer_max_poll_records: 500 max_partition_fetch_bytes: "10mb" + isolation_level: read_committed schema: registry_url: http://localhost:8081/ version: 1