From 6ebbd86053e2ad9fa8aee9938880a874a61bcf02 Mon Sep 17 00:00:00 2001 From: Pallempati Saketh Date: Mon, 21 Apr 2025 15:45:10 +0530 Subject: [PATCH 1/2] feat(sink/opensearch): Retry DELETE operations on 404 status Makes the BulkRetryStrategy retry DELETE operations that initially fail with a 404 (Not Found) status. This handles race conditions where a delete might be processed before its corresponding create in the same bulk request due to eventual consistency. - Modified canRetry and createBulkRequestForRetry to identify and allow retries specifically for DELETE + 404. - Added unit tests verifying the new retry behavior for DELETE + 404 and ensuring other 404s are retried. Signed-off-by: Pallempati Saketh --- .../sink/opensearch/BulkRetryStrategy.java | 27 +- .../opensearch/BulkRetryStrategyTests.java | 242 +++++++++++++++++- 2 files changed, 251 insertions(+), 18 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 594cadcffd..85cad0c6af 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -15,6 +15,7 @@ import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.bulk.OperationType; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation; @@ -237,8 +238,14 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte public boolean canRetry(final BulkResponse response) { for (final BulkResponseItem bulkItemResponse : response.items()) { - if (isItemInError(bulkItemResponse) && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) { - return true; + if (isItemInError(bulkItemResponse)) { + boolean isGenerallyRetryable = !NON_RETRY_STATUS.contains(bulkItemResponse.status()); + boolean isDeleteNotFound = bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && + bulkItemResponse.operationType() == OperationType.Delete; + + if (isGenerallyRetryable || isDeleteNotFound) { + return true; + } } } return false; @@ -339,10 +346,10 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r private AccumulatingBulkRequest createBulkRequestForRetry( final AccumulatingBulkRequest request, final BulkResponse response, final Exception previousException) { if (shouldSendAllForQuerying(previousException)) { - for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) { - existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper); - } - return bulkRequestSupplier.get(); + for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) { + existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper); + } + return bulkRequestSupplier.get(); } if (response == null) { @@ -354,7 +361,7 @@ private AccumulatingBulkRequest createBulkReq int index = 0; for (final BulkResponseItem bulkItemResponse : response.items()) { BulkOperationWrapper bulkOperation = - (BulkOperationWrapper)request.getOperationAt(index); + (BulkOperationWrapper)request.getOperationAt(index); if (isItemInError(bulkItemResponse)) { if (existingDocumentQueryManager != null && POTENTIAL_DUPLICATES_ERRORS.contains(bulkItemResponse.status())) { existingDocumentQueryManager.addBulkOperation(bulkOperation); @@ -362,7 +369,11 @@ private AccumulatingBulkRequest createBulkReq continue; } - if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { + boolean isGenerallyRetryable = !NON_RETRY_STATUS.contains(bulkItemResponse.status()); + boolean isDeleteNotFound = bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && + bulkItemResponse.operationType() == OperationType.Delete; + + if (isGenerallyRetryable || isDeleteNotFound) { requestToReissue.addOperation(bulkOperation); } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index f50822fab2..1b76927d7b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -20,6 +20,8 @@ import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.OperationType; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -105,7 +107,7 @@ public void setUp() { pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); final List eventHandles = Arrays.asList( - eventHandle1, eventHandle2, eventHandle3, eventHandle4); + eventHandle1, eventHandle2, eventHandle3, eventHandle4); for (final EventHandle eventHandle: eventHandles) { lenient().doAnswer(a -> { boolean result = a.getArgument(0); @@ -120,10 +122,10 @@ public void setUp() { } public BulkRetryStrategy createObjectUnderTest( - final RequestFunction, BulkResponse> requestFunction, - final BiConsumer, Throwable> logFailure, - final Supplier bulkRequestSupplier -) { + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final Supplier bulkRequestSupplier + ) { return new BulkRetryStrategy( requestFunction, logFailure, @@ -136,11 +138,11 @@ public BulkRetryStrategy createObjectUnderTest( } public BulkRetryStrategy createObjectUnderTest( - final RequestFunction, BulkResponse> requestFunction, - final BiConsumer, Throwable> logFailure, - final int maxRetries, - final Supplier bulkRequestSupplier -) { + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final int maxRetries, + final Supplier bulkRequestSupplier + ) { return new BulkRetryStrategy( requestFunction, logFailure, @@ -195,6 +197,34 @@ public void testCanRetry() { assertTrue(bulkRetryStrategy.canRetry(bulkResponse)); } + @Test + public void testCanRetry_DeleteNotFound_ShouldReturnTrue() { + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + bulkRequest -> mock(BulkResponse.class), + (docWriteRequest, throwable) -> {}, + () -> mock(AccumulatingBulkRequest.class)); + final String testIndex = "foo"; + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(successItem, deleteNotFoundItem)); + assertTrue("canRetry should return true for a mix including DELETE + 404", bulkRetryStrategy.canRetry(bulkResponse)); + } + + @Test + public void testCanRetry_NonDeleteNotFound_ShouldReturnFalse() { + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + bulkRequest -> mock(BulkResponse.class), + (docWriteRequest, throwable) -> {}, + () -> mock(AccumulatingBulkRequest.class)); + final String testIndex = "foo"; + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem indexNotFoundItem = indexNotFoundItemResponse(testIndex); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(successItem, indexNotFoundItem)); + assertFalse("canRetry should return false for 404 on non-DELETE operations", bulkRetryStrategy.canRetry(bulkResponse)); + } + @Test public void testExecuteSuccessOnFirstAttempt() throws Exception { final String testIndex = "bar"; @@ -511,6 +541,184 @@ public void testExecuteNonRetryableResponse() throws Exception { assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0); } + @Test + public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception { + final String testIndex = "delete-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, 2, bulkRequestSupplier); + + final IndexOperation indexOp = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final DeleteOperation deleteOp = new DeleteOperation.Builder().index(testIndex).id("2").build(); + final IndexOperation badIndexOp = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper indexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp).build(), eventHandle1); + final BulkOperationWrapper deleteWrapper = new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOp).build(), eventHandle2); + final BulkOperationWrapper badIndexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(badIndexOp).build(), eventHandle3); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(indexWrapper); + initialRequest.addOperation(deleteWrapper); + initialRequest.addOperation(badIndexWrapper); + + final BulkResponseItem indexSuccessItem = successItemResponse(testIndex); + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final BulkResponse firstResponse = mock(BulkResponse.class); + when(firstResponse.errors()).thenReturn(true); + when(firstResponse.items()).thenReturn(Arrays.asList(indexSuccessItem, deleteNotFoundItem, badRequestItem)); + + final BulkResponseItem deleteSuccessItem = successItemResponse(testIndex); + final BulkResponse secondResponse = mock(BulkResponse.class); + when(secondResponse.errors()).thenReturn(false); + when(secondResponse.items()).thenReturn(List.of(deleteSuccessItem)); + + final ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(AccumulatingBulkRequest.class); + when(requestFunction.apply(requestCaptor.capture())) + .thenReturn(firstResponse) + .thenReturn(secondResponse); + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(2)).apply(any()); + + final List> capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + final AccumulatingBulkRequest retryRequest = capturedRequests.get(1); + assertThat(retryRequest.getOperationsCount(), equalTo(1)); + assertThat(retryRequest.getOperationAt(0), equalTo(deleteWrapper)); + + final ArgumentCaptor> failedOpsCaptor = ArgumentCaptor.forClass(List.class); + verify(logFailureConsumer, times(1)).accept(failedOpsCaptor.capture(), eq(null)); + assertThat(failedOpsCaptor.getValue().size(), equalTo(1)); + assertThat(failedOpsCaptor.getValue().get(0).getBulkOperation(), equalTo(badIndexWrapper)); + + verify(eventHandle1).release(true); + verify(eventHandle2).release(true); + + assertThat(numEventsSucceeded, equalTo(2)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + assertThat(retryMeasurements.size(), equalTo(1)); + assertThat(retryMeasurements.get(0).getValue(), equalTo(1.0)); + + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertThat(successMeasurements.size(), equalTo(1)); + assertThat(successMeasurements.get(0).getValue(), equalTo(2.0)); + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + + @Test + public void testExecute_DeleteNotFound_RetriesAndFailsPermanently() throws Exception { + final String testIndex = "delete-fail-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + final int MAX_RETRIES = 2; + + logFailureConsumer = this::logFailureMaxRetries; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, MAX_RETRIES, bulkRequestSupplier); + + final DeleteOperation deleteOp = new DeleteOperation.Builder().index(testIndex).id("1").build(); + final BulkOperationWrapper deleteWrapper = new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOp).build(), eventHandle1); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(deleteWrapper); + + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + final BulkResponse persistentFailureResponse = mock(BulkResponse.class); + when(persistentFailureResponse.errors()).thenReturn(true); + when(persistentFailureResponse.items()).thenReturn(List.of(deleteNotFoundItem)); + + when(requestFunction.apply(any(AccumulatingBulkRequest.class))).thenReturn(persistentFailureResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + maxRetriesLimitReached = false; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(MAX_RETRIES)).apply(any()); + assertThat("Max retries limit should have been reached", maxRetriesLimitReached, equalTo(true)); + + assertThat(numEventsSucceeded, equalTo(0)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + assertThat(retryMeasurements.size(), equalTo(1)); + assertThat(retryMeasurements.get(0).getValue(), equalTo((double)MAX_RETRIES)); + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + + @Test + public void testExecute_NonDeleteNotFound_DoesNotRetry() throws Exception { + final String testIndex = "non-delete-404-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, 2, bulkRequestSupplier); + + final IndexOperation indexOp = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final BulkOperationWrapper indexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp).build(), eventHandle1); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(indexWrapper); + + final BulkResponseItem indexNotFoundItem = indexNotFoundItemResponse(testIndex); + final BulkResponse firstResponse = mock(BulkResponse.class); + when(firstResponse.errors()).thenReturn(true); + when(firstResponse.items()).thenReturn(List.of(indexNotFoundItem)); + + when(requestFunction.apply(any(AccumulatingBulkRequest.class))).thenReturn(firstResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(1)).apply(any()); + + final ArgumentCaptor> failedOpsCaptor = ArgumentCaptor.forClass(List.class); + verify(logFailureConsumer, times(1)).accept(failedOpsCaptor.capture(), eq(null)); + assertThat(failedOpsCaptor.getValue().size(), equalTo(1)); + assertThat(failedOpsCaptor.getValue().get(0).getBulkOperation(), equalTo(indexWrapper)); + + assertThat(numEventsSucceeded, equalTo(0)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + if (!retryMeasurements.isEmpty()) { + assertThat(retryMeasurements.get(0).getValue(), equalTo(0.0)); + } + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + @Test void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_error_is_provided() throws Exception { final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); @@ -867,6 +1075,20 @@ private static BulkResponseItem customBulkFailureResponse(final RestStatus restS return badResponse; } + private static BulkResponseItem deleteNotFoundItemResponse(final String index) { + final BulkResponseItem response = mock(BulkResponseItem.class); + lenient().when(response.status()).thenReturn(RestStatus.NOT_FOUND.getStatus()); + lenient().when(response.operationType()).thenReturn(OperationType.Delete); + return response; + } + + private static BulkResponseItem indexNotFoundItemResponse(final String index) { + final BulkResponseItem response = mock(BulkResponseItem.class); + lenient().when(response.status()).thenReturn(RestStatus.NOT_FOUND.getStatus()); + lenient().when(response.operationType()).thenReturn(OperationType.Index); + return response; + } + private SerializedJson arbitraryDocument() { return SerializedJson.fromStringAndOptionals("{}", null, null, null); } From 40d2800b4fd73538bd066c1302ad0b068b658143 Mon Sep 17 00:00:00 2001 From: Pallempati Saketh Date: Wed, 11 Jun 2025 16:53:59 +0530 Subject: [PATCH 2/2] Implement DELETE+404 retry limit and comprehensive refactoring - Add DELETE_404_MAX_RETRIES constant (3 attempts max) to prevent infinite retry loops - Refactor canRetryItem into focused methods: isDeleteOperationWithNotFoundError, canRetryDeleteNotFoundOperation, isGenerallyRetryableOperation - Add comprehensive test coverage for DELETE+404 edge cases and mixed operations This prevents infinite retries when DELETE operations encounter 404 errors due to race conditions in bulk operations, while maintaining backward compatibility and not affecting other operation types. Signed-off-by: Pallempati Saketh --- .../sink/opensearch/BulkRetryStrategy.java | 67 +++-- .../opensearch/BulkRetryStrategyTests.java | 247 ++++++++++++++++++ 2 files changed, 291 insertions(+), 23 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 85cad0c6af..7ad40b0019 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -53,6 +53,7 @@ public final class BulkRetryStrategy { static final long INITIAL_DELAY_MS = 50; static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception"; + private static final int DELETE_404_MAX_RETRIES = 3; private static final Set NON_RETRY_STATUS = new HashSet<>( Arrays.asList( @@ -238,14 +239,8 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte public boolean canRetry(final BulkResponse response) { for (final BulkResponseItem bulkItemResponse : response.items()) { - if (isItemInError(bulkItemResponse)) { - boolean isGenerallyRetryable = !NON_RETRY_STATUS.contains(bulkItemResponse.status()); - boolean isDeleteNotFound = bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && - bulkItemResponse.operationType() == OperationType.Delete; - - if (isGenerallyRetryable || isDeleteNotFound) { - return true; - } + if (isItemInError(bulkItemResponse) && canRetryItem(bulkItemResponse)) { + return true; } } return false; @@ -257,12 +252,42 @@ public static boolean canRetry(final Exception e) { !NON_RETRY_STATUS.contains(((OpenSearchException) e).status()))); } + private boolean canRetryItem(final BulkResponseItem bulkItemResponse) { + return canRetryItem(bulkItemResponse, 1); + } + + private boolean canRetryItem(final BulkResponseItem bulkItemResponse, final int attemptNumber) { + if (isDeleteOperationWithNotFoundError(bulkItemResponse)) { + return canRetryDeleteNotFoundOperation(bulkItemResponse, attemptNumber); + } + + return isGenerallyRetryableOperation(bulkItemResponse); + } + + private boolean isDeleteOperationWithNotFoundError(final BulkResponseItem bulkItemResponse) { + return bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && + bulkItemResponse.operationType() == OperationType.Delete; + } + + private boolean canRetryDeleteNotFoundOperation(final BulkResponseItem bulkItemResponse, final int attemptNumber) { + if (attemptNumber > DELETE_404_MAX_RETRIES) { + LOG.info("DELETE operation for index '{}' reached maximum retry limit ({}) for 404 errors, sending to DLQ", + bulkItemResponse.index(), DELETE_404_MAX_RETRIES); + return false; + } + return true; + } + + private boolean isGenerallyRetryableOperation(final BulkResponseItem bulkItemResponse) { + return !NON_RETRY_STATUS.contains(bulkItemResponse.status()); + } + private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry, - final int retryCount, + final int attemptNumber, final BulkResponse bulkResponse, final Exception exceptionFromRequest) { final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest); - if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt + if (!Objects.isNull(bulkResponse) && attemptNumber == 1) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if (!isItemInError(bulkItemResponse)) { sentDocumentsOnFirstAttemptCounter.increment(); @@ -270,8 +295,8 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating } } if (doRetry) { - if (retryCount % 5 == 0) { - LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest); + if (attemptNumber % 5 == 1) { + LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", attemptNumber - 1, exceptionFromRequest); if (exceptionFromRequest == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if(isItemInError(bulkItemResponse)) { @@ -311,9 +336,9 @@ private void handleFailures(final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException); + final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException, attemptNumber); if (bulkRequestForRetry.getOperationsCount() == 0) { return null; } @@ -323,13 +348,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r bulkResponse = requestFunction.apply(bulkRequestForRetry); } catch (Exception e) { incrementErrorCounters(e); - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e); + return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, null, e); } if (bulkResponse.errors()) { - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null); + return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, bulkResponse, null); } else { final int numberOfDocs = bulkRequestForRetry.getOperationsCount(); - final boolean firstAttempt = (retryCount == 1); + final boolean firstAttempt = (attemptNumber == 1); if (firstAttempt) { sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs); } @@ -344,7 +369,7 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r } private AccumulatingBulkRequest createBulkRequestForRetry( - final AccumulatingBulkRequest request, final BulkResponse response, final Exception previousException) { + final AccumulatingBulkRequest request, final BulkResponse response, final Exception previousException, final int attemptNumber) { if (shouldSendAllForQuerying(previousException)) { for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) { existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper); @@ -369,11 +394,7 @@ private AccumulatingBulkRequest createBulkReq continue; } - boolean isGenerallyRetryable = !NON_RETRY_STATUS.contains(bulkItemResponse.status()); - boolean isDeleteNotFound = bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && - bulkItemResponse.operationType() == OperationType.Delete; - - if (isGenerallyRetryable || isDeleteNotFound) { + if (canRetryItem(bulkItemResponse, attemptNumber)) { requestToReissue.addOperation(bulkOperation); } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 1b76927d7b..3ffdf43e11 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -225,6 +225,129 @@ public void testCanRetry_NonDeleteNotFound_ShouldReturnFalse() { assertFalse("canRetry should return false for 404 on non-DELETE operations", bulkRetryStrategy.canRetry(bulkResponse)); } + @Test + public void testDeleteNotFound_ReachesMaxRetryLimit() throws Exception { + final String testIndex = "delete-404-retry-limit"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundMaxRetries = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("DELETE+404 should retry exactly 3 times before giving up", 3, client.attempt); + assertEquals("No events should succeed", 0, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + + + @Test + public void testDeleteNotFound_MixedWithSuccessfulOperations() throws Exception { + final String testIndex = "mixed-delete-404-test"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundMixed = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle2)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle3)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("Both INDEX operations should succeed", 2, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + + @Test + public void testDeleteNotFound_DoesNotAffectOtherNotFoundOperations() throws Exception { + final String testIndex = "other-404-test"; + final FakeClient client = new FakeClient(testIndex); + client.indexNotFoundTest = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle1)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("INDEX+404 should not be retried multiple times", 1, client.attempt); + assertEquals("No events should succeed", 0, numEventsSucceeded); + assertEquals("INDEX+404 operation should be sent to DLQ immediately", 1, numEventsFailed); + } + + @Test + public void testDeleteNotFound_RetryCountDoesNotAffectOtherOperations() throws Exception { + final String testIndex = "retry-isolation-test"; + final FakeClient client = new FakeClient(testIndex); + client.retryCountIsolationTest = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle2)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("At least 2 events should be processed", 2, numEventsSucceeded + numEventsFailed); + } + + @Test + public void testDeleteNotFound_RetriesWithOtherErrors() throws Exception { + final String testIndex = "delete-404-with-others-test"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundWithOthers = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle2)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("INDEX operation should eventually succeed", 1, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + @Test public void testExecuteSuccessOnFirstAttempt() throws Exception { final String testIndex = "bar"; @@ -1100,6 +1223,11 @@ private static class FakeClient { boolean nonRetryableException = true; boolean maxRetriesWithSuccesses = false; boolean maxRetriesWithException = false; + boolean deleteNotFoundMaxRetries = false; + boolean deleteNotFoundMixed = false; + boolean indexNotFoundTest = false; + boolean retryCountIsolationTest = false; + boolean deleteNotFoundWithOthers = false; int maxRetriesTestValue = 0; int attempt = 0; String index; @@ -1132,6 +1260,26 @@ public BulkResponse bulk(final AccumulatingBulkRequest bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } + + private BulkResponse bulkDeleteNotFoundMixedResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 3; + final List bulkItemResponses = Arrays.asList( + successItemResponse(index), + deleteNotFoundItemResponse(index), + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + assert requestSize == 1; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else { + assert requestSize == 0; + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } + } + + private BulkResponse bulkIndexNotFoundResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + assert requestSize == 1; + final List bulkItemResponses = Arrays.asList( + indexNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } + + private BulkResponse bulkRetryCountIsolationResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 2; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + if (requestSize == 2) { + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (requestSize == 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } else { + if (requestSize >= 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } + + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } + + private BulkResponse bulkDeleteNotFoundWithOthersResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 2; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + if (requestSize == 2) { + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (requestSize == 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } else { + if (requestSize >= 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } + + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } } }