diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 594cadcffd..7ad40b0019 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -15,6 +15,7 @@ import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.bulk.OperationType; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation; @@ -52,6 +53,7 @@ public final class BulkRetryStrategy { static final long INITIAL_DELAY_MS = 50; static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception"; + private static final int DELETE_404_MAX_RETRIES = 3; private static final Set NON_RETRY_STATUS = new HashSet<>( Arrays.asList( @@ -237,7 +239,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte public boolean canRetry(final BulkResponse response) { for (final BulkResponseItem bulkItemResponse : response.items()) { - if (isItemInError(bulkItemResponse) && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) { + if (isItemInError(bulkItemResponse) && canRetryItem(bulkItemResponse)) { return true; } } @@ -250,12 +252,42 @@ public static boolean canRetry(final Exception e) { !NON_RETRY_STATUS.contains(((OpenSearchException) e).status()))); } + private boolean canRetryItem(final BulkResponseItem bulkItemResponse) { + return canRetryItem(bulkItemResponse, 1); + } + + private boolean canRetryItem(final BulkResponseItem bulkItemResponse, final int attemptNumber) { + if (isDeleteOperationWithNotFoundError(bulkItemResponse)) { + return canRetryDeleteNotFoundOperation(bulkItemResponse, attemptNumber); + } + + return isGenerallyRetryableOperation(bulkItemResponse); + } + + private boolean isDeleteOperationWithNotFoundError(final BulkResponseItem bulkItemResponse) { + return bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() && + bulkItemResponse.operationType() == OperationType.Delete; + } + + private boolean canRetryDeleteNotFoundOperation(final BulkResponseItem bulkItemResponse, final int attemptNumber) { + if (attemptNumber > DELETE_404_MAX_RETRIES) { + LOG.info("DELETE operation for index '{}' reached maximum retry limit ({}) for 404 errors, sending to DLQ", + bulkItemResponse.index(), DELETE_404_MAX_RETRIES); + return false; + } + return true; + } + + private boolean isGenerallyRetryableOperation(final BulkResponseItem bulkItemResponse) { + return !NON_RETRY_STATUS.contains(bulkItemResponse.status()); + } + private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry, - final int retryCount, + final int attemptNumber, final BulkResponse bulkResponse, final Exception exceptionFromRequest) { final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest); - if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt + if (!Objects.isNull(bulkResponse) && attemptNumber == 1) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if (!isItemInError(bulkItemResponse)) { sentDocumentsOnFirstAttemptCounter.increment(); @@ -263,8 +295,8 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating } } if (doRetry) { - if (retryCount % 5 == 0) { - LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest); + if (attemptNumber % 5 == 1) { + LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", attemptNumber - 1, exceptionFromRequest); if (exceptionFromRequest == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if(isItemInError(bulkItemResponse)) { @@ -304,9 +336,9 @@ private void handleFailures(final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException); + final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException, attemptNumber); if (bulkRequestForRetry.getOperationsCount() == 0) { return null; } @@ -316,13 +348,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r bulkResponse = requestFunction.apply(bulkRequestForRetry); } catch (Exception e) { incrementErrorCounters(e); - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e); + return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, null, e); } if (bulkResponse.errors()) { - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null); + return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, bulkResponse, null); } else { final int numberOfDocs = bulkRequestForRetry.getOperationsCount(); - final boolean firstAttempt = (retryCount == 1); + final boolean firstAttempt = (attemptNumber == 1); if (firstAttempt) { sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs); } @@ -337,12 +369,12 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r } private AccumulatingBulkRequest createBulkRequestForRetry( - final AccumulatingBulkRequest request, final BulkResponse response, final Exception previousException) { + final AccumulatingBulkRequest request, final BulkResponse response, final Exception previousException, final int attemptNumber) { if (shouldSendAllForQuerying(previousException)) { - for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) { - existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper); - } - return bulkRequestSupplier.get(); + for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) { + existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper); + } + return bulkRequestSupplier.get(); } if (response == null) { @@ -354,7 +386,7 @@ private AccumulatingBulkRequest createBulkReq int index = 0; for (final BulkResponseItem bulkItemResponse : response.items()) { BulkOperationWrapper bulkOperation = - (BulkOperationWrapper)request.getOperationAt(index); + (BulkOperationWrapper)request.getOperationAt(index); if (isItemInError(bulkItemResponse)) { if (existingDocumentQueryManager != null && POTENTIAL_DUPLICATES_ERRORS.contains(bulkItemResponse.status())) { existingDocumentQueryManager.addBulkOperation(bulkOperation); @@ -362,7 +394,7 @@ private AccumulatingBulkRequest createBulkReq continue; } - if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { + if (canRetryItem(bulkItemResponse, attemptNumber)) { requestToReissue.addOperation(bulkOperation); } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index f50822fab2..3ffdf43e11 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -20,6 +20,8 @@ import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.OperationType; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -105,7 +107,7 @@ public void setUp() { pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); final List eventHandles = Arrays.asList( - eventHandle1, eventHandle2, eventHandle3, eventHandle4); + eventHandle1, eventHandle2, eventHandle3, eventHandle4); for (final EventHandle eventHandle: eventHandles) { lenient().doAnswer(a -> { boolean result = a.getArgument(0); @@ -120,10 +122,10 @@ public void setUp() { } public BulkRetryStrategy createObjectUnderTest( - final RequestFunction, BulkResponse> requestFunction, - final BiConsumer, Throwable> logFailure, - final Supplier bulkRequestSupplier -) { + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final Supplier bulkRequestSupplier + ) { return new BulkRetryStrategy( requestFunction, logFailure, @@ -136,11 +138,11 @@ public BulkRetryStrategy createObjectUnderTest( } public BulkRetryStrategy createObjectUnderTest( - final RequestFunction, BulkResponse> requestFunction, - final BiConsumer, Throwable> logFailure, - final int maxRetries, - final Supplier bulkRequestSupplier -) { + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final int maxRetries, + final Supplier bulkRequestSupplier + ) { return new BulkRetryStrategy( requestFunction, logFailure, @@ -195,6 +197,157 @@ public void testCanRetry() { assertTrue(bulkRetryStrategy.canRetry(bulkResponse)); } + @Test + public void testCanRetry_DeleteNotFound_ShouldReturnTrue() { + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + bulkRequest -> mock(BulkResponse.class), + (docWriteRequest, throwable) -> {}, + () -> mock(AccumulatingBulkRequest.class)); + final String testIndex = "foo"; + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(successItem, deleteNotFoundItem)); + assertTrue("canRetry should return true for a mix including DELETE + 404", bulkRetryStrategy.canRetry(bulkResponse)); + } + + @Test + public void testCanRetry_NonDeleteNotFound_ShouldReturnFalse() { + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + bulkRequest -> mock(BulkResponse.class), + (docWriteRequest, throwable) -> {}, + () -> mock(AccumulatingBulkRequest.class)); + final String testIndex = "foo"; + final BulkResponseItem successItem = successItemResponse(testIndex); + final BulkResponseItem indexNotFoundItem = indexNotFoundItemResponse(testIndex); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(successItem, indexNotFoundItem)); + assertFalse("canRetry should return false for 404 on non-DELETE operations", bulkRetryStrategy.canRetry(bulkResponse)); + } + + @Test + public void testDeleteNotFound_ReachesMaxRetryLimit() throws Exception { + final String testIndex = "delete-404-retry-limit"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundMaxRetries = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("DELETE+404 should retry exactly 3 times before giving up", 3, client.attempt); + assertEquals("No events should succeed", 0, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + + + @Test + public void testDeleteNotFound_MixedWithSuccessfulOperations() throws Exception { + final String testIndex = "mixed-delete-404-test"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundMixed = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle2)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle3)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("Both INDEX operations should succeed", 2, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + + @Test + public void testDeleteNotFound_DoesNotAffectOtherNotFoundOperations() throws Exception { + final String testIndex = "other-404-test"; + final FakeClient client = new FakeClient(testIndex); + client.indexNotFoundTest = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle1)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("INDEX+404 should not be retried multiple times", 1, client.attempt); + assertEquals("No events should succeed", 0, numEventsSucceeded); + assertEquals("INDEX+404 operation should be sent to DLQ immediately", 1, numEventsFailed); + } + + @Test + public void testDeleteNotFound_RetryCountDoesNotAffectOtherOperations() throws Exception { + final String testIndex = "retry-isolation-test"; + final FakeClient client = new FakeClient(testIndex); + client.retryCountIsolationTest = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle2)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("At least 2 events should be processed", 2, numEventsSucceeded + numEventsFailed); + } + + @Test + public void testDeleteNotFound_RetriesWithOtherErrors() throws Exception { + final String testIndex = "delete-404-with-others-test"; + final FakeClient client = new FakeClient(testIndex); + client.deleteNotFoundWithOthers = true; + + numEventsSucceeded = 0; + numEventsFailed = 0; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder().index(testIndex).id("delete-404-test").build(); + final IndexOperation indexOperation = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOperation).build(), eventHandle1)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation).build(), eventHandle2)); + + bulkRetryStrategy.execute(accumulatingBulkRequest); + + assertEquals("INDEX operation should eventually succeed", 1, numEventsSucceeded); + assertEquals("DELETE+404 operation should be sent to DLQ after max retries", 1, numEventsFailed); + } + @Test public void testExecuteSuccessOnFirstAttempt() throws Exception { final String testIndex = "bar"; @@ -511,6 +664,184 @@ public void testExecuteNonRetryableResponse() throws Exception { assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0); } + @Test + public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception { + final String testIndex = "delete-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, 2, bulkRequestSupplier); + + final IndexOperation indexOp = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final DeleteOperation deleteOp = new DeleteOperation.Builder().index(testIndex).id("2").build(); + final IndexOperation badIndexOp = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + + final BulkOperationWrapper indexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp).build(), eventHandle1); + final BulkOperationWrapper deleteWrapper = new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOp).build(), eventHandle2); + final BulkOperationWrapper badIndexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(badIndexOp).build(), eventHandle3); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(indexWrapper); + initialRequest.addOperation(deleteWrapper); + initialRequest.addOperation(badIndexWrapper); + + final BulkResponseItem indexSuccessItem = successItemResponse(testIndex); + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex); + final BulkResponse firstResponse = mock(BulkResponse.class); + when(firstResponse.errors()).thenReturn(true); + when(firstResponse.items()).thenReturn(Arrays.asList(indexSuccessItem, deleteNotFoundItem, badRequestItem)); + + final BulkResponseItem deleteSuccessItem = successItemResponse(testIndex); + final BulkResponse secondResponse = mock(BulkResponse.class); + when(secondResponse.errors()).thenReturn(false); + when(secondResponse.items()).thenReturn(List.of(deleteSuccessItem)); + + final ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(AccumulatingBulkRequest.class); + when(requestFunction.apply(requestCaptor.capture())) + .thenReturn(firstResponse) + .thenReturn(secondResponse); + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(2)).apply(any()); + + final List> capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + final AccumulatingBulkRequest retryRequest = capturedRequests.get(1); + assertThat(retryRequest.getOperationsCount(), equalTo(1)); + assertThat(retryRequest.getOperationAt(0), equalTo(deleteWrapper)); + + final ArgumentCaptor> failedOpsCaptor = ArgumentCaptor.forClass(List.class); + verify(logFailureConsumer, times(1)).accept(failedOpsCaptor.capture(), eq(null)); + assertThat(failedOpsCaptor.getValue().size(), equalTo(1)); + assertThat(failedOpsCaptor.getValue().get(0).getBulkOperation(), equalTo(badIndexWrapper)); + + verify(eventHandle1).release(true); + verify(eventHandle2).release(true); + + assertThat(numEventsSucceeded, equalTo(2)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + assertThat(retryMeasurements.size(), equalTo(1)); + assertThat(retryMeasurements.get(0).getValue(), equalTo(1.0)); + + final List successMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertThat(successMeasurements.size(), equalTo(1)); + assertThat(successMeasurements.get(0).getValue(), equalTo(2.0)); + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + + @Test + public void testExecute_DeleteNotFound_RetriesAndFailsPermanently() throws Exception { + final String testIndex = "delete-fail-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + final int MAX_RETRIES = 2; + + logFailureConsumer = this::logFailureMaxRetries; + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, MAX_RETRIES, bulkRequestSupplier); + + final DeleteOperation deleteOp = new DeleteOperation.Builder().index(testIndex).id("1").build(); + final BulkOperationWrapper deleteWrapper = new BulkOperationWrapper(new BulkOperation.Builder().delete(deleteOp).build(), eventHandle1); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(deleteWrapper); + + final BulkResponseItem deleteNotFoundItem = deleteNotFoundItemResponse(testIndex); + final BulkResponse persistentFailureResponse = mock(BulkResponse.class); + when(persistentFailureResponse.errors()).thenReturn(true); + when(persistentFailureResponse.items()).thenReturn(List.of(deleteNotFoundItem)); + + when(requestFunction.apply(any(AccumulatingBulkRequest.class))).thenReturn(persistentFailureResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + maxRetriesLimitReached = false; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(MAX_RETRIES)).apply(any()); + assertThat("Max retries limit should have been reached", maxRetriesLimitReached, equalTo(true)); + + assertThat(numEventsSucceeded, equalTo(0)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + assertThat(retryMeasurements.size(), equalTo(1)); + assertThat(retryMeasurements.get(0).getValue(), equalTo((double)MAX_RETRIES)); + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + + @Test + public void testExecute_NonDeleteNotFound_DoesNotRetry() throws Exception { + final String testIndex = "non-delete-404-index"; + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + requestFunction, logFailureConsumer, 2, bulkRequestSupplier); + + final IndexOperation indexOp = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final BulkOperationWrapper indexWrapper = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp).build(), eventHandle1); + + final AccumulatingBulkRequest initialRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + initialRequest.addOperation(indexWrapper); + + final BulkResponseItem indexNotFoundItem = indexNotFoundItemResponse(testIndex); + final BulkResponse firstResponse = mock(BulkResponse.class); + when(firstResponse.errors()).thenReturn(true); + when(firstResponse.items()).thenReturn(List.of(indexNotFoundItem)); + + when(requestFunction.apply(any(AccumulatingBulkRequest.class))).thenReturn(firstResponse); + + numEventsSucceeded = 0; + numEventsFailed = 0; + bulkRetryStrategy.execute(initialRequest); + + verify(requestFunction, times(1)).apply(any()); + + final ArgumentCaptor> failedOpsCaptor = ArgumentCaptor.forClass(List.class); + verify(logFailureConsumer, times(1)).accept(failedOpsCaptor.capture(), eq(null)); + assertThat(failedOpsCaptor.getValue().size(), equalTo(1)); + assertThat(failedOpsCaptor.getValue().get(0).getBulkOperation(), equalTo(indexWrapper)); + + assertThat(numEventsSucceeded, equalTo(0)); + assertThat(numEventsFailed, equalTo(1)); + + final List retryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.BULK_REQUEST_NUMBER_OF_RETRIES).toString()); + if (!retryMeasurements.isEmpty()) { + assertThat(retryMeasurements.get(0).getValue(), equalTo(0.0)); + } + + final List errorMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + assertThat(errorMeasurements.size(), equalTo(1)); + assertThat(errorMeasurements.get(0).getValue(), equalTo(1.0)); + } + @Test void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_error_is_provided() throws Exception { final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); @@ -867,6 +1198,20 @@ private static BulkResponseItem customBulkFailureResponse(final RestStatus restS return badResponse; } + private static BulkResponseItem deleteNotFoundItemResponse(final String index) { + final BulkResponseItem response = mock(BulkResponseItem.class); + lenient().when(response.status()).thenReturn(RestStatus.NOT_FOUND.getStatus()); + lenient().when(response.operationType()).thenReturn(OperationType.Delete); + return response; + } + + private static BulkResponseItem indexNotFoundItemResponse(final String index) { + final BulkResponseItem response = mock(BulkResponseItem.class); + lenient().when(response.status()).thenReturn(RestStatus.NOT_FOUND.getStatus()); + lenient().when(response.operationType()).thenReturn(OperationType.Index); + return response; + } + private SerializedJson arbitraryDocument() { return SerializedJson.fromStringAndOptionals("{}", null, null, null); } @@ -878,6 +1223,11 @@ private static class FakeClient { boolean nonRetryableException = true; boolean maxRetriesWithSuccesses = false; boolean maxRetriesWithException = false; + boolean deleteNotFoundMaxRetries = false; + boolean deleteNotFoundMixed = false; + boolean indexNotFoundTest = false; + boolean retryCountIsolationTest = false; + boolean deleteNotFoundWithOthers = false; int maxRetriesTestValue = 0; int attempt = 0; String index; @@ -910,6 +1260,26 @@ public BulkResponse bulk(final AccumulatingBulkRequest bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } + + private BulkResponse bulkDeleteNotFoundMixedResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 3; + final List bulkItemResponses = Arrays.asList( + successItemResponse(index), + deleteNotFoundItemResponse(index), + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + assert requestSize == 1; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else { + assert requestSize == 0; + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } + } + + private BulkResponse bulkIndexNotFoundResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + assert requestSize == 1; + final List bulkItemResponses = Arrays.asList( + indexNotFoundItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } + + private BulkResponse bulkRetryCountIsolationResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 2; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + if (requestSize == 2) { + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (requestSize == 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } else { + if (requestSize >= 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } + + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } + + private BulkResponse bulkDeleteNotFoundWithOthersResponse(final BulkRequest bulkRequest) { + final int requestSize = bulkRequest.operations().size(); + + if (attempt == 1) { + assert requestSize == 2; + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (attempt <= 3) { + if (requestSize == 2) { + final List bulkItemResponses = Arrays.asList( + deleteNotFoundItemResponse(index), + internalServerErrorItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); + } else if (requestSize == 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } else { + if (requestSize >= 1) { + final List bulkItemResponses = Arrays.asList( + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); + } + } + + return new BulkResponse.Builder().items(Arrays.asList()).errors(false).took(10).build(); + } } }