Skip to content

Commit 2f1060a

Browse files
lhoossMohamed Houssam LAHRACH
authored andcommitted
Support isolation.level config for Kafka consumer
Signed-off-by: Mohamed Houssam LAHRACH <lahrach.houssam@gmail.com> Signed-off-by: Mohamed Houssam LAHRACH <mohamedhoussam.lahrach@cegedim.com>
1 parent 44b62a2 commit 2f1060a

7 files changed

Lines changed: 82 additions & 1 deletion

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.opensearch.dataprepper.plugins.kafka.configuration;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
5+
import java.util.Map;
6+
import java.util.Arrays;
7+
import java.util.stream.Collectors;
8+
9+
public enum IsolationLevel {
10+
READ_UNCOMMITTED("read_uncommitted"),
11+
READ_COMMITTED("read_committed");
12+
13+
private static final Map<String, IsolationLevel> OPTIONS_MAP = Arrays.stream(IsolationLevel.values())
14+
.collect(Collectors.toMap(
15+
value -> value.type,
16+
value -> value
17+
));
18+
19+
private final String type;
20+
21+
IsolationLevel(final String type) {
22+
this.type = type;
23+
}
24+
25+
@JsonCreator
26+
public static IsolationLevel fromTypeValue(final String type) {
27+
return OPTIONS_MAP.get(type.toLowerCase());
28+
}
29+
30+
public String getType() {
31+
return type;
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.opensearch.dataprepper.plugins.kafka.configuration;
2+
3+
public interface KafkaIsolationLevelConfig {
4+
IsolationLevel getIsolationLevel();
5+
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
4242
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
4343
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
44+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig;
45+
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
4446

4547
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
4648
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
@@ -174,6 +176,12 @@ public static void setConsumerTopicProperties(final Properties properties, final
174176
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
175177
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
176178
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
179+
if (topicConfig instanceof KafkaIsolationLevelConfig) {
180+
IsolationLevel isolationLevel = ((KafkaIsolationLevelConfig) topicConfig).getIsolationLevel();
181+
if (isolationLevel != null) {
182+
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.getType());
183+
}
184+
}
177185
}
178186

179187
private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) {

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
1414
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
1515
import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig;
16+
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaIsolationLevelConfig;
17+
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
1618
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
1719

1820
import java.time.Duration;
1921

20-
class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig {
22+
class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig, KafkaIsolationLevelConfig {
2123
static final boolean DEFAULT_AUTO_COMMIT = false;
2224
static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
2325
static final String DEFAULT_FETCH_MAX_BYTES = "50mb";
@@ -98,6 +100,9 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
98100
@JsonProperty("fetch_min_bytes")
99101
private String fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
100102

103+
@JsonProperty("isolation_level")
104+
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
105+
101106
@Override
102107
public String getEncryptionId() {
103108
return null;
@@ -212,4 +217,9 @@ public Integer getWorkers() {
212217
public Duration getHeartBeatInterval() {
213218
return heartBeatInterval;
214219
}
220+
221+
@Override
222+
public IsolationLevel getIsolationLevel() {
223+
return isolationLevel;
224+
}
215225
}

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceConfigTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.mockito.Mock;
1414
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
1515
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
16+
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
1617
import org.yaml.snakeyaml.Yaml;
1718

1819
import java.io.FileReader;
@@ -94,4 +95,11 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException {
9495
setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL);
9596
assertEquals(EncryptionType.SSL, encryptionConfig.getType());
9697
}
98+
99+
@Test
100+
void test_isolation_level_deserialization_from_yaml() {
101+
assertThat(kafkaSourceConfig.getTopics(), notNullValue());
102+
SourceTopicConfig topic = (SourceTopicConfig) kafkaSourceConfig.getTopics().get(0);
103+
assertEquals(IsolationLevel.READ_COMMITTED, topic.getIsolationLevel());
104+
}
97105
}

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfigTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.junit.jupiter.api.Test;
99
import org.opensearch.dataprepper.model.types.ByteCount;
10+
import org.opensearch.dataprepper.plugins.kafka.configuration.IsolationLevel;
1011

1112
import static org.hamcrest.CoreMatchers.equalTo;
1213
import static org.hamcrest.MatcherAssert.assertThat;
@@ -53,4 +54,19 @@ void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalA
5354
setField(SourceTopicConfig.class, objectUnderTest, "fetchMaxBytes", "0b");
5455
assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes());
5556
}
57+
58+
@Test
59+
void verify_default_isolation_level() {
60+
SourceTopicConfig objectUnderTest = createObjectUnderTest();
61+
assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_UNCOMMITTED));
62+
}
63+
64+
@Test
65+
void verify_custom_isolation_level() throws NoSuchFieldException, IllegalAccessException {
66+
SourceTopicConfig objectUnderTest = createObjectUnderTest();
67+
68+
setField(SourceTopicConfig.class, objectUnderTest, "isolationLevel", IsolationLevel.READ_COMMITTED);
69+
assertThat(objectUnderTest.getIsolationLevel(), equalTo(IsolationLevel.READ_COMMITTED));
70+
}
71+
5672
}

data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ log-pipeline:
2222
retry_backoff: PT100S
2323
consumer_max_poll_records: 500
2424
max_partition_fetch_bytes: "10mb"
25+
isolation_level: read_committed
2526
schema:
2627
registry_url: http://localhost:8081/
2728
version: 1

0 commit comments

Comments
 (0)