From 2a982d844af2e60dccc46ad54f60bf7041466e58 Mon Sep 17 00:00:00 2001 From: Divakar Pratap Singh Date: Fri, 22 May 2026 02:45:36 +0000 Subject: [PATCH] Add ByteDecoder for opensearch_api source to support Kafka buffer The opensearch_api source did not work with Kafka buffer because no ByteDecoder was registered. When buffer.isByteBuffer()=true, raw bytes are written but the consumer side had no way to reconstruct events from the NDJSON bulk format. This adds OpenSearchBulkByteDecoder which parses NDJSON bulk format (action/metadata + document line pairs) back into Data Prepper events with correct metadata attributes. Resolves #6876 Signed-off-by: Divakar Pratap Singh --- .../opensearchapi/OpenSearchAPISource.java | 6 + .../OpenSearchBulkByteDecoder.java | 101 +++++++++++++ .../OpenSearchBulkByteDecoderTest.java | 138 ++++++++++++++++++ 3 files changed, 245 insertions(+) create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoder.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoderTest.java diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java index 9bfa1a9c53..4f95c8e108 100644 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; @@ -38,4 +39,9 @@ public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, fina public String getHttpHealthCheckPath() { return HTTP_HEALTH_CHECK_PATH; } + + @Override + public ByteDecoder getDecoder() { + return new OpenSearchBulkByteDecoder(); + } } diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoder.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoder.java new file mode 100644 index 0000000000..52417bb742 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoder.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Consumer; + +public class OpenSearchBulkByteDecoder implements ByteDecoder { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE = new TypeReference<>() {}; + + @Override + public void parse(InputStream inputStream, Instant timeReceived, Consumer> eventConsumer) throws IOException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + if (line.isBlank()) continue; + + Map actionLine = OBJECT_MAPPER.readValue(line, MAP_TYPE); + String action = extractAction(actionLine); + if (action == null) continue; + + @SuppressWarnings("unchecked") + Map metadata = (Map) actionLine.get(action); + boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action); + + Map documentData = null; + if (!isDelete) { + String docLine = reader.readLine(); + if (docLine == null || docLine.isBlank()) continue; + documentData = OBJECT_MAPPER.readValue(docLine, MAP_TYPE); + } + + JacksonEvent.Builder builder = JacksonEvent.builder() + .withEventType(EventType.DOCUMENT.toString()); + if (documentData != null) { + builder.withData(documentData); + } + if (timeReceived != null) { + builder.withTimeReceived(timeReceived); + } + JacksonEvent event = builder.build(); + + event.getMetadata().setAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, action); + if (metadata != null) { + setIfPresent(event, metadata, "_index", + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX); + setIfPresent(event, metadata, "_id", + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID); + setIfPresent(event, metadata, "routing", + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING); + setIfPresent(event, metadata, "pipeline", + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE); + } + + eventConsumer.accept(new Record<>(event)); + } + } + + private String extractAction(Map actionLine) { + return Arrays.stream(OpenSearchBulkActions.values()) + .map(OpenSearchBulkActions::toString) + .filter(actionLine::containsKey) + .findFirst() + .orElse(null); + } + + private void setIfPresent(JacksonEvent event, Map metadata, String key, String attribute) { + Object value = metadata.get(key); + if (value != null) { + event.getMetadata().setAttribute(attribute, value.toString()); + } + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoderTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoderTest.java new file mode 100644 index 0000000000..7fd4d70268 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchBulkByteDecoderTest.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class OpenSearchBulkByteDecoderTest { + + private OpenSearchBulkByteDecoder decoder; + + @BeforeEach + void setUp() { + decoder = new OpenSearchBulkByteDecoder(); + } + + @Test + void parse_withIndexAction_createsEventWithCorrectMetadata() throws IOException { + String bulk = "{\"index\":{\"_index\":\"my-index\",\"_id\":\"1\"}}\n" + + "{\"field1\":\"value1\",\"field2\":42}\n"; + + List> records = parseAll(bulk); + + assertEquals(1, records.size()); + Event event = records.get(0).getData(); + assertEquals("value1", event.get("field1", String.class)); + assertEquals(42, event.get("field2", Integer.class)); + assertEquals("index", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + assertEquals("my-index", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX)); + assertEquals("1", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID)); + } + + @Test + void parse_withDeleteAction_createsEventWithNoBody() throws IOException { + String bulk = "{\"delete\":{\"_index\":\"my-index\",\"_id\":\"2\"}}\n"; + + List> records = parseAll(bulk); + + assertEquals(1, records.size()); + Event event = records.get(0).getData(); + assertEquals("delete", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + assertEquals("my-index", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX)); + } + + @Test + void parse_withMultipleActions_createsMultipleEvents() throws IOException { + String bulk = "{\"index\":{\"_index\":\"idx\",\"_id\":\"1\"}}\n" + + "{\"name\":\"doc1\"}\n" + + "{\"create\":{\"_index\":\"idx\",\"_id\":\"2\"}}\n" + + "{\"name\":\"doc2\"}\n" + + "{\"delete\":{\"_index\":\"idx\",\"_id\":\"3\"}}\n"; + + List> records = parseAll(bulk); + + assertEquals(3, records.size()); + assertEquals("index", records.get(0).getData().getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + assertEquals("create", records.get(1).getData().getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + assertEquals("delete", records.get(2).getData().getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + } + + @Test + void parse_withRoutingAndPipeline_setsMetadata() throws IOException { + String bulk = "{\"index\":{\"_index\":\"idx\",\"routing\":\"r1\",\"pipeline\":\"p1\"}}\n" + + "{\"data\":true}\n"; + + List> records = parseAll(bulk); + + assertEquals(1, records.size()); + Event event = records.get(0).getData(); + assertEquals("r1", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING)); + assertEquals("p1", event.getMetadata().getAttribute( + BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE)); + } + + @Test + void parse_withBlankLines_skipsBlankLines() throws IOException { + String bulk = "\n\n{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n\n"; + + List> records = parseAll(bulk); + + assertEquals(1, records.size()); + } + + @Test + void parse_withEmptyInput_returnsNoEvents() throws IOException { + List> records = parseAll(""); + assertTrue(records.isEmpty()); + } + + @Test + void parse_withTimeReceived_setsTimeOnEvent() throws IOException { + String bulk = "{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n"; + Instant now = Instant.now(); + + List> records = new ArrayList<>(); + decoder.parse(new ByteArrayInputStream(bulk.getBytes(StandardCharsets.UTF_8)), now, records::add); + + assertEquals(1, records.size()); + assertNotNull(records.get(0).getData().getMetadata().getTimeReceived()); + } + + private List> parseAll(String input) throws IOException { + List> records = new ArrayList<>(); + decoder.parse(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), Instant.now(), records::add); + return records; + } +}