Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum IsolationLevel {
Comment thread
lhooss marked this conversation as resolved.
READ_UNCOMMITTED("read_uncommitted"),
READ_COMMITTED("read_committed");

private static final Map<String, IsolationLevel> OPTIONS_MAP = Arrays.stream(IsolationLevel.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

IsolationLevel(final String type) {
this.type = type;
}

@JsonCreator
public static IsolationLevel fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}

public String getType() {
return type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

public interface KafkaIsolationLevelConfig {
IsolationLevel getIsolationLevel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;

import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
Expand Down Expand Up @@ -174,6 +176,12 @@ public static void setConsumerTopicProperties(final Properties properties, final
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
if (topicConfig instanceof KafkaIsolationLevelConfig) {
IsolationLevel isolationLevel = ((KafkaIsolationLevelConfig) topicConfig).getIsolationLevel();
if (isolationLevel != null) {
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.getType());
}
}
}

private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;

class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig {
class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig, KafkaIsolationLevelConfig {
static final boolean DEFAULT_AUTO_COMMIT = false;
static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
static final String DEFAULT_FETCH_MAX_BYTES = "50mb";
Expand Down Expand Up @@ -98,6 +100,9 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
@JsonProperty("fetch_min_bytes")
private String fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;

@JsonProperty("isolation_level")
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;

@Override
public String getEncryptionId() {
return null;
Expand Down Expand Up @@ -212,4 +217,9 @@ public Integer getWorkers() {
public Duration getHeartBeatInterval() {
return heartBeatInterval;
}

@Override
public IsolationLevel getIsolationLevel() {
return isolationLevel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.emptyString;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.hamcrest.MatcherAssert.assertThat;

public class IsolationLevelTest {

@ParameterizedTest
@EnumSource(IsolationLevel.class)
void fromTypeValue_should_return_expected_enum(final IsolationLevel value) {
assertThat(IsolationLevel.fromTypeValue(value.getType()), is(value));
assertThat(value, instanceOf(IsolationLevel.class));
}

@ParameterizedTest
@ArgumentsSource(IsolationLevelToKnownName.class)
void fromTypeValue_returns_expected_value(final IsolationLevel isolationLevel, final String knownString) {
assertThat(IsolationLevel.fromTypeValue(knownString), equalTo(isolationLevel));
}

@ParameterizedTest
@EnumSource(IsolationLevel.class)
void getType_returns_non_empty_string_for_all_types(final IsolationLevel isolationLevel) {
assertThat(isolationLevel.getType(), notNullValue());
assertThat(isolationLevel.getType(), not(emptyString()));
}

@ParameterizedTest
@ArgumentsSource(IsolationLevelToKnownName.class)
void getType_returns_expected_string(final IsolationLevel isolationLevel, final String expectedString) {
assertThat(isolationLevel.getType(), equalTo(expectedString));
}

@Test
void fromTypeValue_returns_null_for_unknown_string() {
assertThat(IsolationLevel.fromTypeValue("unknown"), nullValue());
assertThat(IsolationLevel.fromTypeValue("READ_COMMITED_WRONG_CASE"), nullValue());
}

static class IsolationLevelToKnownName implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(IsolationLevel.READ_UNCOMMITTED, "read_uncommitted"),
arguments(IsolationLevel.READ_COMMITTED, "read_committed")
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.mockito.Mock;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
import org.yaml.snakeyaml.Yaml;

import java.io.FileReader;
Expand Down Expand Up @@ -94,4 +95,11 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException {
setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL);
assertEquals(EncryptionType.SSL, encryptionConfig.getType());
}

@Test
void test_isolation_level_deserialization_from_yaml() {
assertThat(kafkaSourceConfig.getTopics(), notNullValue());
SourceTopicConfig topic = (SourceTopicConfig) kafkaSourceConfig.getTopics().get(0);
assertEquals(IsolationLevel.READ_COMMITTED, topic.getIsolationLevel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -53,4 +54,19 @@ void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalA
setField(SourceTopicConfig.class, objectUnderTest, "fetchMaxBytes", "0b");
assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes());
}

@Test
void verify_default_isolation_level() {
SourceTopicConfig objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_UNCOMMITTED));
}

@Test
void verify_custom_isolation_level() throws NoSuchFieldException, IllegalAccessException {
SourceTopicConfig objectUnderTest = createObjectUnderTest();

setField(SourceTopicConfig.class, objectUnderTest, "isolationLevel", IsolationLevel.READ_COMMITTED);
assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_COMMITTED));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log-pipeline:
retry_backoff: PT100S
consumer_max_poll_records: 500
max_partition_fetch_bytes: "10mb"
isolation_level: read_committed
schema:
registry_url: http://localhost:8081/
version: 1
Expand Down
Loading