Skip to content

Commit 2a982d8

Browse files
committed
Add ByteDecoder for opensearch_api source to support Kafka buffer
The opensearch_api source did not work with Kafka buffer because no ByteDecoder was registered. When buffer.isByteBuffer()=true, raw bytes are written but the consumer side had no way to reconstruct events from the NDJSON bulk format. This adds OpenSearchBulkByteDecoder which parses NDJSON bulk format (action/metadata + document line pairs) back into Data Prepper events with correct metadata attributes. Resolves #6876 Signed-off-by: Divakar Pratap Singh <divakar.p.singh@gmail.com>
1 parent b85efd3 commit 2a982d8

3 files changed

Lines changed: 245 additions & 0 deletions

File tree

data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1212
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1313
import org.opensearch.dataprepper.model.buffer.Buffer;
14+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
1415
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
1516
import org.opensearch.dataprepper.model.event.Event;
1617
import org.opensearch.dataprepper.model.plugin.PluginFactory;
@@ -38,4 +39,9 @@ public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, fina
3839
public String getHttpHealthCheckPath() {
3940
return HTTP_HEALTH_CHECK_PATH;
4041
}
42+
43+
/**
 * Supplies the decoder for this source's raw bulk request bytes.
 *
 * @return a newly created {@link OpenSearchBulkByteDecoder}; a fresh instance is built on every call
 */
@Override
44+
public ByteDecoder getDecoder() {
45+
return new OpenSearchBulkByteDecoder();
46+
}
4147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.opensearchapi;
12+
13+
import com.fasterxml.jackson.core.type.TypeReference;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.event.EventType;
18+
import org.opensearch.dataprepper.model.event.JacksonEvent;
19+
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
20+
import org.opensearch.dataprepper.model.record.Record;
21+
import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes;
22+
23+
import java.io.BufferedReader;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.io.InputStreamReader;
27+
import java.nio.charset.StandardCharsets;
28+
import java.time.Instant;
29+
import java.util.Arrays;
30+
import java.util.Map;
31+
import java.util.function.Consumer;
32+
33+
public class OpenSearchBulkByteDecoder implements ByteDecoder {
34+
35+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
36+
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
37+
38+
@Override
39+
public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
40+
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
41+
String line;
42+
while ((line = reader.readLine()) != null) {
43+
if (line.isBlank()) continue;
44+
45+
Map<String, Object> actionLine = OBJECT_MAPPER.readValue(line, MAP_TYPE);
46+
String action = extractAction(actionLine);
47+
if (action == null) continue;
48+
49+
@SuppressWarnings("unchecked")
50+
Map<String, Object> metadata = (Map<String, Object>) actionLine.get(action);
51+
boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action);
52+
53+
Map<String, Object> documentData = null;
54+
if (!isDelete) {
55+
String docLine = reader.readLine();
56+
if (docLine == null || docLine.isBlank()) continue;
57+
documentData = OBJECT_MAPPER.readValue(docLine, MAP_TYPE);
58+
}
59+
60+
JacksonEvent.Builder builder = JacksonEvent.builder()
61+
.withEventType(EventType.DOCUMENT.toString());
62+
if (documentData != null) {
63+
builder.withData(documentData);
64+
}
65+
if (timeReceived != null) {
66+
builder.withTimeReceived(timeReceived);
67+
}
68+
JacksonEvent event = builder.build();
69+
70+
event.getMetadata().setAttribute(
71+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, action);
72+
if (metadata != null) {
73+
setIfPresent(event, metadata, "_index",
74+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX);
75+
setIfPresent(event, metadata, "_id",
76+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID);
77+
setIfPresent(event, metadata, "routing",
78+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING);
79+
setIfPresent(event, metadata, "pipeline",
80+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE);
81+
}
82+
83+
eventConsumer.accept(new Record<>(event));
84+
}
85+
}
86+
87+
private String extractAction(Map<String, Object> actionLine) {
88+
return Arrays.stream(OpenSearchBulkActions.values())
89+
.map(OpenSearchBulkActions::toString)
90+
.filter(actionLine::containsKey)
91+
.findFirst()
92+
.orElse(null);
93+
}
94+
95+
private void setIfPresent(JacksonEvent event, Map<String, Object> metadata, String key, String attribute) {
96+
Object value = metadata.get(key);
97+
if (value != null) {
98+
event.getMetadata().setAttribute(attribute, value.toString());
99+
}
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.opensearchapi;
12+
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.opensearch.dataprepper.model.event.Event;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.IOException;
21+
import java.nio.charset.StandardCharsets;
22+
import java.time.Instant;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertNotNull;
28+
import static org.junit.jupiter.api.Assertions.assertTrue;
29+
30+
class OpenSearchBulkByteDecoderTest {
31+
32+
private OpenSearchBulkByteDecoder decoder;
33+
34+
@BeforeEach
35+
void setUp() {
36+
decoder = new OpenSearchBulkByteDecoder();
37+
}
38+
39+
@Test
40+
void parse_withIndexAction_createsEventWithCorrectMetadata() throws IOException {
41+
String bulk = "{\"index\":{\"_index\":\"my-index\",\"_id\":\"1\"}}\n" +
42+
"{\"field1\":\"value1\",\"field2\":42}\n";
43+
44+
List<Record<Event>> records = parseAll(bulk);
45+
46+
assertEquals(1, records.size());
47+
Event event = records.get(0).getData();
48+
assertEquals("value1", event.get("field1", String.class));
49+
assertEquals(42, event.get("field2", Integer.class));
50+
assertEquals("index", event.getMetadata().getAttribute(
51+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
52+
assertEquals("my-index", event.getMetadata().getAttribute(
53+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX));
54+
assertEquals("1", event.getMetadata().getAttribute(
55+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID));
56+
}
57+
58+
@Test
59+
void parse_withDeleteAction_createsEventWithNoBody() throws IOException {
60+
String bulk = "{\"delete\":{\"_index\":\"my-index\",\"_id\":\"2\"}}\n";
61+
62+
List<Record<Event>> records = parseAll(bulk);
63+
64+
assertEquals(1, records.size());
65+
Event event = records.get(0).getData();
66+
assertEquals("delete", event.getMetadata().getAttribute(
67+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
68+
assertEquals("my-index", event.getMetadata().getAttribute(
69+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX));
70+
}
71+
72+
@Test
73+
void parse_withMultipleActions_createsMultipleEvents() throws IOException {
74+
String bulk = "{\"index\":{\"_index\":\"idx\",\"_id\":\"1\"}}\n" +
75+
"{\"name\":\"doc1\"}\n" +
76+
"{\"create\":{\"_index\":\"idx\",\"_id\":\"2\"}}\n" +
77+
"{\"name\":\"doc2\"}\n" +
78+
"{\"delete\":{\"_index\":\"idx\",\"_id\":\"3\"}}\n";
79+
80+
List<Record<Event>> records = parseAll(bulk);
81+
82+
assertEquals(3, records.size());
83+
assertEquals("index", records.get(0).getData().getMetadata().getAttribute(
84+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
85+
assertEquals("create", records.get(1).getData().getMetadata().getAttribute(
86+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
87+
assertEquals("delete", records.get(2).getData().getMetadata().getAttribute(
88+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION));
89+
}
90+
91+
@Test
92+
void parse_withRoutingAndPipeline_setsMetadata() throws IOException {
93+
String bulk = "{\"index\":{\"_index\":\"idx\",\"routing\":\"r1\",\"pipeline\":\"p1\"}}\n" +
94+
"{\"data\":true}\n";
95+
96+
List<Record<Event>> records = parseAll(bulk);
97+
98+
assertEquals(1, records.size());
99+
Event event = records.get(0).getData();
100+
assertEquals("r1", event.getMetadata().getAttribute(
101+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING));
102+
assertEquals("p1", event.getMetadata().getAttribute(
103+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE));
104+
}
105+
106+
@Test
107+
void parse_withBlankLines_skipsBlankLines() throws IOException {
108+
String bulk = "\n\n{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n\n";
109+
110+
List<Record<Event>> records = parseAll(bulk);
111+
112+
assertEquals(1, records.size());
113+
}
114+
115+
@Test
116+
void parse_withEmptyInput_returnsNoEvents() throws IOException {
117+
List<Record<Event>> records = parseAll("");
118+
assertTrue(records.isEmpty());
119+
}
120+
121+
@Test
122+
void parse_withTimeReceived_setsTimeOnEvent() throws IOException {
123+
String bulk = "{\"index\":{\"_index\":\"idx\"}}\n{\"x\":1}\n";
124+
Instant now = Instant.now();
125+
126+
List<Record<Event>> records = new ArrayList<>();
127+
decoder.parse(new ByteArrayInputStream(bulk.getBytes(StandardCharsets.UTF_8)), now, records::add);
128+
129+
assertEquals(1, records.size());
130+
assertNotNull(records.get(0).getData().getMetadata().getTimeReceived());
131+
}
132+
133+
private List<Record<Event>> parseAll(String input) throws IOException {
134+
List<Record<Event>> records = new ArrayList<>();
135+
decoder.parse(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), Instant.now(), records::add);
136+
return records;
137+
}
138+
}

0 commit comments

Comments
 (0)