Skip to content

Commit 506ee95

Browse files
committed
Fix invalid document version events still included in bulk requests (#6601)
Events with invalid document_version values are correctly sent to the DLQ but were still being added to the OpenSearch bulk request. Added continue statements after the NumberFormatException and RuntimeException catch blocks in doOutput() to skip the rest of the loop iteration for failed events. Added unit tests to verify events with invalid versions are not added to the bulk request. Signed-off-by: Kiro <kiro@amazon.com>
1 parent bfda518 commit 506ee95

2 files changed

Lines changed: 68 additions & 0 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,18 +477,26 @@ public void doOutput(final Collection<Record<Event>> records) {
477477
versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator);
478478
version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator));
479479
} catch (final NumberFormatException e) {
480+
// Skip this event from the bulk request after sending to DLQ to avoid
481+
// including events with invalid versions in the OpenSearch bulk request.
482+
// See https://github.com/opensearch-project/data-prepper/issues/6601
480483
final String errorMessage = String.format(
481484
"Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult
482485
);
483486
LOG.error(errorMessage);
484487
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e);
485488
dynamicDocumentVersionDroppedEvents.increment();
489+
continue;
486490
} catch (final RuntimeException e) {
491+
// Skip this event from the bulk request after sending to DLQ to avoid
492+
// including events with invalid versions in the OpenSearch bulk request.
493+
// See https://github.com/opensearch-project/data-prepper/issues/6601
487494
final String errorMessage = String.format(
488495
"There was an exception when evaluating the document_version '%s': %s", versionExpression, e.getMessage());
489496
LOG.error(errorMessage + " Check the dlq if configured to see more details about the affected Event");
490497
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e);
491498
dynamicDocumentVersionDroppedEvents.increment();
499+
continue;
492500
}
493501
}
494502

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.junit.jupiter.api.BeforeEach;
1212
import org.junit.jupiter.api.Test;
1313
import org.junit.jupiter.api.extension.ExtendWith;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.Arguments;
16+
import org.junit.jupiter.params.provider.MethodSource;
1417
import org.mockito.ArgumentCaptor;
1518
import org.mockito.Mock;
1619
import org.mockito.MockedConstruction;
@@ -54,6 +57,7 @@
5457
import java.util.Map;
5558
import java.util.Optional;
5659
import java.util.UUID;
60+
import java.util.stream.Stream;
5761

5862
import static org.hamcrest.MatcherAssert.assertThat;
5963
import static org.hamcrest.Matchers.equalTo;
@@ -353,6 +357,62 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_
353357
verify(dynamicDocumentVersionDroppedEvents).increment();
354358
}
355359

360+
@ParameterizedTest
361+
@MethodSource("invalidVersionExceptionProvider")
362+
void doOutput_with_invalid_version_expression_does_not_add_event_to_bulk_request(
363+
final Class<? extends RuntimeException> exceptionType) throws IOException {
364+
when(pluginSetting.getName()).thenReturn("opensearch");
365+
final String versionExpression = UUID.randomUUID().toString();
366+
when(indexConfiguration.getVersionExpression()).thenReturn(versionExpression);
367+
368+
final Event event = mock(JacksonEvent.class);
369+
final String document = UUID.randomUUID().toString();
370+
when(event.toJsonString()).thenReturn(document);
371+
final EventHandle eventHandle = mock(EventHandle.class);
372+
when(event.getEventHandle()).thenReturn(eventHandle);
373+
final String index = UUID.randomUUID().toString();
374+
when(event.formatString(versionExpression, expressionEvaluator)).thenThrow(exceptionType);
375+
when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index);
376+
final Record<Event> eventRecord = new Record<>(event);
377+
378+
final OpenSearchSink objectUnderTest = createObjectUnderTest();
379+
when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any()))
380+
.thenReturn(indexManager);
381+
doNothing().when(indexManager).setupIndex();
382+
objectUnderTest.initialize();
383+
384+
when(indexManager.getIndexName(anyString())).thenReturn(index);
385+
386+
final DlqObject dlqObject = mock(DlqObject.class);
387+
final DlqObject.Builder dlqObjectBuilder = mock(DlqObject.Builder.class);
388+
when(dlqObjectBuilder.withEventHandle(eventHandle)).thenReturn(dlqObjectBuilder);
389+
when(dlqObjectBuilder.withFailedData(any(FailedDlqData.class))).thenReturn(dlqObjectBuilder);
390+
when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder);
391+
when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder);
392+
when(dlqObjectBuilder.withPipelineName(pipelineDescription.getPipelineName())).thenReturn(dlqObjectBuilder);
393+
when(dlqObject.getFailedData()).thenReturn(mock(FailedDlqData.class));
394+
doNothing().when(dlqObject).releaseEventHandle(false);
395+
when(dlqObjectBuilder.build()).thenReturn(dlqObject);
396+
397+
try (final MockedStatic<DocumentBuilder> documentBuilderMockedStatic = mockStatic(DocumentBuilder.class);
398+
final MockedStatic<DlqObject> dlqObjectMockedStatic = mockStatic(DlqObject.class)) {
399+
documentBuilderMockedStatic.when(() -> DocumentBuilder.build(eq(event), eq(null), eq(null), eq(null), eq(null)))
400+
.thenReturn(UUID.randomUUID().toString());
401+
dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder);
402+
objectUnderTest.doOutput(List.of(eventRecord));
403+
}
404+
405+
verify(dynamicDocumentVersionDroppedEvents).increment();
406+
verify(event, times(0)).getJsonNode();
407+
}
408+
409+
private static Stream<Arguments> invalidVersionExceptionProvider() {
410+
return Stream.of(
411+
Arguments.of(NumberFormatException.class),
412+
Arguments.of(RuntimeException.class)
413+
);
414+
}
415+
356416
@Test
357417
void test_routing_field_in_document() throws IOException {
358418
String routingFieldKey = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)