diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java index 68dbed2280..97830c3c59 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.model.configuration; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -30,6 +31,9 @@ public SinkForwardConfig( @JsonProperty("pipelines") final List pipelineNames, @JsonProperty("with_data") final Map withData, @JsonProperty("with_metadata") final Map withMetadata) { + if (pipelineNames.size() != 1) { + throw new InvalidPluginConfigurationException("Supports only one forwarding pipeline"); + } this.pipelineNames = pipelineNames; this.withData = withData; this.withMetadata = withMetadata; @@ -46,5 +50,6 @@ public Map getWithMetadata() { public Map getWithData() { return withData; } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java index 0d3b9485e1..010b027e8f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java @@ -97,6 +97,7 @@ public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecords for (Map.Entry entry: forwardToPipelines.entrySet()) { entry.getValue().sendEvents(records); } + sinkForwardRecordsContext.clearRecords(); return true; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java index f972aad3cb..0528a67a76 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java @@ -7,7 +7,6 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.InternalEventHandle; import java.util.ArrayList; import java.util.Collection; @@ -25,28 +24,21 @@ public SinkForwardRecordsContext(SinkContext sinkContext) { public void addRecord(Record record) { if (!forwardPipelinesPresent) return; - InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle(); - if (eventHandle != null) { - eventHandle.acquireReference(); - } records.add(record); } public void addRecords(Collection> newRecords) { if (!forwardPipelinesPresent) return; - newRecords.forEach((record) -> { - Event event = record.getData(); - InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle(); - if (eventHandle != null) { - eventHandle.acquireReference(); - } - }); records.addAll(newRecords); } public List> getRecords() { return records; } + + public void clearRecords() { + records.clear(); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java index a9b64daed9..f962842533 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java @@ -5,12 +5,14 @@ package org.opensearch.dataprepper.model.configuration; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.Mockito.mock; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.List; import java.util.Map; @@ -26,8 +28,8 @@ void testDefaults() { } @Test - void testCustomValues() { - List pipelines = mock(List.class); + void pipelines_lsit_with_one_pipeline_succeeds() { + List pipelines = List.of("pipeline1"); Map withData = mock(Map.class); Map withMetadata = mock(Map.class); SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata); @@ -35,5 +37,18 @@ void testCustomValues() { assertThat(sinkForwardConfig.getWithData(), equalTo(withData)); assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata)); } + + @Test + void pipelines_list_with_two_or_more_pipelines_throws_exception() { + List pipelines = List.of("pipeline1", "pipeline2"); + Map withData = mock(Map.class); + Map withMetadata = mock(Map.class); + assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(pipelines, withData, withMetadata)); + } + + @Test + void empty_pipelines_list_throws_exception() { + assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(List.of(), Map.of(), Map.of())); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java index 72a5f64c6b..6f409aec16 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java @@ -93,17 +93,21 @@ public void testForwardToPipelinesWithPipelineMap() { verify(forwardPipeline2, times(0)).sendEvents(eq(records)); sinkForwardRecordsContext.addRecords(records); assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true)); - verify(forwardPipeline1, times(1)).sendEvents(eq(records)); - verify(forwardPipeline2, times(1)).sendEvents(eq(records)); + verify(forwardPipeline1, times(1)).sendEvents(any()); + verify(forwardPipeline2, times(1)).sendEvents(any()); verify(event, times(1)).put(any(String.class), any(Object.class)); verify(event, times(1)).getMetadata(); verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class)); + records = Collections.singletonList(record); + sinkForwardRecordsContext.addRecords(records); assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true)); - verify(forwardPipeline1, times(2)).sendEvents(eq(records)); - verify(forwardPipeline2, times(2)).sendEvents(eq(records)); + verify(forwardPipeline1, times(2)).sendEvents(any()); + verify(forwardPipeline2, times(2)).sendEvents(any()); + records = Collections.singletonList(record); + sinkForwardRecordsContext.addRecords(records); assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true)); - verify(forwardPipeline1, times(3)).sendEvents(eq(records)); - verify(forwardPipeline2, times(3)).sendEvents(eq(records)); + verify(forwardPipeline1, times(3)).sendEvents(any()); + verify(forwardPipeline2, times(3)).sendEvents(any()); } @Test @@ -148,6 +152,7 @@ public void testWithNoForwardToPipelines() { SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); sinkForwardRecordsContext.addRecords(List.of(record)); assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false)); + assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0)); } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java index 84450db5dc..abf7bf4a66 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java @@ -9,13 +9,11 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import static org.mockito.Mockito.mock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; import java.util.List; @@ -30,16 +28,9 @@ public void testSinkForwardRecordContextBasic() { SinkContext sinkContext = mock(SinkContext.class); when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); - Event event = mock(Event.class); - DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); - doNothing().when(eventHandle).acquireReference(); Record record1 = mock(Record.class); Record record2 = mock(Record.class); Record record3 = mock(Record.class); - when(record1.getData()).thenReturn(event); - when(record2.getData()).thenReturn(event); - when(record3.getData()).thenReturn(event); - when(event.getEventHandle()).thenReturn(eventHandle); sinkForwardRecordsContext.addRecord(record1); sinkForwardRecordsContext.addRecords(List.of(record2, record3)); List> records = sinkForwardRecordsContext.getRecords(); @@ -52,19 +43,28 @@ public void testSinkForwardRecordContextWithForwardingPipelines() { HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class); when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline)); sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); - Event event = mock(Event.class); - DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); - doNothing().when(eventHandle).acquireReference(); Record record1 = mock(Record.class); Record record2 = mock(Record.class); Record record3 = mock(Record.class); - when(record1.getData()).thenReturn(event); - when(record2.getData()).thenReturn(event); - when(record3.getData()).thenReturn(event); - when(event.getEventHandle()).thenReturn(eventHandle); sinkForwardRecordsContext.addRecord(record1); sinkForwardRecordsContext.addRecords(List.of(record2, record3)); List> records = sinkForwardRecordsContext.getRecords(); assertThat(records.size(), equalTo(3)); + sinkForwardRecordsContext.clearRecords(); + assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0)); + } + + @Test + public void testSinkForwardRecordContextClearRecords() { + SinkContext sinkContext = mock(SinkContext.class); + HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline)); + sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + Record record1 = mock(Record.class); + Record record2 = mock(Record.class); + sinkForwardRecordsContext.addRecords(List.of(record1, record2)); + assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(2)); + sinkForwardRecordsContext.clearRecords(); + assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0)); } } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 67ad1caef4..2a5fae4dc8 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -38,6 +38,7 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; @@ -94,6 +95,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -151,21 +153,25 @@ public class OpenSearchSinkIT { private PluginConfigObservable pluginConfigObservable; public OpenSearchSink createObjectUnderTest(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) { + sinkContext = mock(SinkContext.class); + when(sinkContext.getTagsTargetKey()).thenReturn(null); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME); when(pluginSetting.getName()).thenReturn(PLUGIN_NAME); OpenSearchSink sink = new OpenSearchSink( - pluginSetting, null, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); + pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); if (doInitialize) { sink.doInitialize(); } return sink; } - public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) { + public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, final Map forwardPipelineMap, boolean doInitialize) { sinkContext = mock(SinkContext.class); testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); + when(sinkContext.getForwardToPipelines()).thenReturn(forwardPipelineMap); when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME); when(pluginSetting.getName()).thenReturn(PLUGIN_NAME); @@ -846,6 +852,36 @@ public Stream provideArguments(ExtensionContext context) { } } + @Test + public void testOutputForwardsCreatedDocumentsToAPipeline() throws IOException, InterruptedException { + HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class); + Map forwardPipelineMap = Map.of("fwd_pipeline1", forwardPipeline1); + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, forwardPipelineMap, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + verify(sinkContext).forwardRecords(any(), eq(null), eq(null)); + } + @Test public void testOutputCustomIndex() throws IOException, InterruptedException { final String testIndexAlias = "test-alias"; @@ -1255,7 +1291,7 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { final List> testRecords = Collections.singletonList(new Record<>(testEvent)); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, true); + final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, Map.of(), true); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 06d7cbe785..90e056c528 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -28,11 +28,13 @@ import java.net.SocketTimeoutException; import java.time.Duration; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; public final class BulkRetryStrategy { @@ -114,6 +116,7 @@ public final class BulkRetryStrategy { private final RequestFunction, BulkResponse> requestFunction; private final BiConsumer, Throwable> logFailure; + private final Consumer> successfulOperationsHandler; private final PluginMetrics pluginMetrics; private final Supplier bulkRequestSupplier; private final int maxRetries; @@ -158,6 +161,7 @@ String getExceptionMessage() { public BulkRetryStrategy(final RequestFunction, BulkResponse> requestFunction, final BiConsumer, Throwable> logFailure, + final Consumer> successfulOperationsHandler, final PluginMetrics pluginMetrics, final int maxRetries, final Supplier bulkRequestSupplier, @@ -167,6 +171,7 @@ public BulkRetryStrategy(final RequestFunction successfulOperations = new ArrayList<>(bulkRequestForRetry.getOperations()); + successfulOperationsHandler.accept(successfulOperations); final int totalDuplicateDocuments = bulkResponse.items().stream().filter(this::isDuplicateDocument).mapToInt(i -> 1).sum(); documentsDuplicates.increment(totalDuplicateDocuments); } @@ -384,6 +388,7 @@ private AccumulatingBulkRequest createBulkReq final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get(); final ImmutableList.Builder nonRetryableFailures = ImmutableList.builder(); int index = 0; + List successfulOperations = new ArrayList<>(response.items().size()); for (final BulkResponseItem bulkItemResponse : response.items()) { BulkOperationWrapper bulkOperation = (BulkOperationWrapper)request.getOperationAt(index); @@ -399,6 +404,8 @@ private AccumulatingBulkRequest createBulkReq } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason()); + // This is not a successfully sent document, so do not add to "successfulOperations" + // and just release the eventHandle bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() @@ -413,10 +420,11 @@ private AccumulatingBulkRequest createBulkReq if(isDuplicateDocument(bulkItemResponse)) { documentsDuplicates.increment(); } - bulkOperation.releaseEventHandle(true); + successfulOperations.add(bulkOperation); } index++; } + successfulOperationsHandler.accept(successfulOperations); final ImmutableList failedBulkOperations = nonRetryableFailures.build(); if(!failedBulkOperations.isEmpty()) { logFailure.accept(failedBulkOperations, null); @@ -428,6 +436,7 @@ private AccumulatingBulkRequest createBulkReq private void handleFailures(final AccumulatingBulkRequest accumulatingBulkRequest, final List itemResponses) { assert accumulatingBulkRequest.getOperationsCount() == itemResponses.size(); final ImmutableList.Builder failures = ImmutableList.builder(); + final List successfulOperations = new ArrayList<>(itemResponses.size()); for (int i = 0; i < itemResponses.size(); i++) { final BulkResponseItem bulkItemResponse = itemResponses.get(i); final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i); @@ -435,6 +444,8 @@ private void handleFailures(final AccumulatingBulkRequest> { private volatile boolean initialized; private final SinkContext sinkContext; private final ExpressionEvaluator expressionEvaluator; + private final boolean useEventInBulkOperation; private FailedBulkOperationConverter failedBulkOperationConverter; private DataStreamDetector dataStreamDetector; @@ -167,6 +170,7 @@ public class OpenSearchSink extends AbstractSink> { private final ExecutorService queryExecutorService; private final int processWorkerThreads; + private final SinkForwardRecordsContext sinkForwardRecordsContext; @DataPrepperPluginConstructor public OpenSearchSink(final PluginSetting pluginSetting, @@ -180,8 +184,10 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.processWorkerThreads = pipelineDescription.getNumberOfProcessWorkers(); this.awsCredentialsSupplier = awsCredentialsSupplier; this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); this.expressionEvaluator = expressionEvaluator; this.pipeline = pipelineDescription.getPipelineName(); + this.useEventInBulkOperation = (getFailurePipeline() != null || sinkContext.getForwardToPipelines().size() > 0); bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS); @@ -300,6 +306,7 @@ private void doInitializeInternal() throws IOException { () -> openSearchClientRefresher.get()); bulkRetryStrategy = new BulkRetryStrategy(bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()), this::logFailureForBulkRequests, + this::successfulOperationsHandler, pluginMetrics, maxRetries, bulkRequestSupplier, @@ -524,7 +531,7 @@ public void doOutput(final Collection> records) { final String queryTermKey = openSearchSinkConfig.getIndexConfiguration().getQueryTerm(); final String termValue = queryTermKey != null ? event.get(queryTermKey, String.class) : null; - BulkOperationWrapper bulkOperationWrapper = getFailurePipeline() != null ? + BulkOperationWrapper bulkOperationWrapper = (useEventInBulkOperation) ? new BulkOperationWrapper(bulkOperation, event, serializedJsonNode, termValue) : new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode, termValue); @@ -592,6 +599,27 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { }); } + @VisibleForTesting + void successfulOperationsHandler(final List successfulOperations) { + if (successfulOperations.size() == 0) { + return; + } + if (sinkContext.getForwardToPipelines().size() == 0) { + for (final BulkOperationWrapper bulkOperation: successfulOperations) { + if (bulkOperation.getEvent() != null) { + bulkOperation.getEvent().getEventHandle().release(true); + } else { + bulkOperation.getEventHandle().release(true); + } + } + return; + } + for (final BulkOperationWrapper bulkOperation: successfulOperations) { + sinkForwardRecordsContext.addRecord(new Record<>(bulkOperation.getEvent())); + } + sinkContext.forwardRecords(sinkForwardRecordsContext, null, null); + } + private void logFailureForBulkRequests(final List failedBulkOperations, final Throwable failure) { final List dlqObjects = failedBulkOperations.stream() @@ -724,7 +752,7 @@ private DlqObject createDlqObjectFromEvent(final Event event, .withPipelineName(pipeline) .withPluginId(PLUGIN_NAME); - if (getFailurePipeline() != null) { + if (useEventInBulkOperation) { builder.withEvent(event); } else { builder.withEventHandle(event.getEventHandle()); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 3ffdf43e11..002486b738 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -129,6 +129,16 @@ public BulkRetryStrategy createObjectUnderTest( return new BulkRetryStrategy( requestFunction, logFailure, + (operations) -> { + for (BulkOperationWrapper operation: operations) { + if (operation.getEvent() != null) { + operation.getEvent().getEventHandle().release(true); + } + if (operation.getEventHandle() != null) { + operation.getEventHandle().release(true); + } + } + }, pluginMetrics, Integer.MAX_VALUE, bulkRequestSupplier, @@ -146,6 +156,16 @@ public BulkRetryStrategy createObjectUnderTest( return new BulkRetryStrategy( requestFunction, logFailure, + (operations) -> { + for (BulkOperationWrapper operation: operations) { + if (operation.getEvent() != null) { + operation.getEvent().getEventHandle().release(true); + } + if (operation.getEventHandle() != null) { + operation.getEventHandle().release(true); + } + } + }, pluginMetrics, maxRetries, bulkRequestSupplier, @@ -164,6 +184,16 @@ public BulkRetryStrategy createObjectUnderTest( return new BulkRetryStrategy( requestFunction, logFailure, + (operations) -> { + for (BulkOperationWrapper operation: operations) { + if (operation.getEvent() != null) { + operation.getEvent().getEventHandle().release(true); + } + if (operation.getEventHandle() != null) { + operation.getEventHandle().release(true); + } + } + }, pluginMetrics, maxRetries, bulkRequestSupplier, diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index bb37642659..74573c0d2c 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -24,6 +24,8 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -47,7 +49,9 @@ import java.io.IOException; import java.util.Collections; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -230,6 +234,69 @@ void test_initialization_with_failure_and_retry_with_query_manager() throws IOEx verify(pluginConfigObservable, times(2)).addPluginConfigObserver(any()); } + @Test + void test_sink_successful_records_handling_without_forwarding_pipelines_bulk_operations_with_event_handles() throws Exception { + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); + final EventHandle eventHandle = mock(EventHandle.class); + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + BulkOperationWrapper op1 = mock(BulkOperationWrapper.class); + BulkOperationWrapper op2 = mock(BulkOperationWrapper.class); + when(op1.getEvent()).thenReturn(null); + when(op2.getEvent()).thenReturn(null); + when(op1.getEventHandle()).thenReturn(eventHandle); + when(op2.getEventHandle()).thenReturn(eventHandle); + List operationsList = new ArrayList<>(); + operationsList.add(op1); + operationsList.add(op2); + objectUnderTest.successfulOperationsHandler(operationsList); + verify(eventHandle, times(2)).release(eq(true)); + + } + + @Test + void test_sink_successful_records_handling_with_forwarding_pipelines() throws Exception { + HeadlessPipeline forwardPipeline = mock(HeadlessPipeline.class); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("fwd_pipeline", forwardPipeline)); + final EventHandle eventHandle = mock(EventHandle.class); + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + BulkOperationWrapper op1 = mock(BulkOperationWrapper.class); + BulkOperationWrapper op2 = mock(BulkOperationWrapper.class); + Event event = mock(Event.class); + when(op1.getEvent()).thenReturn(event); + when(op2.getEvent()).thenReturn(event); + List operationsList = new ArrayList<>(); + operationsList.add(op1); + operationsList.add(op2); + objectUnderTest.successfulOperationsHandler(operationsList); + verify(eventHandle, times(0)).release(eq(true)); + verify(op1, times(1)).getEvent(); + verify(op2, times(1)).getEvent(); + verify(sinkContext, times(1)).forwardRecords(any(SinkForwardRecordsContext.class), eq(null), eq(null)); + + } + + + @Test + void test_sink_successful_records_handling_without_forwarding_pipelines_bulk_operations_with_events() throws Exception { + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); + final EventHandle eventHandle = mock(EventHandle.class); + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + BulkOperationWrapper op1 = mock(BulkOperationWrapper.class); + BulkOperationWrapper op2 = mock(BulkOperationWrapper.class); + Event event = mock(Event.class); + when(event.getEventHandle()).thenReturn(eventHandle); + when(op1.getEvent()).thenReturn(event); + when(op2.getEvent()).thenReturn(event); + List operationsList = new ArrayList<>(); + operationsList.add(op1); + operationsList.add(op2); + objectUnderTest.successfulOperationsHandler(operationsList); + verify(eventHandle, times(2)).release(eq(true)); + verify(op1, times(0)).getEventHandle(); + verify(op2, times(0)).getEventHandle(); + + } + @Test void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_creates_DLQObject() throws IOException { when(pluginSetting.getName()).thenReturn("opensearch");