Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public final class BulkRetryStrategy {
private final Counter documentsVersionConflictErrors;
private final Counter documentsDuplicates;
private final ExistingDocumentQueryManager existingDocumentQueryManager;
private final boolean isExternalVersioning;
private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class);

static class BulkOperationRequestResponse {
Expand Down Expand Up @@ -167,8 +168,10 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier,
final String pipelineName,
final String pluginName,
final ExistingDocumentQueryManager existingDocumentQueryManager) {
final ExistingDocumentQueryManager existingDocumentQueryManager,
final boolean isExternalVersioning) {
this.existingDocumentQueryManager = existingDocumentQueryManager;
this.isExternalVersioning = isExternalVersioning;
this.requestFunction = requestFunction;
this.logFailure = logFailure;
this.successfulOperationsHandler = successfulOperationsHandler;
Expand Down Expand Up @@ -323,9 +326,9 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if(isItemInError(bulkItemResponse)) {
// Skip logging the error for version conflicts
// Skip logging the error for version conflicts when using external versioning
final ErrorCause error = bulkItemResponse.error();
if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
if (isExternalVersioning && error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
continue;
}
LOG.warn("index = {}, operation = {}, status = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
Expand Down Expand Up @@ -401,11 +404,11 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq

if (canRetryItem(bulkItemResponse, attemptNumber)) {
requestToReissue.addOperation(bulkOperation);
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
} else if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason());
// This is not a successfully sent document, so do not add to "successfulOperations"
// and just release the eventHandle
// When using external versioning, version conflicts are expected and not true errors.
// Release the eventHandle without counting as a document error.
bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
Expand Down Expand Up @@ -441,20 +444,20 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
final BulkResponseItem bulkItemResponse = itemResponses.get(i);
final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i);
if (isItemInError(bulkItemResponse)) {
if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkOperation.getIndex(), bulkItemResponse.error().reason());
// This is not a successfully sent document, so do not add to "successfulOperations"
// and just release the eventHandle
// When using external versioning, version conflicts are expected and not true errors.
// Release the eventHandle without counting as a document error.
bulkOperation.releaseEventHandle(true);
} else {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
.build());
documentErrorsCounter.increment();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making this change! there was a discussion here on having a condition to skip applying documentErrors only if the document_version_type is set to external. I think that in some cases there can be a true failure when not using external versioning. Can we add that check here?

getDocumentStatusCounter(bulkItemResponse.status()).increment();
}
documentErrorsCounter.increment();
getDocumentStatusCounter(bulkItemResponse.status()).increment();
} else {
sentDocumentsCounter.increment();
if(isDuplicateDocument(bulkItemResponse)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ private void doInitializeInternal() throws IOException {
bulkRequestSupplier,
pipeline,
PLUGIN_NAME,
openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null);
openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null,
isExternalVersionType(openSearchSinkConfig.getIndexConfiguration().getVersionType()));

if (queryExecutorService != null) {
existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient);
Expand Down Expand Up @@ -673,6 +674,10 @@ private void maybeUpdateServerlessNetworkPolicy() {
}
}

private static boolean isExternalVersionType(final VersionType versionType) {
return versionType != null && (versionType == VersionType.External || versionType == VersionType.ExternalGte);
}

private DlqObject createDlqObjectFromEvent(final Event event,
final String index,
final String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public BulkRetryStrategy createObjectUnderTest(
bulkRequestSupplier,
PIPELINE_NAME,
PLUGIN_NAME,
null);
null,
false);
}

public BulkRetryStrategy createObjectUnderTest(
Expand All @@ -171,7 +172,8 @@ public BulkRetryStrategy createObjectUnderTest(
bulkRequestSupplier,
PIPELINE_NAME,
PLUGIN_NAME,
null);
null,
false);
}

public BulkRetryStrategy createObjectUnderTest(
Expand Down Expand Up @@ -199,7 +201,35 @@ public BulkRetryStrategy createObjectUnderTest(
bulkRequestSupplier,
PIPELINE_NAME,
PLUGIN_NAME,
existingDocumentQueryManager);
existingDocumentQueryManager,
false);
}

public BulkRetryStrategy createObjectUnderTestWithExternalVersioning(
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction,
final BiConsumer<List<FailedBulkOperation>, Throwable> logFailure,
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier
) {
return new BulkRetryStrategy(
requestFunction,
logFailure,
(operations) -> {
for (BulkOperationWrapper operation: operations) {
if (operation.getEvent() != null) {
operation.getEvent().getEventHandle().release(true);
}
if (operation.getEventHandle() != null) {
operation.getEventHandle().release(true);
}
}
},
pluginMetrics,
Integer.MAX_VALUE,
bulkRequestSupplier,
PIPELINE_NAME,
PLUGIN_NAME,
null,
true);
}

@Test
Expand Down Expand Up @@ -629,12 +659,6 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception {
assertThat(maxRetriesLimitReached, equalTo(true));
assertEquals(numEventsSucceeded, 2);
assertEquals(numEventsFailed, 2);

final List<Measurement> documentVersionConflictMeasurement = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
assertEquals(1, documentVersionConflictMeasurement.size());
assertEquals(1.0, documentVersionConflictMeasurement.get(0).getValue(), 0);
}

@Test
Expand Down Expand Up @@ -694,6 +718,128 @@ public void testExecuteNonRetryableResponse() throws Exception {
assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0);
}

@Test
public void testExecute_VersionConflictDoesNotIncrementDocumentErrors() throws Exception {
final String testIndex = "version-conflict-index";
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());

final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTestWithExternalVersioning(
requestFunction, logFailureConsumer, bulkRequestSupplier);

final IndexOperation<SerializedJson> indexOp1 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("1").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOp2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOp3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();

final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1);
final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2);
final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3);

final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
accumulatingBulkRequest.addOperation(wrapper1);
accumulatingBulkRequest.addOperation(wrapper2);
accumulatingBulkRequest.addOperation(wrapper3);

// Response: 1 success, 1 version conflict, 1 bad request
final BulkResponseItem successItem = successItemResponse(testIndex);
final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse();
final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex);
final List<BulkResponseItem> responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem);

final BulkResponse bulkResponse = mock(BulkResponse.class);
when(bulkResponse.errors()).thenReturn(true);
when(bulkResponse.items()).thenReturn(responseItems);
when(requestFunction.apply(any())).thenReturn(bulkResponse);

numEventsSucceeded = 0;
numEventsFailed = 0;
bulkRetryStrategy.execute(accumulatingBulkRequest);

// Version conflict should NOT be counted as a document error
final List<Measurement> documentErrorsMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0);

// Version conflict should be counted in its own metric
final List<Measurement> versionConflictMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
assertEquals(1, versionConflictMeasurements.size());
assertEquals(1.0, versionConflictMeasurements.get(0).getValue(), 0);

// Success metric should count the 1 successful document
final List<Measurement> successMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
assertEquals(1, successMeasurements.size());
assertEquals(1.0, successMeasurements.get(0).getValue(), 0);

// Version conflict event handle should be released with true (success)
verify(eventHandle2).release(true);
}

@Test
public void testExecute_VersionConflictIncrementsDocumentErrors_WhenNotExternalVersioning() throws Exception {
final String testIndex = "version-conflict-index";
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());

// Use default (non-external versioning)
final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest(
requestFunction, logFailureConsumer, bulkRequestSupplier);

final IndexOperation<SerializedJson> indexOp1 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("1").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOp2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOp3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();

final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1);
final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2);
final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3);

final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
accumulatingBulkRequest.addOperation(wrapper1);
accumulatingBulkRequest.addOperation(wrapper2);
accumulatingBulkRequest.addOperation(wrapper3);

// Response: 1 success, 1 version conflict, 1 bad request
final BulkResponseItem successItem = successItemResponse(testIndex);
final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse();
final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex);
final List<BulkResponseItem> responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem);

final BulkResponse bulkResponse = mock(BulkResponse.class);
when(bulkResponse.errors()).thenReturn(true);
when(bulkResponse.items()).thenReturn(responseItems);
when(requestFunction.apply(any())).thenReturn(bulkResponse);

numEventsSucceeded = 0;
numEventsFailed = 0;
bulkRetryStrategy.execute(accumulatingBulkRequest);

// Without external versioning, version conflict SHOULD be counted as a document error
final List<Measurement> documentErrorsMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(2.0, documentErrorsMeasurements.get(0).getValue(), 0);

// Version conflict metric should NOT be incremented without external versioning
final List<Measurement> versionConflictMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
assertEquals(1, versionConflictMeasurements.size());
assertEquals(0.0, versionConflictMeasurements.get(0).getValue(), 0);

// Success metric should count the 1 successful document
final List<Measurement> successMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
assertEquals(1, successMeasurements.size());
assertEquals(1.0, successMeasurements.get(0).getValue(), 0);
}

@Test
public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception {
final String testIndex = "delete-index";
Expand Down
Loading
Loading