Skip to content

Commit 1cdaed0

Browse files
committed
Initial addition of the PullIngester for the opensearch sink.
This creates a new PullIngester and implements the first pull engine as the KafkaPullEngine. It reads an index to find the pull-ingestion topic and then writes data to that topic. It routes documents to shards using the same Murmur3 approach that OpenSearch uses. Pull-based ingestion is marked as experimental. The configuration currently requires specifying the document ID. Resolves #6835 Signed-off-by: David Venable <dlv@amazon.com>
1 parent 506a04f commit 1cdaed0

32 files changed

Lines changed: 2127 additions & 22 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
dependencies {
    // Lombok is declared for both the compiler and the annotation processor;
    // a single local version keeps the two coordinates from drifting apart.
    def lombokVersion = '1.18.46'

    // Data Prepper modules this plugin builds on.
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-plugins:opensearch')

    // Third-party libraries used by the plugin code.
    implementation 'org.apache.kafka:kafka-clients:3.9.1'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'javax.inject:javax.inject:1'

    // Compile-time only code generation.
    compileOnly "org.projectlombok:lombok:${lombokVersion}"
    annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

    // Test support.
    testImplementation project(':data-prepper-test:plugin-test-framework')
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.pull.kafka;
11+
12+
import org.apache.kafka.clients.producer.KafkaProducer;
13+
import org.apache.kafka.clients.producer.ProducerConfig;
14+
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.common.serialization.ByteArraySerializer;
16+
import org.apache.kafka.common.serialization.StringSerializer;
17+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
18+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
19+
import org.opensearch.dataprepper.plugins.sink.opensearch.PullEngine;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.Properties;
24+
25+
@DataPrepperPlugin(name = "kafka", pluginType = PullEngine.class, pluginConfigurationType = KafkaPullEngineConfig.class,
26+
packagesToScan = {KafkaPullEngine.class})
27+
public class KafkaPullEngine implements PullEngine {
28+
private static final Logger LOG = LoggerFactory.getLogger(KafkaPullEngine.class);
29+
30+
private final String bootstrapServers;
31+
private final TopicManager topicManager;
32+
33+
private KafkaProducer<String, byte[]> producer;
34+
private String topicName;
35+
36+
@DataPrepperPluginConstructor
37+
public KafkaPullEngine(final KafkaPullEngineConfig config, final TopicManager topicManager) {
38+
this.bootstrapServers = String.join(",", config.getBootstrapServers());
39+
this.topicManager = topicManager;
40+
}
41+
42+
@Override
43+
public void initialize(final String topicName, final int partitionCount) {
44+
this.topicName = topicName;
45+
46+
topicManager.createTopicWithPartitions(topicName, partitionCount);
47+
48+
final Properties producerProps = new Properties();
49+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
50+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
51+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
52+
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
53+
54+
this.producer = new KafkaProducer<>(producerProps);
55+
LOG.info("KafkaPullEngine initialized with topic '{}' and {} partition(s)", topicName, partitionCount);
56+
}
57+
58+
@Override
59+
public void write(final int partition, final String key, final byte[] document) {
60+
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, partition, key, document);
61+
producer.send(record, (metadata, exception) -> {
62+
if (exception != null) {
63+
LOG.error("Failed to send record with key '{}' to partition {}", key, partition, exception);
64+
}
65+
});
66+
}
67+
68+
@Override
69+
public void flush() {
70+
if (producer != null) {
71+
producer.flush();
72+
}
73+
}
74+
75+
@Override
76+
public void shutdown() {
77+
if (producer != null) {
78+
producer.close();
79+
}
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.pull.kafka;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import jakarta.validation.constraints.NotEmpty;
14+
import jakarta.validation.constraints.NotNull;
15+
import lombok.Getter;
16+
17+
import java.util.List;
18+
19+
public class KafkaPullEngineConfig {
20+
@Getter
21+
@JsonProperty("bootstrap_servers")
22+
@NotNull
23+
@NotEmpty
24+
private List<String> bootstrapServers;
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.pull.kafka;
11+
12+
import org.apache.kafka.clients.admin.AdminClient;
13+
import org.apache.kafka.clients.admin.AdminClientConfig;
14+
import org.apache.kafka.clients.admin.NewPartitions;
15+
import org.apache.kafka.clients.admin.NewTopic;
16+
import org.apache.kafka.clients.admin.TopicDescription;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
import javax.inject.Named;
21+
import java.util.Collections;
22+
import java.util.Map;
23+
import java.util.Properties;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
@Named
29+
public class TopicManager {
30+
private static final Logger LOG = LoggerFactory.getLogger(TopicManager.class);
31+
private static final long TOPIC_READY_TIMEOUT_MS = 30_000;
32+
private static final long PARTITION_POLL_INTERVAL_MS = 500;
33+
34+
private final String bootstrapServers;
35+
36+
public TopicManager(final KafkaPullEngineConfig config) {
37+
this.bootstrapServers = String.join(",", config.getBootstrapServers());
38+
}
39+
40+
void createTopicWithPartitions(final String topicName, final int partitionCount) {
41+
final Properties adminProps = new Properties();
42+
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
43+
44+
try (final AdminClient adminClient = AdminClient.create(adminProps)) {
45+
final NewTopic newTopic = new NewTopic(topicName, partitionCount, (short) 1);
46+
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
47+
LOG.info("Created Kafka topic '{}' with {} partition(s)", topicName, partitionCount);
48+
waitForPartitions(adminClient, topicName, partitionCount);
49+
} catch (final ExecutionException e) {
50+
if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
51+
LOG.info("Kafka topic '{}' already exists, verifying partition count", topicName);
52+
ensurePartitionCount(topicName, partitionCount);
53+
} else {
54+
throw new RuntimeException("Failed to create Kafka topic: " + topicName, e);
55+
}
56+
} catch (final InterruptedException e) {
57+
Thread.currentThread().interrupt();
58+
throw new RuntimeException("Interrupted while creating Kafka topic: " + topicName, e);
59+
}
60+
}
61+
62+
private void ensurePartitionCount(final String topicName, final int requiredPartitions) {
63+
final Properties adminProps = new Properties();
64+
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
65+
66+
try (final AdminClient adminClient = AdminClient.create(adminProps)) {
67+
final Map<String, TopicDescription> descriptions = adminClient
68+
.describeTopics(Collections.singleton(topicName))
69+
.allTopicNames()
70+
.get(TOPIC_READY_TIMEOUT_MS, TimeUnit.MILLISECONDS);
71+
final TopicDescription description = descriptions.get(topicName);
72+
final int currentPartitions = description.partitions().size();
73+
74+
if (currentPartitions < requiredPartitions) {
75+
LOG.info("Topic '{}' has {} partition(s) but {} required, increasing partition count",
76+
topicName, currentPartitions, requiredPartitions);
77+
adminClient.createPartitions(
78+
Collections.singletonMap(topicName, NewPartitions.increaseTo(requiredPartitions))
79+
).all().get();
80+
LOG.info("Increased partition count for topic '{}' to {}", topicName, requiredPartitions);
81+
waitForPartitions(adminClient, topicName, requiredPartitions);
82+
} else {
83+
LOG.info("Topic '{}' already has {} partition(s), required {}", topicName, currentPartitions, requiredPartitions);
84+
}
85+
} catch (final ExecutionException | TimeoutException e) {
86+
throw new RuntimeException("Failed to verify/update partition count for topic: " + topicName, e);
87+
} catch (final InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
throw new RuntimeException("Interrupted while verifying partition count for topic: " + topicName, e);
90+
}
91+
}
92+
93+
private void waitForPartitions(final AdminClient adminClient, final String topicName, final int expectedPartitions) {
94+
final long deadline = System.currentTimeMillis() + TOPIC_READY_TIMEOUT_MS;
95+
while (System.currentTimeMillis() < deadline) {
96+
try {
97+
final Map<String, TopicDescription> descriptions = adminClient
98+
.describeTopics(Collections.singleton(topicName))
99+
.allTopicNames()
100+
.get(TOPIC_READY_TIMEOUT_MS, TimeUnit.MILLISECONDS);
101+
final TopicDescription description = descriptions.get(topicName);
102+
if (description != null && description.partitions().size() >= expectedPartitions) {
103+
LOG.info("All {} partition(s) ready for topic '{}'", expectedPartitions, topicName);
104+
return;
105+
}
106+
} catch (final InterruptedException e) {
107+
Thread.currentThread().interrupt();
108+
throw new RuntimeException("Interrupted while waiting for topic partitions", e);
109+
} catch (final ExecutionException | TimeoutException e) {
110+
LOG.debug("Waiting for topic '{}' partitions to become available", topicName);
111+
}
112+
try {
113+
Thread.sleep(PARTITION_POLL_INTERVAL_MS);
114+
} catch (final InterruptedException e) {
115+
Thread.currentThread().interrupt();
116+
throw new RuntimeException("Interrupted while waiting for topic partitions", e);
117+
}
118+
}
119+
LOG.warn("Timed out waiting for all {} partition(s) on topic '{}' to become available", expectedPartitions, topicName);
120+
}
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.pull.kafka;
11+
12+
import org.junit.jupiter.api.Test;
13+
import org.opensearch.dataprepper.plugins.sink.opensearch.PullEngine;
14+
import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest;
15+
import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile;
16+
import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite;
17+
18+
import static org.hamcrest.CoreMatchers.instanceOf;
19+
import static org.hamcrest.CoreMatchers.notNullValue;
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
22+
@DataPrepperPluginTest(pluginName = "kafka", pluginType = PullEngine.class)
23+
public class KafkaPluginEngineIT extends BaseDataPrepperPluginStandardTestSuite {
24+
@Test
25+
void constructs_with_valid_configuration(
26+
@PluginConfigurationFile("valid_kafka_config.yaml") final PullEngine pullEngine) {
27+
assertThat(pullEngine, notNullValue());
28+
assertThat(pullEngine, instanceOf(KafkaPullEngine.class));
29+
}
30+
}

0 commit comments

Comments
 (0)