Skip to content

Commit 4865dce

Browse files
Fix invalid document version events still included in bulk requests (opensearch-project#6601) (opensearch-project#6758)
Events with invalid document_version values were correctly sent to the DLQ but were still being added to the OpenSearch bulk request. Added continue statements after the NumberFormatException and RuntimeException catch blocks in doOutput() so that the rest of the loop iteration is skipped for failed events. Made version conflict handling in BulkRetryStrategy conditional on external versioning: version conflicts are silently handled (the event handle is released without incrementing documentErrors) only when document_version_type is set to external or external_gte. For internal versioning, or when document_version_type is unset, version conflicts are treated as document errors. BulkRetryStrategy now accepts a boolean isExternalVersioning parameter instead of importing VersionType directly, keeping the OpenSearch client type out of its API. The VersionType check is done in OpenSearchSink via the isExternalVersionType helper method. Signed-off-by: Keyur-S-Patel <keyurpatel.opensource@gmail.com>
1 parent f3072d7 commit 4865dce

4 files changed

Lines changed: 232 additions & 21 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public final class BulkRetryStrategy {
136136
private final Counter documentsVersionConflictErrors;
137137
private final Counter documentsDuplicates;
138138
private final ExistingDocumentQueryManager existingDocumentQueryManager;
139+
private final boolean isExternalVersioning;
139140
private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class);
140141

141142
static class BulkOperationRequestResponse {
@@ -167,8 +168,10 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
167168
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier,
168169
final String pipelineName,
169170
final String pluginName,
170-
final ExistingDocumentQueryManager existingDocumentQueryManager) {
171+
final ExistingDocumentQueryManager existingDocumentQueryManager,
172+
final boolean isExternalVersioning) {
171173
this.existingDocumentQueryManager = existingDocumentQueryManager;
174+
this.isExternalVersioning = isExternalVersioning;
172175
this.requestFunction = requestFunction;
173176
this.logFailure = logFailure;
174177
this.successfulOperationsHandler = successfulOperationsHandler;
@@ -323,9 +326,9 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
323326
if (failure == null) {
324327
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
325328
if(isItemInError(bulkItemResponse)) {
326-
// Skip logging the error for version conflicts
329+
// Skip logging the error for version conflicts when using external versioning
327330
final ErrorCause error = bulkItemResponse.error();
328-
if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
331+
if (isExternalVersioning && error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
329332
continue;
330333
}
331334
LOG.warn("index = {}, operation = {}, status = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
@@ -401,11 +404,11 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
401404

402405
if (canRetryItem(bulkItemResponse, attemptNumber)) {
403406
requestToReissue.addOperation(bulkOperation);
404-
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
407+
} else if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
405408
documentsVersionConflictErrors.increment();
406409
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason());
407-
// This is not a successfully sent document, so do not add to "successfulOperations"
408-
// and just release the eventHandle
410+
// When using external versioning, version conflicts are expected and not true errors.
411+
// Release the eventHandle without counting as a document error.
409412
bulkOperation.releaseEventHandle(true);
410413
} else {
411414
nonRetryableFailures.add(FailedBulkOperation.builder()
@@ -441,20 +444,20 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
441444
final BulkResponseItem bulkItemResponse = itemResponses.get(i);
442445
final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i);
443446
if (isItemInError(bulkItemResponse)) {
444-
if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
447+
if (isExternalVersioning && bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
445448
documentsVersionConflictErrors.increment();
446449
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkOperation.getIndex(), bulkItemResponse.error().reason());
447-
// This is not a successfully sent document, so do not add to "successfulOperations"
448-
// and just release the eventHandle
450+
// When using external versioning, version conflicts are expected and not true errors.
451+
// Release the eventHandle without counting as a document error.
449452
bulkOperation.releaseEventHandle(true);
450453
} else {
451454
failures.add(FailedBulkOperation.builder()
452455
.withBulkOperation(bulkOperation)
453456
.withBulkResponseItem(bulkItemResponse)
454457
.build());
458+
documentErrorsCounter.increment();
459+
getDocumentStatusCounter(bulkItemResponse.status()).increment();
455460
}
456-
documentErrorsCounter.increment();
457-
getDocumentStatusCounter(bulkItemResponse.status()).increment();
458461
} else {
459462
sentDocumentsCounter.increment();
460463
if(isDuplicateDocument(bulkItemResponse)) {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ private void doInitializeInternal() throws IOException {
317317
bulkRequestSupplier,
318318
pipeline,
319319
PLUGIN_NAME,
320-
openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null);
320+
openSearchSinkConfig.getIndexConfiguration().getQueryOnBulkFailures() ? existingDocumentQueryManager : null,
321+
isExternalVersionType(openSearchSinkConfig.getIndexConfiguration().getVersionType()));
321322

322323
if (queryExecutorService != null) {
323324
existingDocumentQueryManager = new ExistingDocumentQueryManager(openSearchSinkConfig.getIndexConfiguration(), pluginMetrics, openSearchClient);
@@ -673,6 +674,10 @@ private void maybeUpdateServerlessNetworkPolicy() {
673674
}
674675
}
675676

677+
private static boolean isExternalVersionType(final VersionType versionType) {
678+
return versionType != null && (versionType == VersionType.External || versionType == VersionType.ExternalGte);
679+
}
680+
676681
private DlqObject createDlqObjectFromEvent(final Event event,
677682
final String index,
678683
final String message) {

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java

Lines changed: 155 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public BulkRetryStrategy createObjectUnderTest(
144144
bulkRequestSupplier,
145145
PIPELINE_NAME,
146146
PLUGIN_NAME,
147-
null);
147+
null,
148+
false);
148149
}
149150

150151
public BulkRetryStrategy createObjectUnderTest(
@@ -171,7 +172,8 @@ public BulkRetryStrategy createObjectUnderTest(
171172
bulkRequestSupplier,
172173
PIPELINE_NAME,
173174
PLUGIN_NAME,
174-
null);
175+
null,
176+
false);
175177
}
176178

177179
public BulkRetryStrategy createObjectUnderTest(
@@ -199,7 +201,35 @@ public BulkRetryStrategy createObjectUnderTest(
199201
bulkRequestSupplier,
200202
PIPELINE_NAME,
201203
PLUGIN_NAME,
202-
existingDocumentQueryManager);
204+
existingDocumentQueryManager,
205+
false);
206+
}
207+
208+
public BulkRetryStrategy createObjectUnderTestWithExternalVersioning(
209+
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction,
210+
final BiConsumer<List<FailedBulkOperation>, Throwable> logFailure,
211+
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier
212+
) {
213+
return new BulkRetryStrategy(
214+
requestFunction,
215+
logFailure,
216+
(operations) -> {
217+
for (BulkOperationWrapper operation: operations) {
218+
if (operation.getEvent() != null) {
219+
operation.getEvent().getEventHandle().release(true);
220+
}
221+
if (operation.getEventHandle() != null) {
222+
operation.getEventHandle().release(true);
223+
}
224+
}
225+
},
226+
pluginMetrics,
227+
Integer.MAX_VALUE,
228+
bulkRequestSupplier,
229+
PIPELINE_NAME,
230+
PLUGIN_NAME,
231+
null,
232+
true);
203233
}
204234

205235
@Test
@@ -629,12 +659,6 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception {
629659
assertThat(maxRetriesLimitReached, equalTo(true));
630660
assertEquals(numEventsSucceeded, 2);
631661
assertEquals(numEventsFailed, 2);
632-
633-
final List<Measurement> documentVersionConflictMeasurement = MetricsTestUtil.getMeasurementList(
634-
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
635-
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
636-
assertEquals(1, documentVersionConflictMeasurement.size());
637-
assertEquals(1.0, documentVersionConflictMeasurement.get(0).getValue(), 0);
638662
}
639663

640664
@Test
@@ -694,6 +718,128 @@ public void testExecuteNonRetryableResponse() throws Exception {
694718
assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0);
695719
}
696720

721+
@Test
722+
public void testExecute_VersionConflictDoesNotIncrementDocumentErrors() throws Exception {
723+
final String testIndex = "version-conflict-index";
724+
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
725+
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
726+
727+
final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTestWithExternalVersioning(
728+
requestFunction, logFailureConsumer, bulkRequestSupplier);
729+
730+
final IndexOperation<SerializedJson> indexOp1 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("1").document(arbitraryDocument()).build();
731+
final IndexOperation<SerializedJson> indexOp2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
732+
final IndexOperation<SerializedJson> indexOp3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();
733+
734+
final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1);
735+
final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2);
736+
final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3);
737+
738+
final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
739+
accumulatingBulkRequest.addOperation(wrapper1);
740+
accumulatingBulkRequest.addOperation(wrapper2);
741+
accumulatingBulkRequest.addOperation(wrapper3);
742+
743+
// Response: 1 success, 1 version conflict, 1 bad request
744+
final BulkResponseItem successItem = successItemResponse(testIndex);
745+
final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse();
746+
final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex);
747+
final List<BulkResponseItem> responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem);
748+
749+
final BulkResponse bulkResponse = mock(BulkResponse.class);
750+
when(bulkResponse.errors()).thenReturn(true);
751+
when(bulkResponse.items()).thenReturn(responseItems);
752+
when(requestFunction.apply(any())).thenReturn(bulkResponse);
753+
754+
numEventsSucceeded = 0;
755+
numEventsFailed = 0;
756+
bulkRetryStrategy.execute(accumulatingBulkRequest);
757+
758+
// Version conflict should NOT be counted as a document error
759+
final List<Measurement> documentErrorsMeasurements = MetricsTestUtil.getMeasurementList(
760+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
761+
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
762+
assertEquals(1, documentErrorsMeasurements.size());
763+
assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0);
764+
765+
// Version conflict should be counted in its own metric
766+
final List<Measurement> versionConflictMeasurements = MetricsTestUtil.getMeasurementList(
767+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
768+
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
769+
assertEquals(1, versionConflictMeasurements.size());
770+
assertEquals(1.0, versionConflictMeasurements.get(0).getValue(), 0);
771+
772+
// Success metric should count the 1 successful document
773+
final List<Measurement> successMeasurements = MetricsTestUtil.getMeasurementList(
774+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
775+
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
776+
assertEquals(1, successMeasurements.size());
777+
assertEquals(1.0, successMeasurements.get(0).getValue(), 0);
778+
779+
// Version conflict event handle should be released with true (success)
780+
verify(eventHandle2).release(true);
781+
}
782+
783+
@Test
784+
public void testExecute_VersionConflictIncrementsDocumentErrors_WhenNotExternalVersioning() throws Exception {
785+
final String testIndex = "version-conflict-index";
786+
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
787+
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
788+
789+
// Use default (non-external versioning)
790+
final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest(
791+
requestFunction, logFailureConsumer, bulkRequestSupplier);
792+
793+
final IndexOperation<SerializedJson> indexOp1 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("1").document(arbitraryDocument()).build();
794+
final IndexOperation<SerializedJson> indexOp2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
795+
final IndexOperation<SerializedJson> indexOp3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();
796+
797+
final BulkOperationWrapper wrapper1 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp1).build(), eventHandle1);
798+
final BulkOperationWrapper wrapper2 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp2).build(), eventHandle2);
799+
final BulkOperationWrapper wrapper3 = new BulkOperationWrapper(new BulkOperation.Builder().index(indexOp3).build(), eventHandle3);
800+
801+
final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
802+
accumulatingBulkRequest.addOperation(wrapper1);
803+
accumulatingBulkRequest.addOperation(wrapper2);
804+
accumulatingBulkRequest.addOperation(wrapper3);
805+
806+
// Response: 1 success, 1 version conflict, 1 bad request
807+
final BulkResponseItem successItem = successItemResponse(testIndex);
808+
final BulkResponseItem versionConflictItem = versionConflictErrorItemResponse();
809+
final BulkResponseItem badRequestItem = badRequestItemResponse(testIndex);
810+
final List<BulkResponseItem> responseItems = Arrays.asList(successItem, versionConflictItem, badRequestItem);
811+
812+
final BulkResponse bulkResponse = mock(BulkResponse.class);
813+
when(bulkResponse.errors()).thenReturn(true);
814+
when(bulkResponse.items()).thenReturn(responseItems);
815+
when(requestFunction.apply(any())).thenReturn(bulkResponse);
816+
817+
numEventsSucceeded = 0;
818+
numEventsFailed = 0;
819+
bulkRetryStrategy.execute(accumulatingBulkRequest);
820+
821+
// Without external versioning, version conflict SHOULD be counted as a document error
822+
final List<Measurement> documentErrorsMeasurements = MetricsTestUtil.getMeasurementList(
823+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
824+
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
825+
assertEquals(1, documentErrorsMeasurements.size());
826+
assertEquals(2.0, documentErrorsMeasurements.get(0).getValue(), 0);
827+
828+
// Version conflict metric should NOT be incremented without external versioning
829+
final List<Measurement> versionConflictMeasurements = MetricsTestUtil.getMeasurementList(
830+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
831+
.add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString());
832+
assertEquals(1, versionConflictMeasurements.size());
833+
assertEquals(0.0, versionConflictMeasurements.get(0).getValue(), 0);
834+
835+
// Success metric should count the 1 successful document
836+
final List<Measurement> successMeasurements = MetricsTestUtil.getMeasurementList(
837+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
838+
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
839+
assertEquals(1, successMeasurements.size());
840+
assertEquals(1.0, successMeasurements.get(0).getValue(), 0);
841+
}
842+
697843
@Test
698844
public void testExecute_DeleteNotFound_RetriesAndSucceeds() throws Exception {
699845
final String testIndex = "delete-index";

0 commit comments

Comments
 (0)