Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
Expand Down Expand Up @@ -38,4 +39,9 @@ public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, fina
public String getHttpHealthCheckPath() {
// Exposes the health-check path constant; HTTP_HEALTH_CHECK_PATH is declared outside
// the visible part of this file, so its concrete value is not shown here.
return HTTP_HEALTH_CHECK_PATH;
}

/**
 * Supplies the byte decoder exposed by this class.
 *
 * @return a newly constructed {@link OpenSearchBulkByteDecoder}; a fresh instance is created on every call
 */
@Override
public ByteDecoder getDecoder() {
return new OpenSearchBulkByteDecoder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.opensearchapi;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

/**
 * {@link ByteDecoder} that turns a newline-delimited bulk request body into events.
 *
 * <p>The payload is read line by line as UTF-8. Each non-blank line is expected to be an
 * action line: a JSON object whose key is one of the {@link OpenSearchBulkActions} names and
 * whose value is a JSON object of action metadata. Every action except {@code delete} is
 * followed by one document line, which becomes the event data. The action name and the
 * {@code _index}, {@code _id}, {@code routing} and {@code pipeline} metadata entries are
 * copied onto the event metadata as attributes.
 */
public class OpenSearchBulkByteDecoder implements ByteDecoder {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};

    /**
     * Parses the bulk payload and hands one record per action to {@code eventConsumer}.
     *
     * <p>Blank lines and lines that contain no recognised action key are skipped. A non-delete
     * action whose document line is missing or blank is dropped without emitting an event.
     *
     * @param inputStream   the bulk payload, decoded as UTF-8; not closed by this method
     * @param timeReceived  time to stamp on each event, or {@code null} to leave it unset
     * @param eventConsumer receives every decoded record, in payload order
     * @throws IOException if a line is not valid JSON, if an action line is not a JSON object,
     *                     or if the value under the action key is not a JSON object
     */
    @Override
    public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
        // The reader is left open, so the caller-supplied stream is not closed by this method.
        final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isBlank()) {
                continue;
            }

            final Map<String, Object> actionLine = OBJECT_MAPPER.readValue(line, MAP_TYPE);
            if (actionLine == null) {
                // Jackson maps the JSON literal `null` to a null reference instead of failing;
                // report it like any other non-object line rather than hitting an NPE below.
                throw new IOException("Bulk action line must be a JSON object but was null");
            }

            final String action = extractAction(actionLine);
            if (action == null) {
                continue;
            }

            final Map<String, Object> metadata = extractMetadata(actionLine, action);
            final boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action);

            Map<String, Object> documentData = null;
            if (!isDelete) {
                // Every action other than delete carries its document on the following line.
                final String docLine = reader.readLine();
                if (docLine == null || docLine.isBlank()) {
                    continue;
                }
                documentData = OBJECT_MAPPER.readValue(docLine, MAP_TYPE);
            }

            final JacksonEvent event = buildEvent(documentData, timeReceived);
            applyMetadata(event, action, metadata);
            eventConsumer.accept(new Record<>(event));
        }
    }

    /** Returns the first known bulk action name present as a key of the action line, or {@code null}. */
    private String extractAction(Map<String, Object> actionLine) {
        return Arrays.stream(OpenSearchBulkActions.values())
                .map(OpenSearchBulkActions::toString)
                .filter(actionLine::containsKey)
                .findFirst()
                .orElse(null);
    }

    /**
     * Returns the metadata object stored under the action key, or {@code null} when the value is
     * JSON {@code null}; rejects any other non-object value instead of failing on a blind cast.
     */
    private Map<String, Object> extractMetadata(Map<String, Object> actionLine, String action) throws IOException {
        final Object value = actionLine.get(action);
        if (value == null) {
            return null;
        }
        if (!(value instanceof Map)) {
            throw new IOException("Bulk action '" + action + "' must map to a JSON object but was "
                    + value.getClass().getSimpleName());
        }
        // Safe: Jackson materialises nested JSON objects as maps with String keys.
        @SuppressWarnings("unchecked")
        final Map<String, Object> metadata = (Map<String, Object>) value;
        return metadata;
    }

    /** Builds a DOCUMENT event, attaching the data and the received time only when they are supplied. */
    private JacksonEvent buildEvent(Map<String, Object> documentData, Instant timeReceived) {
        JacksonEvent.Builder builder = JacksonEvent.builder()
                .withEventType(EventType.DOCUMENT.toString());
        if (documentData != null) {
            builder.withData(documentData);
        }
        if (timeReceived != null) {
            builder.withTimeReceived(timeReceived);
        }
        return builder.build();
    }

    /** Records the action name and any of the supported bulk metadata entries on the event. */
    private void applyMetadata(JacksonEvent event, String action, Map<String, Object> metadata) {
        event.getMetadata().setAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, action);
        if (metadata == null) {
            return;
        }
        setIfPresent(event, metadata, "_index",
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX);
        setIfPresent(event, metadata, "_id",
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID);
        setIfPresent(event, metadata, "routing",
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING);
        setIfPresent(event, metadata, "pipeline",
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE);
    }

    /** Copies {@code metadata[key]} onto the event as a string attribute when the entry is non-null. */
    private void setIfPresent(JacksonEvent event, Map<String, Object> metadata, String key, String attribute) {
        final Object value = metadata.get(key);
        if (value != null) {
            event.getMetadata().setAttribute(attribute, value.toString());
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.opensearchapi;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Unit tests for {@link OpenSearchBulkByteDecoder}: each test feeds a small bulk payload to
 * the decoder and inspects the records handed to the consumer.
 */
class OpenSearchBulkByteDecoderTest {

    private OpenSearchBulkByteDecoder decoder;

    @BeforeEach
    void setUp() {
        decoder = new OpenSearchBulkByteDecoder();
    }

    @Test
    void parse_withIndexAction_createsEventWithCorrectMetadata() throws IOException {
        String bulk = "{\"index\":{\"_index\":\"my-index\",\"_id\":\"1\"}}\n" +
                "{\"field1\":\"value1\",\"field2\":42}\n";

        List<Record<Event>> records = parseAll(bulk);

        assertEquals(1, records.size());
        Event event = records.get(0).getData();
        assertEquals("value1", event.get("field1", String.class));
        assertEquals(42, event.get("field2", Integer.class));
        assertEquals("index", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
        assertEquals("my-index", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX));
        assertEquals("1", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID));
    }

    @Test
    void parse_withDeleteAction_createsEventWithNoBody() throws IOException {
        String bulk = "{\"delete\":{\"_index\":\"my-index\",\"_id\":\"2\"}}\n";

        List<Record<Event>> records = parseAll(bulk);

        assertEquals(1, records.size());
        Event event = records.get(0).getData();
        assertEquals("delete", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
        assertEquals("my-index", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX));
        assertEquals("2", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID));
        // A delete action has no document line, so the event must carry no fields.
        assertTrue(event.toMap().isEmpty());
    }

    @Test
    void parse_withMultipleActions_createsMultipleEvents() throws IOException {
        String bulk = "{\"index\":{\"_index\":\"idx\",\"_id\":\"1\"}}\n" +
                "{\"name\":\"doc1\"}\n" +
                "{\"create\":{\"_index\":\"idx\",\"_id\":\"2\"}}\n" +
                "{\"name\":\"doc2\"}\n" +
                "{\"delete\":{\"_index\":\"idx\",\"_id\":\"3\"}}\n";

        List<Record<Event>> records = parseAll(bulk);

        assertEquals(3, records.size());
        assertEquals("index", records.get(0).getData().getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
        assertEquals("create", records.get(1).getData().getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
        assertEquals("delete", records.get(2).getData().getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
    }

    @Test
    void parse_withRoutingAndPipeline_setsMetadata() throws IOException {
        String bulk = "{\"index\":{\"_index\":\"idx\",\"routing\":\"r1\",\"pipeline\":\"p1\"}}\n" +
                "{\"data\":true}\n";

        List<Record<Event>> records = parseAll(bulk);

        assertEquals(1, records.size());
        Event event = records.get(0).getData();
        assertEquals("r1", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING));
        assertEquals("p1", event.getMetadata().getAttribute(
                BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE));
    }

    @Test
    void parse_withBlankLines_skipsBlankLines() throws IOException {
        String bulk = "\n\n{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n\n";

        List<Record<Event>> records = parseAll(bulk);

        assertEquals(1, records.size());
    }

    @Test
    void parse_withEmptyInput_returnsNoEvents() throws IOException {
        List<Record<Event>> records = parseAll("");
        assertTrue(records.isEmpty());
    }

    @Test
    void parse_withTimeReceived_setsTimeOnEvent() throws IOException {
        String bulk = "{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n";
        // A fixed instant in the past cannot be confused with a timestamp taken from the
        // clock at build time, so equality shows the decoder really forwards the argument.
        Instant timeReceived = Instant.parse("2024-01-01T00:00:00Z");

        List<Record<Event>> records = new ArrayList<>();
        decoder.parse(new ByteArrayInputStream(bulk.getBytes(StandardCharsets.UTF_8)), timeReceived, records::add);

        assertEquals(1, records.size());
        Instant actual = records.get(0).getData().getMetadata().getTimeReceived();
        assertNotNull(actual);
        assertEquals(timeReceived, actual);
    }

    /** Runs the decoder over {@code input} (UTF-8) and collects every emitted record in order. */
    private List<Record<Event>> parseAll(String input) throws IOException {
        List<Record<Event>> records = new ArrayList<>();
        decoder.parse(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), Instant.now(), records::add);
        return records;
    }
}
Loading