Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.core.bulk.OperationType;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
Expand Down Expand Up @@ -52,6 +53,7 @@ public final class BulkRetryStrategy {
static final long INITIAL_DELAY_MS = 50;
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception";
private static final int DELETE_404_MAX_RETRIES = 3;

private static final Set<Integer> NON_RETRY_STATUS = new HashSet<>(
Arrays.asList(
Expand Down Expand Up @@ -237,7 +239,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte

public boolean canRetry(final BulkResponse response) {
for (final BulkResponseItem bulkItemResponse : response.items()) {
if (isItemInError(bulkItemResponse) && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
if (isItemInError(bulkItemResponse) && canRetryItem(bulkItemResponse)) {
return true;
}
}
Expand All @@ -250,21 +252,51 @@ public static boolean canRetry(final Exception e) {
!NON_RETRY_STATUS.contains(((OpenSearchException) e).status())));
}

private boolean canRetryItem(final BulkResponseItem bulkItemResponse) {
return canRetryItem(bulkItemResponse, 1);
}

private boolean canRetryItem(final BulkResponseItem bulkItemResponse, final int attemptNumber) {
if (isDeleteOperationWithNotFoundError(bulkItemResponse)) {
return canRetryDeleteNotFoundOperation(bulkItemResponse, attemptNumber);
}

return isGenerallyRetryableOperation(bulkItemResponse);
}

private boolean isDeleteOperationWithNotFoundError(final BulkResponseItem bulkItemResponse) {
return bulkItemResponse.status() == RestStatus.NOT_FOUND.getStatus() &&
bulkItemResponse.operationType() == OperationType.Delete;
}

private boolean canRetryDeleteNotFoundOperation(final BulkResponseItem bulkItemResponse, final int attemptNumber) {
if (attemptNumber > DELETE_404_MAX_RETRIES) {
LOG.info("DELETE operation for index '{}' reached maximum retry limit ({}) for 404 errors, sending to DLQ",
bulkItemResponse.index(), DELETE_404_MAX_RETRIES);
return false;
}
return true;
}

private boolean isGenerallyRetryableOperation(final BulkResponseItem bulkItemResponse) {
return !NON_RETRY_STATUS.contains(bulkItemResponse.status());
}

private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry,
final int retryCount,
final int attemptNumber,
final BulkResponse bulkResponse,
final Exception exceptionFromRequest) {
final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest);
if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt
if (!Objects.isNull(bulkResponse) && attemptNumber == 1) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (!isItemInError(bulkItemResponse)) {
sentDocumentsOnFirstAttemptCounter.increment();
}
}
}
if (doRetry) {
if (retryCount % 5 == 0) {
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest);
if (attemptNumber % 5 == 1) {
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", attemptNumber - 1, exceptionFromRequest);
if (exceptionFromRequest == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if(isItemInError(bulkItemResponse)) {
Expand Down Expand Up @@ -304,9 +336,9 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,

private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request,
final BulkResponse response,
int retryCount,
int attemptNumber,
final Exception previousException) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException);
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response, previousException, attemptNumber);
if (bulkRequestForRetry.getOperationsCount() == 0) {
return null;
}
Expand All @@ -316,13 +348,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
bulkResponse = requestFunction.apply(bulkRequestForRetry);
} catch (Exception e) {
incrementErrorCounters(e);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e);
return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, null, e);
}
if (bulkResponse.errors()) {
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null);
return handleRetriesAndFailures(bulkRequestForRetry, attemptNumber, bulkResponse, null);
} else {
final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
final boolean firstAttempt = (retryCount == 1);
final boolean firstAttempt = (attemptNumber == 1);
if (firstAttempt) {
sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs);
}
Expand All @@ -337,12 +369,12 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
}

private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkRequestForRetry(
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response, final Exception previousException) {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response, final Exception previousException, final int attemptNumber) {
if (shouldSendAllForQuerying(previousException)) {
for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) {
existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper);
}
return bulkRequestSupplier.get();
for (final BulkOperationWrapper bulkOperationWrapper : request.getOperations()) {
existingDocumentQueryManager.addBulkOperation(bulkOperationWrapper);
}
return bulkRequestSupplier.get();
}

if (response == null) {
Expand All @@ -354,15 +386,15 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
int index = 0;
for (final BulkResponseItem bulkItemResponse : response.items()) {
BulkOperationWrapper bulkOperation =
(BulkOperationWrapper)request.getOperationAt(index);
(BulkOperationWrapper)request.getOperationAt(index);
if (isItemInError(bulkItemResponse)) {
if (existingDocumentQueryManager != null && POTENTIAL_DUPLICATES_ERRORS.contains(bulkItemResponse.status())) {
existingDocumentQueryManager.addBulkOperation(bulkOperation);
index++;
continue;
}

if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
if (canRetryItem(bulkItemResponse, attemptNumber)) {
requestToReissue.addOperation(bulkOperation);
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
Expand Down
Loading
Loading