Skip to content

Commit 6fbd388

Browse files
committed
Add ByteDecoder for opensearch_api source to support Kafka buffer
The opensearch_api source did not work with Kafka buffer because no ByteDecoder was registered. When buffer.isByteBuffer()=true, raw bytes are written but the consumer side had no way to reconstruct events from the NDJSON bulk format. This adds OpenSearchBulkByteDecoder which parses NDJSON bulk format (action/metadata + document line pairs) back into Data Prepper events with correct metadata attributes. Resolves #6876 Signed-off-by: Divakar Pratap Singh <divakar.p.singh@gmail.com>
1 parent b85efd3 commit 6fbd388

2 files changed

Lines changed: 107 additions & 0 deletions

File tree

data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1212
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1313
import org.opensearch.dataprepper.model.buffer.Buffer;
14+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
1415
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
1516
import org.opensearch.dataprepper.model.event.Event;
1617
import org.opensearch.dataprepper.model.plugin.PluginFactory;
@@ -38,4 +39,9 @@ public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, fina
3839
public String getHttpHealthCheckPath() {
3940
return HTTP_HEALTH_CHECK_PATH;
4041
}
42+
43+
@Override
44+
public ByteDecoder getDecoder() {
45+
return new OpenSearchBulkByteDecoder();
46+
}
4147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.opensearchapi;
12+
13+
import com.fasterxml.jackson.core.type.TypeReference;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.event.EventType;
18+
import org.opensearch.dataprepper.model.event.JacksonEvent;
19+
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
20+
import org.opensearch.dataprepper.model.record.Record;
21+
import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes;
22+
23+
import java.io.BufferedReader;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.io.InputStreamReader;
27+
import java.nio.charset.StandardCharsets;
28+
import java.time.Instant;
29+
import java.util.Arrays;
30+
import java.util.Map;
31+
import java.util.function.Consumer;
32+
33+
public class OpenSearchBulkByteDecoder implements ByteDecoder {
34+
35+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
36+
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
37+
38+
@Override
39+
public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
40+
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
41+
String line;
42+
while ((line = reader.readLine()) != null) {
43+
if (line.isBlank()) continue;
44+
45+
Map<String, Object> actionLine = OBJECT_MAPPER.readValue(line, MAP_TYPE);
46+
String action = extractAction(actionLine);
47+
if (action == null) continue;
48+
49+
@SuppressWarnings("unchecked")
50+
Map<String, Object> metadata = (Map<String, Object>) actionLine.get(action);
51+
boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action);
52+
53+
Map<String, Object> documentData = null;
54+
if (!isDelete) {
55+
String docLine = reader.readLine();
56+
if (docLine == null || docLine.isBlank()) continue;
57+
documentData = OBJECT_MAPPER.readValue(docLine, MAP_TYPE);
58+
}
59+
60+
JacksonEvent.Builder builder = JacksonEvent.builder()
61+
.withEventType(EventType.DOCUMENT.toString());
62+
if (documentData != null) {
63+
builder.withData(documentData);
64+
}
65+
if (timeReceived != null) {
66+
builder.withTimeReceived(timeReceived);
67+
}
68+
JacksonEvent event = builder.build();
69+
70+
event.getMetadata().setAttribute(
71+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, action);
72+
if (metadata != null) {
73+
setIfPresent(event, metadata, "_index",
74+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX);
75+
setIfPresent(event, metadata, "_id",
76+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID);
77+
setIfPresent(event, metadata, "routing",
78+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING);
79+
setIfPresent(event, metadata, "pipeline",
80+
BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE);
81+
}
82+
83+
eventConsumer.accept(new Record<>(event));
84+
}
85+
}
86+
87+
private String extractAction(Map<String, Object> actionLine) {
88+
return Arrays.stream(OpenSearchBulkActions.values())
89+
.map(OpenSearchBulkActions::toString)
90+
.filter(actionLine::containsKey)
91+
.findFirst()
92+
.orElse(null);
93+
}
94+
95+
private void setIfPresent(JacksonEvent event, Map<String, Object> metadata, String key, String attribute) {
96+
Object value = metadata.get(key);
97+
if (value != null) {
98+
event.getMetadata().setAttribute(attribute, value.toString());
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)