diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 90e056c528..f54944e551 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -136,6 +136,7 @@ public final class BulkRetryStrategy { private final Counter documentsVersionConflictErrors; private final Counter documentsDuplicates; private final ExistingDocumentQueryManager existingDocumentQueryManager; + private final boolean isExternalVersioning; private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class); static class BulkOperationRequestResponse { @@ -167,8 +168,10 @@ public BulkRetryStrategy(final RequestFunction bulkRequestSupplier, final String pipelineName, final String pluginName, - final ExistingDocumentQueryManager existingDocumentQueryManager) { + final ExistingDocumentQueryManager existingDocumentQueryManager, + final boolean isExternalVersioning) { this.existingDocumentQueryManager = existingDocumentQueryManager; + this.isExternalVersioning = isExternalVersioning; this.requestFunction = requestFunction; this.logFailure = logFailure; this.successfulOperationsHandler = successfulOperationsHandler; @@ -323,9 +326,9 @@ private void handleFailures(final AccumulatingBulkRequest createBulkReq if (canRetryItem(bulkItemResponse, attemptNumber)) { requestToReissue.addOperation(bulkOperation); - } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { + } else if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason()); - // This is not a successfully sent document, so do not add to "successfulOperations" - // and just release the eventHandle + // When using external versioning, version conflicts are expected and not true errors. + // Release the eventHandle without counting as a document error. bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() @@ -441,20 +444,20 @@ private void handleFailures(final AccumulatingBulkRequest, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final Supplier bulkRequestSupplier + ) { + return new BulkRetryStrategy( + requestFunction, + logFailure, + (operations) -> { + for (BulkOperationWrapper operation: operations) { + if (operation.getEvent() != null) { + operation.getEvent().getEventHandle().release(true); + } + if (operation.getEventHandle() != null) { + operation.getEventHandle().release(true); + } + } + }, + pluginMetrics, + Integer.MAX_VALUE, + bulkRequestSupplier, + PIPELINE_NAME, + PLUGIN_NAME, + null, + true); } @Test @@ -629,12 +659,6 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception { assertThat(maxRetriesLimitReached, equalTo(true)); assertEquals(numEventsSucceeded, 2); assertEquals(numEventsFailed, 2); - - final List documentVersionConflictMeasurement = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); - assertEquals(1, documentVersionConflictMeasurement.size()); - assertEquals(1.0, documentVersionConflictMeasurement.get(0).getValue(), 0); } @Test @@ -694,6 +718,128 @@ public void testExecuteNonRetryableResponse() throws Exception { assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0); } + @Test + public void testExecute_VersionConflictDoesNotIncrementDocumentErrors() throws Exception { + final String testIndex = "version-conflict-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTestWithExternalVersioning( + requestFunction, logFailureConsumer, bulkRequestSupplier); + + final IndexOperation indexOp1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOp2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOp3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1); + final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2); + final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(wrapper1); + accumulatingBulkRequest.addOperation(wrapper2); + accumulatingBulkRequest.addOperation(wrapper3); + + // Response: 1 success, 1 version conflict, 1 bad request + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse(); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final List responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem); + + final BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.errors()).thenReturn(true); + when(bulkResponse.items()).thenReturn(responseItems); + when(requestFunction.apply(any())).thenReturn(bulkResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(accumulatingBulkRequest); + + // Version conflict should NOT be counted as a document error + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertEquals(1, documentErrorsMeasurements.size()); + assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0); + + // Version conflict should be counted in its own metric + final List versionConflictMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); + assertEquals(1, versionConflictMeasurements.size()); + assertEquals(1.0, versionConflictMeasurements.get(0).getValue(), 0); + + // Success metric should count the 1 successful document + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertEquals(1, successMeasurements.size()); + assertEquals(1.0, successMeasurements.get(0).getValue(), 0); + + // Version conflict event handle should be released with true (success) + verify(eventHandle2).release(true); + } + + @Test + public void testExecute_VersionConflictIncrementsDocumentErrors_WhenNotExternalVersioning() throws Exception { + final String testIndex = "version-conflict-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + // Use default (non-external versioning) + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, bulkRequestSupplier); + + final IndexOperation indexOp1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOp2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOp3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1); + final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2); + final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(wrapper1); + accumulatingBulkRequest.addOperation(wrapper2); + accumulatingBulkRequest.addOperation(wrapper3); + + // Response: 1 success, 1 version conflict, 1 bad request + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse(); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final List responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem); + + final BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.errors()).thenReturn(true); + when(bulkResponse.items()).thenReturn(responseItems); + when(requestFunction.apply(any())).thenReturn(bulkResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(accumulatingBulkRequest); + + // Without external versioning, version conflict SHOULD be counted as a document error + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertEquals(1, documentErrorsMeasurements.size()); + assertEquals(2.0, documentErrorsMeasurements.get(0).getValue(), 0); + + // Version conflict metric should NOT be incremented without external versioning + final List versionConflictMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); + assertEquals(1, versionConflictMeasurements.size()); + assertEquals(0.0, versionConflictMeasurements.get(0).getValue(), 0); + + // Success metric should count the 1 successful document + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertEquals(1, successMeasurements.size()); + assertEquals(1.0, successMeasurements.get(0).getValue(), 0); + } + @Test public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception { final String testIndex = "delete-index"; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 07b656b21d..627ba92d58 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -21,6 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; @@ -51,6 +52,7 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Collections; import java.util.ArrayList; import java.util.List; @@ -606,4 +608,59 @@ void dataStreamIndex_ensureTimestamp_setsTimestampWhenMissing() throws Exception // Verify the dataStreamIndex was set correctly assertThat(objectUnderTest, notNullValue()); } + + @ParameterizedTest + @MethodSource("externalVersionTypeProvider") + void initialize_sets_isExternalVersioning_true_on_BulkRetryStrategy_for_external_version_types( + final VersionType versionType) throws Exception { + when(indexConfiguration.getVersionType()).thenReturn(versionType); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + final BulkRetryStrategy bulkRetryStrategy = getField(objectUnderTest, "bulkRetryStrategy"); + final boolean isExternalVersioning = getField(bulkRetryStrategy, "isExternalVersioning"); + assertThat(isExternalVersioning, equalTo(true)); + } + + @ParameterizedTest + @MethodSource("nonExternalVersionTypeProvider") + void initialize_sets_isExternalVersioning_false_on_BulkRetryStrategy_for_non_external_version_types( + final VersionType versionType) throws Exception { + lenient().when(indexConfiguration.getVersionType()).thenReturn(versionType); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + final BulkRetryStrategy bulkRetryStrategy = getField(objectUnderTest, "bulkRetryStrategy"); + final boolean isExternalVersioning = getField(bulkRetryStrategy, "isExternalVersioning"); + assertThat(isExternalVersioning, equalTo(false)); + } + + private static Stream externalVersionTypeProvider() { + return Stream.of( + Arguments.of(VersionType.External), + Arguments.of(VersionType.ExternalGte) + ); + } + + private static Stream nonExternalVersionTypeProvider() { + return Stream.of( + Arguments.of(VersionType.Internal), + Arguments.of((VersionType) null) + ); + } + + @SuppressWarnings("unchecked") + private static T getField(final Object target, final String fieldName) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(target); + } }