diff --git a/data-prepper-plugins/opensearch-api-source/build.gradle b/data-prepper-plugins/opensearch-api-source/build.gradle index 492397ddc9..dac6882bfd 100644 --- a/data-prepper-plugins/opensearch-api-source/build.gradle +++ b/data-prepper-plugins/opensearch-api-source/build.gradle @@ -22,6 +22,7 @@ dependencies { testImplementation project(':data-prepper-test:test-common') testImplementation project(':data-prepper-api').sourceSets.test.output testImplementation 'org.assertj:assertj-core:3.25.3' + testImplementation libs.opensearch.rhlc compileOnly 'org.projectlombok:lombok:1.18.20' annotationProcessor 'org.projectlombok:lombok:1.18.20' } diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java index 90cb77a21f..9f0d9e1183 100644 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java @@ -5,10 +5,13 @@ package org.opensearch.dataprepper.plugins.source.opensearchapi; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.linecorp.armeria.common.AggregatedHttpRequest; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.Blocking; @@ -55,6 +58,7 @@ public class OpenSearchAPIService implements BaseHttpService { public static final String 
REQUEST_PROCESS_DURATION = "requestProcessDuration";
 
     private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     // TODO: support other data-types as request body, e.g. json_lines, msgpack
     private final MultiLineJsonCodec jsonCodec = new MultiLineJsonCodec();
@@ -102,6 +106,7 @@ public HttpResponse doPostBulkIndex(final ServiceRequestContext serviceRequestCo
     }
 
     private HttpResponse processBulkRequest(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest, final BulkAPIRequestParams bulkAPIRequestParams) throws Exception {
+        final long startNanos = System.nanoTime();
         requestsReceivedCounter.increment();
         payloadSizeSummary.record(aggregatedHttpRequest.content().length());
 
@@ -132,7 +137,24 @@ private HttpResponse processBulkRequest(final ServiceRequestContext serviceReque
             throw e;
         }
         successRequestsCounter.increment();
-        return HttpResponse.of(HttpStatus.OK);
+        return buildBulkResponse(startNanos, false);
+    }
+
+    /**
+     * Builds the JSON body returned for a bulk request: the elapsed time in
+     * milliseconds as {@code took}, the {@code errors} flag and an empty {@code items} array.
+     *
+     * @param startNanos {@link System#nanoTime()} reading taken when processing began
+     * @param errors     value reported in the {@code errors} field
+     * @return a 200 OK response carrying the JSON body
+     */
+    private HttpResponse buildBulkResponse(final long startNanos, final boolean errors) {
+        final long elapsedNanos = System.nanoTime() - startNanos;
+        final ObjectNode responseBody = OBJECT_MAPPER.createObjectNode()
+                .put("took", elapsedNanos / 1_000_000)
+                .put("errors", errors);
+        responseBody.putArray("items");
+        return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, responseBody.toString());
     }
 
     private boolean isValidBulkAction(Map actionMap) {
diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/BulkResponseParsingTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/BulkResponseParsingTest.java
new file mode 100644
index 0000000000..a47db74179
--- /dev/null
+++
b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/BulkResponseParsingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.opensearchapi;
+
+import org.junit.jupiter.api.Test;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.common.xcontent.DeprecationHandler;
+import org.opensearch.common.xcontent.NamedXContentRegistry;
+import org.opensearch.common.xcontent.XContentParser;
+import org.opensearch.common.xcontent.XContentType;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Verifies that the OpenSearch RestHighLevelClient (used by the Flink connector)
+ * can successfully parse the empty-items bulk response format.
+ *
+ * This is the critical compatibility test: if fromXContent doesn't throw,
+ * Flink won't crash.
+ */
+class BulkResponseParsingTest {
+
+    @Test
+    void testEmptyItemsSuccessResponse() throws Exception {
+        final BulkResponse response = parseBulkResponse("{\"took\":5,\"errors\":false,\"items\":[]}");
+
+        assertFalse(response.hasFailures());
+        assertEquals(0, response.getItems().length);
+        assertEquals(5, response.getTook().millis());
+    }
+
+    @Test
+    void testEmptyItemsErrorResponse() throws Exception {
+        final BulkResponse response = parseBulkResponse("{\"took\":12,\"errors\":true,\"items\":[]}");
+
+        // Edge case: the errors flag is true but no item describes the failure.
+        // hasFailures() checks the items array, so with no items the client
+        // reports no failures even though errors=true.
+        assertFalse(response.hasFailures());
+        assertEquals(0, response.getItems().length);
+        assertEquals(12, response.getTook().millis());
+    }
+
+    @Test
+    void testZeroTookValue() throws Exception {
+        final BulkResponse response = parseBulkResponse("{\"took\":0,\"errors\":false,\"items\":[]}");
+
+        assertFalse(response.hasFailures());
+        assertEquals(0, response.getItems().length);
+        assertEquals(0, response.getTook().millis());
+    }
+
+    @Test
+    void testLargeTookValue() throws Exception {
+        final BulkResponse response = parseBulkResponse("{\"took\":30000,\"errors\":false,\"items\":[]}");
+
+        assertFalse(response.hasFailures());
+        assertEquals(30000, response.getTook().millis());
+    }
+
+    /**
+     * Parses a bulk response body with {@link BulkResponse#fromXContent} and
+     * closes the parser afterwards.
+     *
+     * @param json raw JSON body of a bulk response
+     * @return the parsed response
+     * @throws IOException if the parser cannot be created or the body cannot be parsed
+     */
+    private static BulkResponse parseBulkResponse(final String json) throws IOException {
+        try (XContentParser parser = XContentType.JSON.xContent().createParser(
+                NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
+            return BulkResponse.fromXContent(parser);
+        }
+    }
+}
diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/FlinkBulkResponseIT.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/FlinkBulkResponseIT.java
new file mode 100644
index 0000000000..096590d22a
--- /dev/null
+++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/FlinkBulkResponseIT.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.opensearchapi;
+
+import com.linecorp.armeria.server.Server;
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.OpenSearchStatusException;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.dataprepper.HttpRequestExceptionHandler;
+import org.opensearch.dataprepper.metrics.MetricsTestUtil;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test: sends a real BulkRequest via RestHighLevelClient
+ * (same client Flink uses) to the opensearch_api source endpoint
+ * and verifies the empty-items response is parsed without errors.
+ *
+ * Every test server binds an ephemeral port, so runs do not depend on a
+ * fixed port being free.
+ */
+class FlinkBulkResponseIT {
+
+    private static final String PIPELINE_NAME = "test-pipeline";
+    private static final int BUFFER_SIZE = 100;
+    private static final int BATCH_SIZE = 8;
+    private static final int TIMEOUT_MS = 10_000;
+
+    private Server server;
+    private RestHighLevelClient restHighLevelClient;
+
+    @BeforeEach
+    void setUp() {
+        MetricsTestUtil.initMetrics();
+        server = startServer("opensearch_api", BUFFER_SIZE, BATCH_SIZE);
+        restHighLevelClient = createClient(server);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        try {
+            if (restHighLevelClient != null) {
+                restHighLevelClient.close();
+            }
+        } finally {
+            // Stop the server even when closing the client fails.
+            if (server != null) {
+                server.stop().join();
+            }
+        }
+    }
+
+    @Test
+    void testFlinkBulkRequestWithEmptyItemsResponse() throws Exception {
+        // Build a bulk request identical to what Flink would send
+        final BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("test-index")
+                .id("doc-1")
+                .source(Map.of("message", "hello from flink", "timestamp", System.currentTimeMillis()), XContentType.JSON));
+        bulkRequest.add(new IndexRequest("test-index")
+                .id("doc-2")
+                .source(Map.of("message", "second document", "count", 42), XContentType.JSON));
+
+        // Execute the bulk request (this is exactly what Flink does internally)
+        final BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+
+        // Verify: no failures, response parsed successfully
+        assertNotNull(response);
+        assertFalse(response.hasFailures(), "BulkResponse should report no failures");
+        assertEquals(0, response.getItems().length, "Items array should be empty");
+        assertNotNull(response.getTook(), "Took value should be present");
+    }
+
+    @Test
+    void testFlinkBulkRequestMultipleDocuments() throws Exception {
+        final BulkRequest bulkRequest = new BulkRequest();
+        for (int i = 0; i < 10; i++) {
+            bulkRequest.add(new IndexRequest("metrics-index")
+                    .id("metric-" + i)
+                    .source(Map.of("value", i, "name", "cpu_usage"), XContentType.JSON));
+        }
+
+        final BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+
+        assertNotNull(response);
+        assertFalse(response.hasFailures());
+        assertEquals(0, response.getItems().length);
+    }
+
+    @Test
+    void testFlinkBulkRequestFailsOnBufferOverflow() throws Exception {
+        // A tiny buffer (capacity=1) forces an overflow for the 50 documents sent below.
+        MetricsTestUtil.initMetrics();
+        final Server overflowServer = startServer("opensearch_api_overflow", 1, 1);
+
+        try (RestHighLevelClient overflowClient = createClient(overflowServer)) {
+            final BulkRequest bulkRequest = new BulkRequest();
+            for (int i = 0; i < 50; i++) {
+                bulkRequest.add(new IndexRequest("test-index")
+                        .id("overflow-" + i)
+                        .source(Map.of("data", "x".repeat(1000)), XContentType.JSON));
+            }
+
+            // This should throw because Data Prepper returns non-200 on buffer overflow.
+            // Flink would see this as a complete bulk failure and retry.
+            final OpenSearchStatusException exception = assertThrows(
+                    OpenSearchStatusException.class,
+                    () -> overflowClient.bulk(bulkRequest, RequestOptions.DEFAULT));
+
+            // Expected: 413 (entity too large) - current DP behavior. A 408 and an
+            // unparseable error body are accepted as well.
+            // Note: real OpenSearch would return 200 with errors:true in items
+            assertTrue(exception.getMessage().contains("Unable to parse response body")
+                    || exception.status().getStatus() == 413
+                    || exception.status().getStatus() == 408);
+        } finally {
+            overflowServer.stop().join();
+        }
+    }
+
+    /**
+     * Starts an Armeria server exposing an {@link OpenSearchAPIService} backed by a
+     * new {@link BlockingBuffer}, bound to an ephemeral port.
+     *
+     * @param pluginName     plugin name passed to {@link PluginMetrics#fromNames}
+     * @param bufferCapacity first size argument of the buffer (its capacity)
+     * @param batchSize      second size argument of the buffer (assumed to be the batch size;
+     *                       confirm against BlockingBuffer)
+     * @return the started server; the caller must stop it
+     */
+    private static Server startServer(final String pluginName, final int bufferCapacity, final int batchSize) {
+        final PluginMetrics pluginMetrics = PluginMetrics.fromNames(pluginName, PIPELINE_NAME);
+        final BlockingBuffer<Record<Event>> buffer = new BlockingBuffer<>(bufferCapacity, batchSize, PIPELINE_NAME);
+        final OpenSearchAPIService service = new OpenSearchAPIService(TIMEOUT_MS, buffer, pluginMetrics);
+
+        final Server startedServer = Server.builder()
+                .http(0) // port 0 lets the OS pick a free port
+                .annotatedService(service, new HttpRequestExceptionHandler(pluginMetrics))
+                .build();
+        startedServer.start().join();
+        return startedServer;
+    }
+
+    /**
+     * Creates a RestHighLevelClient that talks to the given server on localhost.
+     *
+     * @param target started server to connect to
+     * @return a client the caller must close
+     */
+    private static RestHighLevelClient createClient(final Server target) {
+        return new RestHighLevelClient(
+                RestClient.builder(new HttpHost("localhost", target.activeLocalPort(), "http")));
+    }
+}
diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java
index 28a567c4f4..72e7c418ee 100644
--- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java
+++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java
@@ -6,6 +6,7 @@
 package org.opensearch.dataprepper.plugins.source.opensearchapi;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.linecorp.armeria.common.AggregatedHttpRequest;
 import com.linecorp.armeria.common.AggregatedHttpResponse;
@@ -49,6 +50,9 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static
org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -439,4 +443,60 @@ private AggregatedHttpRequest generateGoodBulkRequestWithMultipleActions(int num
         HttpData httpData = HttpData.ofUtf8(String.join("\n", jsonList));
         return HttpRequest.of(requestHeaders, httpData).aggregate().get();
     }
+
+    @Test
+    public void testBulkResponseContainsValidJson() throws Exception {
+        final AggregatedHttpRequest testRequest = generateRandomValidBulkRequest(2);
+        final AggregatedHttpResponse response = openSearchAPIService
+                .doPostBulk(serviceRequestContext, testRequest, null, null)
+                .aggregate()
+                .get();
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEmptyItemsBulkResponse(mapper.readTree(response.contentUtf8()));
+    }
+
+    @Test
+    public void testBulkResponseContentTypeIsJson() throws Exception {
+        final AggregatedHttpRequest testRequest = generateRandomValidBulkRequest(1);
+        final AggregatedHttpResponse response = openSearchAPIService
+                .doPostBulk(serviceRequestContext, testRequest, null, null)
+                .aggregate()
+                .get();
+
+        assertEquals(HttpStatus.OK, response.status());
+        // Fail with an assertion, not a NullPointerException, when no Content-Type is set.
+        assertNotNull(response.contentType(), "bulk response must declare a Content-Type");
+        assertTrue(response.contentType().is(MediaType.JSON_UTF_8));
+    }
+
+    @Test
+    public void testBulkResponseWithIndexPathContainsValidJson() throws Exception {
+        final AggregatedHttpRequest testRequest = generateRandomValidBulkRequestWithNoIndexInBody(1);
+        final AggregatedHttpResponse response = openSearchAPIService
+                .doPostBulkIndex(serviceRequestContext, testRequest, "my-index", null, null)
+                .aggregate()
+                .get();
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEmptyItemsBulkResponse(mapper.readTree(response.contentUtf8()));
+    }
+
+    /**
+     * Asserts that a bulk response body has the empty-items success shape:
+     * a non-negative numeric {@code took}, {@code errors} set to false and an
+     * empty {@code items} array.
+     *
+     * @param root parsed JSON body of the bulk response
+     */
+    private static void assertEmptyItemsBulkResponse(final JsonNode root) {
+        assertNotNull(root.get("took"), "took field is missing");
+        assertTrue(root.get("took").isNumber());
+        assertTrue(root.get("took").longValue() >= 0);
+        assertNotNull(root.get("errors"), "errors field is missing");
+        assertFalse(root.get("errors").booleanValue());
+        assertNotNull(root.get("items"), "items field is missing");
+        assertTrue(root.get("items").isArray());
+        assertEquals(0, root.get("items").size());
+    }
 }