Initial addition of the PullIngester for the opensearch sink. #6842

Open
dlvenable wants to merge 2 commits into opensearch-project:main from dlvenable:6835-pull-ingestion-phase1

@dlvenable
Member

Description

This creates a new PullIngester and implements the first one as the KafkaPullEngine. It reads an index to find the pull ingestion topic and then writes data to that topic. It routes documents to shards using the same Murmur3 approach that OpenSearch uses. Pull-based ingestion is marked as experimental. The configuration currently requires specifying the document ID.

Issues Resolved

Resolves #6835

This is the first phase of #6796.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dlvenable dlvenable force-pushed the 6835-pull-ingestion-phase1 branch 2 times, most recently from af6842c to 1cdaed0 Compare May 11, 2026 14:14

Signed-off-by: David Venable <dlv@amazon.com>
@dlvenable dlvenable force-pushed the 6835-pull-ingestion-phase1 branch from 1cdaed0 to 2002dc8 Compare May 19, 2026 15:08
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentConfig(),
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentResourceName(),
configuredIndexAlias);
if (openSearchSinkConfiguration.getPullIndexing() == null) {
Collaborator

Is semantic enrichment not applicable for pull indexing?

Member Author

I'm guessing that it is. We can add this later. This PR is just phase 1 of the implementation.

return calculateShard(routingValue, numberOfShards);
}

static int calculateShard(final String routingValue, final int numberOfShards) {
Collaborator

Is this the correct approach? What if something changes on the OpenSearch side? Isn't there an API in OpenSearch for this?

Member Author

As I understand it, you cannot change the primary shard count of an existing index. Split and shrink each create a new index with a different primary shard count.

* Murmur3 hash matching OpenSearch's Murmur3HashFunction.hash(String).
* 32-bit Murmur3 with seed 0, operating on the UTF-8 bytes of the input.
*/
static int murmur3Hash(final String routing) {
Collaborator

Same comment as above. What if OpenSearch's hash function changes? I think we should get this from an OpenSearch API instead of keeping our own copy.

Member Author

@kkondaka, I agree here and would like to consume this as a library. However, OpenSearch does not currently expose it as a library. We can open an issue to make it available to Data Prepper for pull ingestion.
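
For readers following this thread, here is a minimal sketch of what a local copy of the hash and shard mapping could look like. It is not the code from this PR. The class name `ShardRoutingSketch` and the helper `murmur3_32` are hypothetical, the UTF-8 encoding follows the javadoc quoted above, and the plain `floorMod` mapping is a simplifying assumption.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative sketch only; names are hypothetical and this is not the PR's code. */
final class ShardRoutingSketch {

    /** 32-bit Murmur3 (x86 variant) over the given bytes with the given seed. */
    static int murmur3_32(final byte[] data, final int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        final int blocks = data.length / 4;

        // Body: mix each 4-byte little-endian block into the hash.
        for (int i = 0; i < blocks; i++) {
            final int o = i * 4;
            int k = (data[o] & 0xff)
                    | ((data[o + 1] & 0xff) << 8)
                    | ((data[o + 2] & 0xff) << 16)
                    | ((data[o + 3] & 0xff) << 24);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }

        // Tail: mix the remaining 1 to 3 bytes, if any.
        final int tail = blocks * 4;
        final int remaining = data.length & 3;
        int k = 0;
        if (remaining == 3) {
            k ^= (data[tail + 2] & 0xff) << 16;
        }
        if (remaining >= 2) {
            k ^= (data[tail + 1] & 0xff) << 8;
        }
        if (remaining >= 1) {
            k ^= (data[tail] & 0xff);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
        }

        // Finalization: fold in the length and avalanche the bits.
        h ^= data.length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Assumption: shard = floorMod(hash, numberOfShards), with seed 0 and UTF-8 bytes. */
    static int calculateShard(final String routingValue, final int numberOfShards) {
        if (numberOfShards <= 0) {
            throw new IllegalArgumentException("numberOfShards must be positive");
        }
        final int hash = murmur3_32(routingValue.getBytes(StandardCharsets.UTF_8), 0);
        // floorMod keeps the result in [0, numberOfShards) even for negative hashes.
        return Math.floorMod(hash, numberOfShards);
    }
}
```

Two caveats apply before relying on anything like this. The byte encoding should be checked against the OpenSearch source, since a different encoding yields different hashes. As far as I know, OpenSearch's routing can also depend on index settings such as the number of routing shards, which this sketch does not model.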

if (currentPartitions < requiredPartitions) {
LOG.info("Topic '{}' has {} partition(s) but {} required, increasing partition count",
topicName, currentPartitions, requiredPartitions);
adminClient.createPartitions(
Collaborator

Could this be invoked by more than one Data Prepper instance (when they run together as one Data Prepper cluster)? If so, there may be a race condition here. Maybe you need to catch exceptions such as InvalidPartitionsException or ReassignmentInProgressException and retry?

Member Author

This is a good catch. I'll follow up.
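
One way the follow-up could handle the race is sketched below, under stated assumptions. `TopicPartitions` is a hypothetical interface standing in for the admin calls, not a Kafka API, and it is assumed to surface failures as unchecked exceptions. The idea is to re-read the partition count on every attempt and treat "another instance already increased it" as success.

```java
/** Illustrative sketch only; names are hypothetical and this is not the PR's code. */
final class PartitionIncreaseSketch {

    /** Hypothetical abstraction over describe/create-partitions admin calls. */
    interface TopicPartitions {
        int currentCount();

        void increaseTo(int totalCount);
    }

    static void ensurePartitions(final TopicPartitions topic,
                                 final int required,
                                 final int maxAttempts,
                                 final long backoffMillis) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Re-read on every attempt: a peer instance may have done the work already.
            if (topic.currentCount() >= required) {
                return;
            }
            try {
                topic.increaseTo(required);
                return;
            } catch (final RuntimeException e) {
                // Possibly a concurrent change; remember the failure and re-check.
                lastFailure = e;
            }
            if (attempt < maxAttempts) {
                sleepQuietly(backoffMillis * attempt);
            }
        }
        // Final check, in case the last failed attempt lost the race to a peer.
        if (topic.currentCount() >= required) {
            return;
        }
        throw new IllegalStateException(
                "Could not raise partition count to " + required, lastFailure);
    }

    private static void sleepQuietly(final long millis) {
        try {
            Thread.sleep(millis);
        } catch (final InterruptedException e) {
            // Preserve the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

A real implementation would likely retry only on specific exception types, such as the two named in the comment above, rather than on every `RuntimeException`.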


@Override
public void shutdown() {
if (producer != null) {
Collaborator

Should we explicitly call flush() before close() to avoid any data loss?
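
A minimal sketch of that ordering follows. `BufferedProducer` is a hypothetical stand-in for the two producer methods involved, not the PR's type. My understanding is that the Kafka producer's `close()` already waits for previously sent records, so the explicit flush mainly makes the intent visible; treat that as an assumption to verify.

```java
/** Illustrative sketch only; names are hypothetical and this is not the PR's code. */
final class ProducerShutdownSketch {

    /** Hypothetical stand-in for the producer methods used during shutdown. */
    interface BufferedProducer {
        void flush();

        void close();
    }

    static void shutdown(final BufferedProducer producer) {
        if (producer == null) {
            return;
        }
        try {
            // Push out any buffered records before releasing resources.
            producer.flush();
        } finally {
            // Always close, even if the flush throws.
            producer.close();
        }
    }
}
```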

pullEngine.write(partition, docId, envelope);
pullIngestionMetrics.recordBytes(envelope.length);
pullIngestionMetrics.incrementDocumentsSucceeded();
event.getEventHandle().release(true);
Collaborator

If the positive ack happens here, could an error still occur later, when the data is pulled from Kafka into the index? How would those documents end up in the DLQ?

Member Author

I actually plan to support DLQ in #6838. This PR is phase 1 of the implementation.

… Kafka topic.

Signed-off-by: David Venable <dlv@amazon.com>

Development

Successfully merging this pull request may close these issues.

Pull-ingestion phase 1: Write to existing OpenSearch index using Kafka

2 participants