Initial addition of the PullIngester for the opensearch sink.#6842
Initial addition of the PullIngester for the opensearch sink.#6842dlvenable wants to merge 2 commits into
Conversation
af6842c to
1cdaed0
Compare
This creates a new PullIngester and implements the first one as the KafkaPullEngine. It reads an index to find the pull ingestion topic and then writes data to that topic. It routes shards using the same Murmur 3 approach that OpenSearch uses. The pull-based ingestion is marked as experimental. The configuration requires specifying the document Id currently. Resolves opensearch-project#6835 Signed-off-by: David Venable <dlv@amazon.com>
1cdaed0 to
2002dc8
Compare
| openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentConfig(), | ||
| openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentResourceName(), | ||
| configuredIndexAlias); | ||
| if (openSearchSinkConfiguration.getPullIndexing() == null) { |
There was a problem hiding this comment.
Is semantic enrichment not applicable for pull indexing?
There was a problem hiding this comment.
I'm guessing that it is. We can add this later. This PR is just phase 1 of the implementation.
| return calculateShard(routingValue, numberOfShards); | ||
| } | ||
|
|
||
| static int calculateShard(final String routingValue, final int numberOfShards) { |
There was a problem hiding this comment.
Is this the correct approach. What if something changes on Opensearch Side? Isn't there an API in opensearch for this?
There was a problem hiding this comment.
As I understand you cannot change the primary shard count on an index. Split and shrink will create a new index with different primary shard counts.
| * Murmur3 hash matching OpenSearch's Murmur3HashFunction.hash(String). | ||
| * 32-bit Murmur3 with seed 0, operating on the UTF-8 bytes of the input. | ||
| */ | ||
| static int murmur3Hash(final String routing) { |
There was a problem hiding this comment.
Same comment as above. What if Opensearch's hash function changes? I think we should get this from Opensearch API instead of having our own copy?
There was a problem hiding this comment.
@kkondaka , I agree here and would like to get this as a library. However, this is currently not exposed as a library in OpenSearch. We can open an issue to make this available to Data Prepper for pull ingestion.
| if (currentPartitions < requiredPartitions) { | ||
| LOG.info("Topic '{}' has {} partition(s) but {} required, increasing partition count", | ||
| topicName, currentPartitions, requiredPartitions); | ||
| adminClient.createPartitions( |
There was a problem hiding this comment.
Is it possible that this may be invoked by more than one DataPrepper instance (when they are working as one DataPrepper cluster)? If yes, there may be a race condition here. Maybe you need to catch exceptions like "InvalidPartitionsException or ReassignmentInProgressException", retry?
There was a problem hiding this comment.
This is a good catch. I'll follow up.
|
|
||
| @Override | ||
| public void shutdown() { | ||
| if (producer != null) { |
There was a problem hiding this comment.
Probably explicitly flush() before close() to avoid any data loss?
| pullEngine.write(partition, docId, envelope); | ||
| pullIngestionMetrics.recordBytes(envelope.length); | ||
| pullIngestionMetrics.incrementDocumentsSucceeded(); | ||
| event.getEventHandle().release(true); |
There was a problem hiding this comment.
If positive ack is done here, is there a possibility of some error occurring when the data is pulled from kafka to index? How do they end up in DLQ?
There was a problem hiding this comment.
I actually plan to support DLQ in #6838. This PR is phase 1 of the implementation.
… Kafka topic. Signed-off-by: David Venable <dlv@amazon.com>
Description
This creates a new
PullIngesterand implements the first one as theKafkaPullEngine. It reads an index to find the pull ingestion topic and then writes data to that topic. It routes shards using the same Murmur 3 approach that OpenSearch uses. The pull-based ingestion is marked as experimental. The configuration requires specifying the document Id currently.Issues Resolved
Resolves #6835
This is the first phase of #6796.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.