1515import org .opensearch .client .opensearch .core .BulkRequest ;
1616import org .opensearch .client .opensearch .core .BulkResponse ;
1717import org .opensearch .client .opensearch .core .bulk .BulkResponseItem ;
18+ import org .opensearch .client .opensearch .core .bulk .OperationType ;
1819import org .opensearch .dataprepper .metrics .PluginMetrics ;
1920import org .opensearch .dataprepper .plugins .sink .opensearch .bulk .AccumulatingBulkRequest ;
2021import org .opensearch .dataprepper .plugins .sink .opensearch .dlq .FailedBulkOperation ;
@@ -52,6 +53,7 @@ public final class BulkRetryStrategy {
5253 static final long INITIAL_DELAY_MS = 50 ;
5354 static final long MAXIMUM_DELAY_MS = Duration .ofMinutes (10 ).toMillis ();
5455 static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception" ;
56+ private static final int DELETE_404_MAX_RETRIES = 3 ;
5557
5658 private static final Set <Integer > NON_RETRY_STATUS = new HashSet <>(
5759 Arrays .asList (
@@ -237,7 +239,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte
237239
238240 public boolean canRetry (final BulkResponse response ) {
239241 for (final BulkResponseItem bulkItemResponse : response .items ()) {
240- if (isItemInError (bulkItemResponse ) && ! NON_RETRY_STATUS . contains (bulkItemResponse . status () )) {
242+ if (isItemInError (bulkItemResponse ) && canRetryItem (bulkItemResponse )) {
241243 return true ;
242244 }
243245 }
@@ -250,21 +252,51 @@ public static boolean canRetry(final Exception e) {
250252 !NON_RETRY_STATUS .contains (((OpenSearchException ) e ).status ())));
251253 }
252254
255+ private boolean canRetryItem (final BulkResponseItem bulkItemResponse ) {
256+ return canRetryItem (bulkItemResponse , 1 );
257+ }
258+
259+ private boolean canRetryItem (final BulkResponseItem bulkItemResponse , final int attemptNumber ) {
260+ if (isDeleteOperationWithNotFoundError (bulkItemResponse )) {
261+ return canRetryDeleteNotFoundOperation (bulkItemResponse , attemptNumber );
262+ }
263+
264+ return isGenerallyRetryableOperation (bulkItemResponse );
265+ }
266+
267+ private boolean isDeleteOperationWithNotFoundError (final BulkResponseItem bulkItemResponse ) {
268+ return bulkItemResponse .status () == RestStatus .NOT_FOUND .getStatus () &&
269+ bulkItemResponse .operationType () == OperationType .Delete ;
270+ }
271+
272+ private boolean canRetryDeleteNotFoundOperation (final BulkResponseItem bulkItemResponse , final int attemptNumber ) {
273+ if (attemptNumber > DELETE_404_MAX_RETRIES ) {
274+ LOG .info ("DELETE operation for index '{}' reached maximum retry limit ({}) for 404 errors, sending to DLQ" ,
275+ bulkItemResponse .index (), DELETE_404_MAX_RETRIES );
276+ return false ;
277+ }
278+ return true ;
279+ }
280+
281+ private boolean isGenerallyRetryableOperation (final BulkResponseItem bulkItemResponse ) {
282+ return !NON_RETRY_STATUS .contains (bulkItemResponse .status ());
283+ }
284+
253285 private BulkOperationRequestResponse handleRetriesAndFailures (final AccumulatingBulkRequest bulkRequestForRetry ,
254- final int retryCount ,
286+ final int attemptNumber ,
255287 final BulkResponse bulkResponse ,
256288 final Exception exceptionFromRequest ) {
257289 final boolean doRetry = (Objects .isNull (exceptionFromRequest )) ? canRetry (bulkResponse ) : canRetry (exceptionFromRequest );
258- if (!Objects .isNull (bulkResponse ) && retryCount == 1 ) { // first attempt
290+ if (!Objects .isNull (bulkResponse ) && attemptNumber == 1 ) {
259291 for (final BulkResponseItem bulkItemResponse : bulkResponse .items ()) {
260292 if (!isItemInError (bulkItemResponse )) {
261293 sentDocumentsOnFirstAttemptCounter .increment ();
262294 }
263295 }
264296 }
265297 if (doRetry ) {
266- if (retryCount % 5 == 0 ) {
267- LOG .warn ("Bulk Operation Failed. Number of retries {}. Retrying... " , retryCount , exceptionFromRequest );
298+ if (attemptNumber % 5 == 1 ) {
299+ LOG .warn ("Bulk Operation Failed. Number of retries {}. Retrying... " , attemptNumber - 1 , exceptionFromRequest );
268300 if (exceptionFromRequest == null ) {
269301 for (final BulkResponseItem bulkItemResponse : bulkResponse .items ()) {
270302 if (isItemInError (bulkItemResponse )) {
@@ -304,9 +336,9 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
304336
305337 private BulkOperationRequestResponse handleRetry (final AccumulatingBulkRequest request ,
306338 final BulkResponse response ,
307- int retryCount ,
339+ int attemptNumber ,
308340 final Exception previousException ) throws InterruptedException {
309- final AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > bulkRequestForRetry = createBulkRequestForRetry (request , response , previousException );
341+ final AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > bulkRequestForRetry = createBulkRequestForRetry (request , response , previousException , attemptNumber );
310342 if (bulkRequestForRetry .getOperationsCount () == 0 ) {
311343 return null ;
312344 }
@@ -316,13 +348,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
316348 bulkResponse = requestFunction .apply (bulkRequestForRetry );
317349 } catch (Exception e ) {
318350 incrementErrorCounters (e );
319- return handleRetriesAndFailures (bulkRequestForRetry , retryCount , null , e );
351+ return handleRetriesAndFailures (bulkRequestForRetry , attemptNumber , null , e );
320352 }
321353 if (bulkResponse .errors ()) {
322- return handleRetriesAndFailures (bulkRequestForRetry , retryCount , bulkResponse , null );
354+ return handleRetriesAndFailures (bulkRequestForRetry , attemptNumber , bulkResponse , null );
323355 } else {
324356 final int numberOfDocs = bulkRequestForRetry .getOperationsCount ();
325- final boolean firstAttempt = (retryCount == 1 );
357+ final boolean firstAttempt = (attemptNumber == 1 );
326358 if (firstAttempt ) {
327359 sentDocumentsOnFirstAttemptCounter .increment (numberOfDocs );
328360 }
@@ -337,12 +369,12 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
337369 }
338370
339371 private AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > createBulkRequestForRetry (
340- final AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > request , final BulkResponse response , final Exception previousException ) {
372+ final AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > request , final BulkResponse response , final Exception previousException , final int attemptNumber ) {
341373 if (shouldSendAllForQuerying (previousException )) {
342- for (final BulkOperationWrapper bulkOperationWrapper : request .getOperations ()) {
343- existingDocumentQueryManager .addBulkOperation (bulkOperationWrapper );
344- }
345- return bulkRequestSupplier .get ();
374+ for (final BulkOperationWrapper bulkOperationWrapper : request .getOperations ()) {
375+ existingDocumentQueryManager .addBulkOperation (bulkOperationWrapper );
376+ }
377+ return bulkRequestSupplier .get ();
346378 }
347379
348380 if (response == null ) {
@@ -354,15 +386,15 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
354386 int index = 0 ;
355387 for (final BulkResponseItem bulkItemResponse : response .items ()) {
356388 BulkOperationWrapper bulkOperation =
357- (BulkOperationWrapper )request .getOperationAt (index );
389+ (BulkOperationWrapper )request .getOperationAt (index );
358390 if (isItemInError (bulkItemResponse )) {
359391 if (existingDocumentQueryManager != null && POTENTIAL_DUPLICATES_ERRORS .contains (bulkItemResponse .status ())) {
360392 existingDocumentQueryManager .addBulkOperation (bulkOperation );
361393 index ++;
362394 continue ;
363395 }
364396
365- if (! NON_RETRY_STATUS . contains (bulkItemResponse . status () )) {
397+ if (canRetryItem (bulkItemResponse , attemptNumber )) {
366398 requestToReissue .addOperation (bulkOperation );
367399 } else if (bulkItemResponse .error () != null && VERSION_CONFLICT_EXCEPTION_TYPE .equals (bulkItemResponse .error ().type ())) {
368400 documentsVersionConflictErrors .increment ();
0 commit comments