Skip to content

Commit 7236d5b

Browse files
committed
Fix opensearch_api _bulk endpoint to return proper JSON response
The _bulk endpoint returned an empty HTTP 200 with no body. Clients using RestHighLevelClient (Flink, Logstash, opensearch-py) expect a JSON response with {took, errors, items[]} and fail with 'Unable to parse response body'. This adds buildBulkResponse() which constructs a response matching the OpenSearch Bulk API format. Resolves #6877 Signed-off-by: Divakar Pratap Singh <divakar.p.singh@gmail.com>
1 parent b85efd3 commit 7236d5b

1 file changed

Lines changed: 51 additions & 1 deletion

File tree

  • data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi

data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55

66
package org.opensearch.dataprepper.plugins.source.opensearchapi;
77

8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import com.fasterxml.jackson.databind.node.ArrayNode;
10+
import com.fasterxml.jackson.databind.node.ObjectNode;
811
import com.linecorp.armeria.common.AggregatedHttpRequest;
912
import com.linecorp.armeria.common.HttpData;
1013
import com.linecorp.armeria.common.HttpResponse;
1114
import com.linecorp.armeria.common.HttpStatus;
15+
import com.linecorp.armeria.common.MediaType;
1216
import com.linecorp.armeria.common.annotation.Nullable;
1317
import com.linecorp.armeria.server.ServiceRequestContext;
1418
import com.linecorp.armeria.server.annotation.Blocking;
@@ -55,6 +59,7 @@ public class OpenSearchAPIService implements BaseHttpService {
5559
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";
5660

5761
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class);
62+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
5863

5964
// TODO: support other data-types as request body, e.g. json_lines, msgpack
6065
private final MultiLineJsonCodec jsonCodec = new MultiLineJsonCodec();
@@ -132,7 +137,52 @@ private HttpResponse processBulkRequest(final ServiceRequestContext serviceReque
132137
throw e;
133138
}
134139
successRequestsCounter.increment();
135-
return HttpResponse.of(HttpStatus.OK);
140+
return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, buildBulkResponse(bulkRequestPayloadList));
141+
}
142+
143+
private String buildBulkResponse(final List<Map<String, Object>> bulkRequestPayloadList) {
144+
final ObjectNode root = OBJECT_MAPPER.createObjectNode();
145+
root.put("took", 0);
146+
root.put("errors", false);
147+
148+
final ArrayNode items = root.putArray("items");
149+
final Iterator<Map<String, Object>> it = bulkRequestPayloadList.iterator();
150+
int seq = 0;
151+
while (it.hasNext()) {
152+
final Map<String, Object> actionLine = it.next();
153+
final String action = Arrays.stream(OpenSearchBulkActions.values())
154+
.map(OpenSearchBulkActions::toString)
155+
.filter(actionLine::containsKey)
156+
.findFirst().orElse("index");
157+
158+
final boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action);
159+
if (!isDelete && it.hasNext()) {
160+
it.next();
161+
}
162+
163+
@SuppressWarnings("unchecked")
164+
final Map<String, Object> meta = (Map<String, Object>) actionLine.get(action);
165+
final String index = meta != null && meta.get("_index") != null ? meta.get("_index").toString() : "unknown";
166+
final String id = meta != null && meta.get("_id") != null ? meta.get("_id").toString() : String.valueOf(seq);
167+
168+
final ObjectNode item = OBJECT_MAPPER.createObjectNode();
169+
final ObjectNode actionResult = item.putObject(action);
170+
actionResult.put("_index", index);
171+
actionResult.put("_id", id);
172+
actionResult.put("_version", 1);
173+
actionResult.put("result", isDelete ? "deleted" : "created");
174+
actionResult.put("status", isDelete ? 200 : 201);
175+
actionResult.put("_seq_no", seq);
176+
actionResult.put("_primary_term", 1);
177+
final ObjectNode shards = actionResult.putObject("_shards");
178+
shards.put("total", 1);
179+
shards.put("successful", 1);
180+
shards.put("failed", 0);
181+
182+
items.add(item);
183+
seq++;
184+
}
185+
return root.toString();
136186
}
137187

138188
private boolean isValidBulkAction(Map<String, Object> actionMap) {

0 commit comments

Comments
 (0)