Pull-Based Ingestion Phase 1: End-to-End Single-Index Ingestion
Summary
Wire up all components needed for pull-based ingestion into an existing OpenSearch index, where Data Prepper writes documents to Kafka and OpenSearch pulls them via its ingestion source. Phase 1 requires explicit configuration for document_id, document_version, and index, with no defaults or auto-generation.
What's Implemented
- PullEngine interface: abstraction for the streaming transport (initialize, write, flush, shutdown)
- PullIngester: implements Ingester, routes events to the correct Kafka partition using a Murmur3 hash of the document ID
- IndexRouter: resolves shard assignment using Murmur3, consistent with OpenSearch's OperationRouting
- IndexShardProvider: queries OpenSearch for index settings (shard count, ingestion topic), with a Caffeine cache
- PullIngestionEnvelopeBuilder: constructs the mapper envelope (_id, _version, _source, _op_type)
- DocumentIdResolver: evaluates document_id / document_id_field expressions
- PullIndexingConfig: configuration class using PluginModel for the engine field, marked @Experimental
- Ingester selection in OpenSearchSink: loads PullEngine via PluginFactory when pull_indexing is configured
- opensearch-pull-kafka project, a new Gradle project containing:
  - KafkaPullEngine: @DataPrepperPlugin implementing PullEngine with a Kafka producer
  - KafkaPullEngineConfig: plugin config (bootstrap_servers)
  - TopicManager: creates/validates the Kafka topic partition count (handles the race condition when the topic pre-exists)
- Dependency injection via @Named / packagesToScan
- Unit tests for all components (randomized values, Hamcrest assertions, createObjectUnderTest() pattern)
- Plugin framework integration test (KafkaPluginEngineIT): verifies the plugin constructs correctly with valid config
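
To illustrate the routing that PullIngester and IndexRouter rely on, here is a minimal, self-contained sketch of Murmur3-based shard selection. It is not the code in this PR. The class and method names (RoutingSketch, hashRouting, shardFor) are hypothetical. The sketch assumes that OpenSearch hashes the routing string as two little-endian bytes per char with seed 0, and derives the shard as floorMod(hash, routingNumShards) / routingFactor. Both assumptions should be checked against the OperationRouting source for the targeted OpenSearch version.

```java
final class RoutingSketch {

    // Standard MurmurHash3 x86 32-bit over a byte array.
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        int len = data.length;
        int roundedEnd = len & ~3; // last index covered by full 4-byte blocks

        for (int i = 0; i < roundedEnd; i += 4) {
            // Read one little-endian 32-bit block.
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }

        // Mix in the 1 to 3 trailing bytes, if any.
        int tail = len & 3;
        int k = 0;
        if (tail == 3) k ^= (data[roundedEnd + 2] & 0xff) << 16;
        if (tail >= 2) k ^= (data[roundedEnd + 1] & 0xff) << 8;
        if (tail >= 1) {
            k ^= (data[roundedEnd] & 0xff);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
        }

        // Finalization (avalanche) step.
        h ^= len;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Assumption: each char contributes two bytes, low byte first, seed 0.
    static int hashRouting(String routing) {
        byte[] bytes = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            bytes[i * 2] = (byte) c;
            bytes[i * 2 + 1] = (byte) (c >>> 8);
        }
        return murmur3_32(bytes, 0);
    }

    // Assumption: shard = floorMod(hash, routingNumShards) / routingFactor.
    static int shardFor(String routing, int routingNumShards, int routingFactor) {
        return Math.floorMod(hashRouting(routing), routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Example: an index with 5 shards, assuming routingNumShards = 640 and routingFactor = 128.
        System.out.println(shardFor("doc-1", 640, 128));
    }
}
```

If the Kafka topic has one partition per shard, the value returned by shardFor could be used directly as the partition number, so that a given document ID always lands on the partition consumed by its shard.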
Constraints (Phase 1)
- document_id is required: events without a resolvable document ID fail
- document_version is required: there is no default timestamp-based version yet
- Index must already exist with ingestion_source configured (Data Prepper does not create it in Phase 1)
- Only index, create, and delete actions are supported
- routing / routing_field fall back to document ID for partition assignment
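
The per-event constraints above could be enforced with a small up-front check. The following is a hedged sketch rather than the PR's actual validation. Phase1Validator and its validate method are hypothetical names, and representing the version as a Long is an assumption.

```java
import java.util.Set;

final class Phase1Validator {

    // Actions listed as supported in Phase 1.
    private static final Set<String> SUPPORTED_ACTIONS = Set.of("index", "create", "delete");

    // Throws IllegalArgumentException when an event violates a Phase 1 constraint.
    static void validate(String documentId, Long documentVersion, String action) {
        if (documentId == null || documentId.isEmpty()) {
            throw new IllegalArgumentException("document_id is required but could not be resolved");
        }
        if (documentVersion == null) {
            throw new IllegalArgumentException("document_version is required; no default is generated");
        }
        if (action == null || !SUPPORTED_ACTIONS.contains(action)) {
            throw new IllegalArgumentException("unsupported action: " + action);
        }
    }

    public static void main(String[] args) {
        validate("doc-1", 1700000000L, "index"); // passes silently
        try {
            validate("doc-1", 1700000000L, "update");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```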
Configuration Example
opensearch:
  hosts: ["https://opensearch:9200"]
  index: "my-index"
  document_id: "${/doc_id}"
  document_version: "${/timestamp}"
  action: "index"
  pull_indexing:
    engine:
      kafka:
        bootstrap_servers:
          - "kafka:9092"
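
As a rough illustration of what this configuration might produce for one event, the sketch below builds an envelope with the four keys named for PullIngestionEnvelopeBuilder (_id, _version, _source, _op_type). It is an assumption-laden sketch, not the PR's implementation. EnvelopeSketch and buildEnvelope are hypothetical names. The expressions ${/doc_id} and ${/timestamp} are reduced to plain top-level field lookups. Rendering _id and _version as strings, and always including _source (even for delete), are guesses that should be checked against the OpenSearch ingestion message format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class EnvelopeSketch {

    // Builds the envelope for one event; idField and versionField stand in for
    // the document_id and document_version expressions in the sink config.
    static Map<String, Object> buildEnvelope(Map<String, Object> event,
                                             String idField,
                                             String versionField,
                                             String action) {
        Object id = event.get(idField);
        Object version = event.get(versionField);
        if (id == null || version == null) {
            // Phase 1 has no defaults: both values must be resolvable.
            throw new IllegalArgumentException("document_id and document_version are required");
        }
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("_id", String.valueOf(id));           // assumption: string-typed
        envelope.put("_version", String.valueOf(version)); // assumption: string-typed
        envelope.put("_op_type", action);
        envelope.put("_source", event);                    // assumption: always included
        return envelope;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("doc_id", "abc");
        event.put("timestamp", 1700000000L);
        event.put("message", "hello");
        System.out.println(buildEnvelope(event, "doc_id", "timestamp", "index"));
    }
}
```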