
Pull-ingestion phase 1: Write to existing OpenSearch index using Kafka #6835

@dlvenable

Pull-Based Ingestion Phase 1: End-to-End Single-Index Ingestion

Summary

Wire up all components needed for pull-based ingestion into an existing OpenSearch index: Data Prepper writes documents to Kafka, and OpenSearch pulls them through the index's ingestion source. Phase 1 requires explicit configuration for document_id, document_version, and index; there are no defaults and no auto-generation.
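To make the transport boundary concrete, here is a minimal Java sketch of an engine with the initialize / write / flush / shutdown lifecycle that the PullEngine interface in this issue describes. The method signatures and the `InMemoryPullEngine` class are hypothetical; an in-memory per-partition log stands in for Kafka so the example runs without a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical transport abstraction mirroring the four lifecycle steps named
// in the issue; the real PullEngine method signatures may differ.
interface PullEngine {
    void initialize(String topic, int partitionCount);
    void write(int partition, String key, byte[] payload);
    void flush();
    void shutdown();
}

// Illustrative stand-in for a Kafka-backed engine: writes are buffered and
// become visible in the per-partition logs only after flush().
class InMemoryPullEngine implements PullEngine {
    private static final class Pending {
        final int partition;
        final String record;

        Pending(int partition, String record) {
            this.partition = partition;
            this.record = record;
        }
    }

    private final List<List<String>> partitions = new ArrayList<>();
    private final List<Pending> buffer = new ArrayList<>();
    private boolean open;

    @Override
    public void initialize(String topic, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
        open = true;
    }

    @Override
    public void write(int partition, String key, byte[] payload) {
        if (!open) {
            throw new IllegalStateException("engine is not open");
        }
        if (partition < 0 || partition >= partitions.size()) {
            throw new IllegalArgumentException("no such partition: " + partition);
        }
        buffer.add(new Pending(partition, key + "=" + new String(payload, StandardCharsets.UTF_8)));
    }

    @Override
    public void flush() {
        // Move buffered records into their partition logs, preserving write order.
        for (Pending p : buffer) {
            partitions.get(p.partition).add(p.record);
        }
        buffer.clear();
    }

    @Override
    public void shutdown() {
        flush(); // deliver anything still buffered before closing
        open = false;
    }

    // Read-only view of one partition's log, for inspection in tests.
    List<String> partitionLog(int partition) {
        return Collections.unmodifiableList(partitions.get(partition));
    }
}
```

Keeping buffering and flushing behind the interface lets the ingester stay transport-agnostic, which matches the issue's choice to load the engine as a plugin (`KafkaPullEngine`) rather than hard-wiring Kafka into the sink.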

What's Implemented

  • PullEngine interface: abstraction for the streaming transport (initialize, write, flush, shutdown)
  • PullIngester: implements Ingester and routes each event to the correct Kafka partition using a Murmur3 hash of the document ID
  • IndexRouter: resolves shard assignment using Murmur3 hashing, consistent with OpenSearch's OperationRouting
  • IndexShardProvider: queries OpenSearch for index settings (shard count, ingestion topic) and caches them with Caffeine
  • PullIngestionEnvelopeBuilder: constructs the mapper envelope (_id, _version, _source, _op_type)
  • DocumentIdResolver: evaluates document_id / document_id_field expressions
  • PullIndexingConfig: configuration class that uses PluginModel for the engine field; marked @Experimental
  • Ingester selection in OpenSearchSink: loads the PullEngine via PluginFactory when pull_indexing is configured
  • opensearch-pull-kafka project: a new Gradle project containing:
    • KafkaPullEngine: a @DataPrepperPlugin implementing PullEngine with a Kafka producer
    • KafkaPullEngineConfig: plugin config (bootstrap_servers)
    • TopicManager: creates or validates the Kafka topic's partition count (handles the race condition when the topic already exists)
    • Dependency injection via @Named / packagesToScan
  • Unit tests for all components (randomized values, Hamcrest assertions, createObjectUnderTest() pattern)
  • Plugin framework integration test (KafkaPluginEngineIT): verifies that the plugin constructs correctly with a valid config
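Shard-consistent partitioning is the subtle part of PullIngester and IndexRouter. Assuming Kafka partition N feeds shard N, a document must land on the partition whose number equals the shard OpenSearch would have chosen for it. The Java sketch below reproduces that calculation from our reading of OpenSearch's `Murmur3HashFunction` (each UTF-16 char hashed as two little-endian bytes, MurmurHash3 x86 32-bit, seed 0) and `OperationRouting` (floorMod by the routing shard count, then divide by the routing factor). `ShardRouting` and its methods are hypothetical names, the default routing-shard formula is an assumption to verify against the target OpenSearch version, and custom routing and `routing_partition_size` are ignored.

```java
// Sketch of OpenSearch-compatible shard selection, written from our reading of
// OpenSearch's Murmur3HashFunction and OperationRouting. This is NOT the
// Data Prepper IndexRouter source; all names here are hypothetical.
final class ShardRouting {
    private ShardRouting() {}

    // MurmurHash3 x86 32-bit (the variant Lucene's StringHelper exposes).
    static int murmur3x86_32(byte[] data, int offset, int len, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = offset + (len & ~3); // end of the last full 4-byte block
        for (int i = offset; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Mix in the 1 to 3 trailing bytes, if any.
        int k1 = 0;
        int tail = len & 3;
        if (tail == 3) k1 ^= (data[roundedEnd + 2] & 0xff) << 16;
        if (tail >= 2) k1 ^= (data[roundedEnd + 1] & 0xff) << 8;
        if (tail >= 1) {
            k1 ^= data[roundedEnd] & 0xff;
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
        }
        // Finalization mix (fmix32).
        h1 ^= len;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Hash a routing value the way we understand OpenSearch does:
    // each UTF-16 char as two little-endian bytes, seed 0.
    static int hashRouting(String routing) {
        byte[] bytes = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            bytes[2 * i] = (byte) c;
            bytes[2 * i + 1] = (byte) (c >>> 8);
        }
        return murmur3x86_32(bytes, 0, bytes.length, 0);
    }

    // Assumed default when index.number_of_routing_shards is not set:
    // numShards * 2^k, the largest such value <= 1024, but always at least one split.
    static int defaultRoutingNumShards(int numShards) {
        int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(log2)
        int numSplits = Math.max(1, 10 - log2NumShards);
        return numShards << numSplits;
    }

    // Shard (and, under the partition N -> shard N assumption, Kafka partition)
    // for a document routed by its ID.
    static int shardFor(String documentId, int numShards, int routingNumShards) {
        int routingFactor = routingNumShards / numShards;
        return Math.floorMod(hashRouting(documentId), routingNumShards) / routingFactor;
    }
}
```

For example, with 5 primary shards the assumed default gives 640 routing shards, and `ShardRouting.shardFor(id, 5, 640)` returns a value in 0..4. Reading the actual routing shard count from the index metadata, rather than recomputing it, would avoid drift when `index.number_of_routing_shards` was set explicitly.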

Constraints (Phase 1)

  • document_id is required: events without a resolvable document ID fail
  • document_version is required: there is no default timestamp-based version yet
  • The index must already exist with ingestion_source configured (Data Prepper does not create it in Phase 1)
  • Only the index, create, and delete actions are supported
  • routing / routing_field fall back to the document ID for partition assignment
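The Phase 1 constraints amount to a pre-flight check on each event. The Java sketch below is a hypothetical stand-in for DocumentIdResolver plus that validation, assuming `${/path}` references resolve as slash-separated keys into nested maps (JSON Pointer escapes such as `~1` are not handled). `Phase1Checks`, `resolve`, and `check` are illustrative names, not Data Prepper APIs.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical Phase 1 pre-flight checks; names are illustrative, not Data Prepper's.
final class Phase1Checks {
    private static final Set<String> SUPPORTED_ACTIONS = Set.of("index", "create", "delete");
    // Matches "${/some/path}" and captures "/some/path".
    private static final Pattern FIELD_REF = Pattern.compile("\\$\\{(/[^}]*)\\}");

    private Phase1Checks() {}

    // Substitutes every ${/path} reference; returns null if any reference is missing.
    static String resolve(String format, Map<String, Object> event) {
        Matcher m = FIELD_REF.matcher(format);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object value = lookup(event, m.group(1));
            if (value == null) {
                return null;
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Walks "/a/b" as event.get("a").get("b"); null if any step is missing.
    private static Object lookup(Map<String, Object> event, String pointer) {
        Object current = event;
        for (String key : pointer.substring(1).split("/")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Enforces the Phase 1 rules and returns the resolved document ID.
    static String check(String action, String documentIdFormat, String versionFormat,
                        Map<String, Object> event) {
        if (!SUPPORTED_ACTIONS.contains(action)) {
            throw new IllegalArgumentException("unsupported action: " + action);
        }
        String id = documentIdFormat == null ? null : resolve(documentIdFormat, event);
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("document_id could not be resolved");
        }
        String version = versionFormat == null ? null : resolve(versionFormat, event);
        if (version == null || version.isEmpty()) {
            throw new IllegalArgumentException("document_version could not be resolved");
        }
        return id;
    }
}
```

In this sketch a failed check throws `IllegalArgumentException`; a real sink would more likely send the event to a DLQ or failure pipeline than abort the batch.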

Configuration Example

```yaml
opensearch:
  hosts: ["https://opensearch:9200"]
  index: "my-index"
  document_id: "${/doc_id}"
  document_version: "${/timestamp}"
  action: "index"
  pull_indexing:
    engine:
      kafka:
        bootstrap_servers:
          - "kafka:9092"
```
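With this configuration, each event's resolved document_id, document_version, and action end up in the mapper envelope that PullIngestionEnvelopeBuilder produces. The Java sketch below shows one way to assemble that envelope as an ordered map. `EnvelopeSketch` is a hypothetical name, and the code assumes the resolved version is a whole number such as epoch milliseconds, which the `${/timestamp}` example suggests but does not guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical envelope assembly; the field names follow the issue
// (_id, _version, _source, _op_type), everything else is illustrative.
final class EnvelopeSketch {
    private EnvelopeSketch() {}

    static Map<String, Object> build(String id, String version, String action,
                                     Map<String, Object> source) {
        Map<String, Object> envelope = new LinkedHashMap<>(); // keeps field order stable
        envelope.put("_id", id);
        // Assumption: the resolved version is numeric; throws NumberFormatException otherwise.
        envelope.put("_version", Long.parseLong(version));
        envelope.put("_source", source);
        envelope.put("_op_type", action); // "index", "create", or "delete" in Phase 1
        return envelope;
    }
}
```

A JSON library such as Jackson could then serialize the map into the Kafka record value.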
