From 6d81b896955574da2970dc15f572933bf9823714 Mon Sep 17 00:00:00 2001
From: Keyur-S-Patel
Date: Tue, 14 Apr 2026 21:02:44 +0000
Subject: [PATCH] Fix invalid document version events still included in bulk requests (#6601)

Events with invalid document_version values are correctly sent to the
DLQ but were still being added to the OpenSearch bulk request. Added
continue statements after the NumberFormatException and RuntimeException
catch blocks in doOutput() to skip the rest of the loop iteration for
failed events.

Made version conflict handling in BulkRetryStrategy conditional on
external versioning. Version conflicts are only silently handled
(skipping documentErrors) when document_version_type is set to external
or external_gte. For internal versioning or when unset, version
conflicts are treated as document errors.

BulkRetryStrategy now accepts a boolean isExternalVersioning parameter
instead of importing VersionType directly, keeping the OpenSearch client
type out of its API. The VersionType check is done in OpenSearchSink via
the isExternalVersionType helper method.

Signed-off-by: Keyur-S-Patel --- .../sink/opensearch/BulkRetryStrategy.java | 25 +-- .../sink/opensearch/OpenSearchSink.java | 7 +- .../opensearch/BulkRetryStrategyTests.java | 164 +++++++++++++++++- .../sink/opensearch/OpenSearchSinkTest.java | 57 ++++++ 4 files changed, 232 insertions(+), 21 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 90e056c528..f54944e551 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -136,6 +136,7 @@ public final class BulkRetryStrategy { private final Counter documentsVersionConflictErrors; private final Counter documentsDuplicates; private final ExistingDocumentQueryManager existingDocumentQueryManager; + private final boolean isExternalVersioning; private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class); static class BulkOperationRequestResponse { @@ -167,8 +168,10 @@ public BulkRetryStrategy(final RequestFunction bulkRequestSupplier, final String pipelineName, final String pluginName, - final ExistingDocumentQueryManager existingDocumentQueryManager) { + final ExistingDocumentQueryManager existingDocumentQueryManager, + final boolean isExternalVersioning) { this.existingDocumentQueryManager = existingDocumentQueryManager; + this.isExternalVersioning = isExternalVersioning; this.requestFunction = requestFunction; this.logFailure = logFailure; this.successfulOperationsHandler = successfulOperationsHandler; @@ -323,9 +326,9 @@ private void handleFailures(final AccumulatingBulkRequest createBulkReq if (canRetryItem(bulkItemResponse, attemptNumber)) { 
requestToReissue.addOperation(bulkOperation); - } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { + } else if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason()); - // This is not a successfully sent document, so do not add to "successfulOperations" - // and just release the eventHandle + // When using external versioning, version conflicts are expected and not true errors. + // Release the eventHandle without counting as a document error. bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() @@ -441,20 +444,20 @@ private void handleFailures(final AccumulatingBulkRequest, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final Supplier bulkRequestSupplier + ) { + return new BulkRetryStrategy( + requestFunction, + logFailure, + (operations) -> { + for (BulkOperationWrapper operation: operations) { + if (operation.getEvent() != null) { + operation.getEvent().getEventHandle().release(true); + } + if (operation.getEventHandle() != null) { + operation.getEventHandle().release(true); + } + } + }, + pluginMetrics, + Integer.MAX_VALUE, + bulkRequestSupplier, + PIPELINE_NAME, + PLUGIN_NAME, + null, + true); } @Test @@ -629,12 +659,6 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception { assertThat(maxRetriesLimitReached, equalTo(true)); assertEquals(numEventsSucceeded, 2); assertEquals(numEventsFailed, 2); - - final List documentVersionConflictMeasurement = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); - 
assertEquals(1, documentVersionConflictMeasurement.size()); - assertEquals(1.0, documentVersionConflictMeasurement.get(0).getValue(), 0); } @Test @@ -694,6 +718,128 @@ public void testExecuteNonRetryableResponse() throws Exception { assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0); } + @Test + public void testExecute_VersionConflictDoesNotIncrementDocumentErrors() throws Exception { + final String testIndex = "version-conflict-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTestWithExternalVersioning( + requestFunction, logFailureConsumer, bulkRequestSupplier); + + final IndexOperation indexOp1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOp2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOp3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1); + final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2); + final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(wrapper1); + accumulatingBulkRequest.addOperation(wrapper2); + accumulatingBulkRequest.addOperation(wrapper3); + + // Response: 1 success, 1 version conflict, 1 bad request + final BulkResponseItem successItem = 
successItemResponse(testIndex); + final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse(); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final List responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem); + + final BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.errors()).thenReturn(true); + when(bulkResponse.items()).thenReturn(responseItems); + when(requestFunction.apply(any())).thenReturn(bulkResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(accumulatingBulkRequest); + + // Version conflict should NOT be counted as a document error + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertEquals(1, documentErrorsMeasurements.size()); + assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0); + + // Version conflict should be counted in its own metric + final List versionConflictMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); + assertEquals(1, versionConflictMeasurements.size()); + assertEquals(1.0, versionConflictMeasurements.get(0).getValue(), 0); + + // Success metric should count the 1 successful document + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertEquals(1, successMeasurements.size()); + assertEquals(1.0, successMeasurements.get(0).getValue(), 0); + + // Version conflict event handle should be released with true (success) + verify(eventHandle2).release(true); + } + + @Test + public void 
testExecute_VersionConflictIncrementsDocumentErrors_WhenNotExternalVersioning() throws Exception { + final String testIndex = "version-conflict-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + // Use default (non-external versioning) + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, bulkRequestSupplier); + + final IndexOperation indexOp1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOp2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOp3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1); + final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2); + final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(wrapper1); + accumulatingBulkRequest.addOperation(wrapper2); + accumulatingBulkRequest.addOperation(wrapper3); + + // Response: 1 success, 1 version conflict, 1 bad request + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse(); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final List responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem); + + 
final BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.errors()).thenReturn(true); + when(bulkResponse.items()).thenReturn(responseItems); + when(requestFunction.apply(any())).thenReturn(bulkResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(accumulatingBulkRequest); + + // Without external versioning, version conflict SHOULD be counted as a document error + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertEquals(1, documentErrorsMeasurements.size()); + assertEquals(2.0, documentErrorsMeasurements.get(0).getValue(), 0); + + // Version conflict metric should NOT be incremented without external versioning + final List versionConflictMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); + assertEquals(1, versionConflictMeasurements.size()); + assertEquals(0.0, versionConflictMeasurements.get(0).getValue(), 0); + + // Success metric should count the 1 successful document + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertEquals(1, successMeasurements.size()); + assertEquals(1.0, successMeasurements.get(0).getValue(), 0); + } + @Test public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception { final String testIndex = "delete-index"; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 07b656b21d..627ba92d58 
100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -21,6 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; @@ -51,6 +52,7 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Collections; import java.util.ArrayList; import java.util.List; @@ -606,4 +608,59 @@ void dataStreamIndex_ensureTimestamp_setsTimestampWhenMissing() throws Exception // Verify the dataStreamIndex was set correctly assertThat(objectUnderTest, notNullValue()); } + + @ParameterizedTest + @MethodSource("externalVersionTypeProvider") + void initialize_sets_isExternalVersioning_true_on_BulkRetryStrategy_for_external_version_types( + final VersionType versionType) throws Exception { + when(indexConfiguration.getVersionType()).thenReturn(versionType); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + final BulkRetryStrategy bulkRetryStrategy = getField(objectUnderTest, "bulkRetryStrategy"); + final boolean isExternalVersioning = getField(bulkRetryStrategy, "isExternalVersioning"); + assertThat(isExternalVersioning, 
equalTo(true)); + } + + @ParameterizedTest + @MethodSource("nonExternalVersionTypeProvider") + void initialize_sets_isExternalVersioning_false_on_BulkRetryStrategy_for_non_external_version_types( + final VersionType versionType) throws Exception { + lenient().when(indexConfiguration.getVersionType()).thenReturn(versionType); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + final BulkRetryStrategy bulkRetryStrategy = getField(objectUnderTest, "bulkRetryStrategy"); + final boolean isExternalVersioning = getField(bulkRetryStrategy, "isExternalVersioning"); + assertThat(isExternalVersioning, equalTo(false)); + } + + private static Stream externalVersionTypeProvider() { + return Stream.of( + Arguments.of(VersionType.External), + Arguments.of(VersionType.ExternalGte) + ); + } + + private static Stream nonExternalVersionTypeProvider() { + return Stream.of( + Arguments.of(VersionType.Internal), + Arguments.of((VersionType) null) + ); + } + + @SuppressWarnings("unchecked") + private static T getField(final Object target, final String fieldName) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(target); + } }