From 91d5147c639dc1315e721a7c97edea69baea9039 Mon Sep 17 00:00:00 2001 From: Divakar Pratap Singh Date: Thu, 21 May 2026 07:16:51 +0000 Subject: [PATCH] Fix opensearch_api _bulk endpoint to return proper JSON response The _bulk endpoint returned an empty HTTP 200 with no body. Clients using RestHighLevelClient (Flink, Logstash, opensearch-py) expect a JSON response with {took, errors, items[]} and fail with 'Unable to parse response body'. This adds buildBulkResponse() which constructs a response matching the OpenSearch Bulk API format, iterating through the parsed action/doc pairs and producing one item per action. Resolves #6877 Signed-off-by: Divakar Pratap Singh --- .../opensearchapi/OpenSearchAPIService.java | 56 ++++++++++++++++++- .../OpenSearchAPIServiceTest.java | 36 ++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java index 90cb77a21f..feb932d19f 100644 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java @@ -5,10 +5,14 @@ package org.opensearch.dataprepper.plugins.source.opensearchapi; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.linecorp.armeria.common.AggregatedHttpRequest; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.Blocking; @@ -55,6 +59,7 @@ public class OpenSearchAPIService implements BaseHttpService { public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // TODO: support other data-types as request body, e.g. json_lines, msgpack private final MultiLineJsonCodec jsonCodec = new MultiLineJsonCodec(); @@ -132,7 +137,56 @@ private HttpResponse processBulkRequest(final ServiceRequestContext serviceReque throw e; } successRequestsCounter.increment(); - return HttpResponse.of(HttpStatus.OK); + return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, buildBulkResponse(bulkRequestPayloadList)); + } + + private String buildBulkResponse(final List> bulkRequestPayloadList) { + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("took", 0); + root.put("errors", false); + + final ArrayNode items = root.putArray("items"); + final Iterator> it = bulkRequestPayloadList.iterator(); + int seq = 0; + while (it.hasNext()) { + final Map actionLine = it.next(); + final String action = Arrays.stream(OpenSearchBulkActions.values()) + .map(OpenSearchBulkActions::toString) + .filter(actionLine::containsKey) + .findFirst().orElse(null); + + if (action == null) { + continue; + } + + final boolean isDelete = OpenSearchBulkActions.DELETE.toString().equals(action); + if (!isDelete && it.hasNext()) { + it.next(); + } + + @SuppressWarnings("unchecked") + final Map meta = (Map) actionLine.get(action); + final String index = meta != null && meta.get("_index") != null ? meta.get("_index").toString() : "unknown"; + final String id = meta != null && meta.get("_id") != null ? meta.get("_id").toString() : String.valueOf(seq); + + final ObjectNode item = OBJECT_MAPPER.createObjectNode(); + final ObjectNode actionResult = item.putObject(action); + actionResult.put("_index", index); + actionResult.put("_id", id); + actionResult.put("_version", 1); + actionResult.put("result", isDelete ? "deleted" : "created"); + actionResult.put("status", isDelete ? 200 : 201); + actionResult.put("_seq_no", seq); + actionResult.put("_primary_term", 1); + final ObjectNode shards = actionResult.putObject("_shards"); + shards.put("total", 1); + shards.put("successful", 1); + shards.put("failed", 0); + + items.add(item); + seq++; + } + return root.toString(); } private boolean isValidBulkAction(Map actionMap) { diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java index 28a567c4f4..4e4561e7bf 100644 --- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java @@ -439,4 +439,40 @@ private AggregatedHttpRequest generateGoodBulkRequestWithMultipleActions(int num HttpData httpData = HttpData.ofUtf8(String.join("\n", jsonList)); return HttpRequest.of(requestHeaders, httpData).aggregate().get(); } + + @Test + public void testBulkRequestAPIResponseContainsValidBulkJson() throws Exception { + AggregatedHttpRequest testRequest = generateRandomValidBulkRequest(3); + AggregatedHttpResponse postResponse = openSearchAPIService.doPostBulk( + serviceRequestContext, testRequest, null, null).aggregate().get(); + + assertEquals(HttpStatus.OK, postResponse.status()); + String body = postResponse.contentUtf8(); + Map bulkResponse = new ObjectMapper().readValue(body, Map.class); + assertEquals(false, bulkResponse.get("errors")); + assertEquals(0, bulkResponse.get("took")); + List items = (List) bulkResponse.get("items"); + assertNotNull(items); + // generateRandomValidBulkRequest(3) creates 3 index actions + assertEquals(3, items.size()); + } + + @Test + public void testBulkRequestAPIResponseWithIndexInPathContainsValidBulkJson() throws Exception { + AggregatedHttpRequest testRequest = generateRandomValidBulkRequestWithNoIndexInBody(2); + AggregatedHttpResponse postResponse = openSearchAPIService.doPostBulkIndex( + serviceRequestContext, testRequest, "my-index", null, null).aggregate().get(); + + assertEquals(HttpStatus.OK, postResponse.status()); + String body = postResponse.contentUtf8(); + Map bulkResponse = new ObjectMapper().readValue(body, Map.class); + assertEquals(false, bulkResponse.get("errors")); + List items = (List) bulkResponse.get("items"); + assertNotNull(items); + assertEquals(2, items.size()); + } + + private static void assertNotNull(Object obj) { + if (obj == null) throw new AssertionError("Expected non-null"); + } }