Skip to content

Commit 16d7f5d

Browse files
authored
Fix dlqPipeline functionality broken by PR 6349 (#6678)
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent da35804 commit 16d7f5d

2 files changed

Lines changed: 64 additions & 2 deletions

File tree

  • data-prepper-plugins/opensearch/src

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.ArrayList;
7272
import java.util.Arrays;
7373
import java.util.Collections;
74+
import java.util.Collection;
7475
import java.util.Date;
7576
import java.util.HashMap;
7677
import java.util.List;
@@ -103,6 +104,7 @@
103104
import static org.mockito.Mockito.mock;
104105
import static org.mockito.Mockito.verify;
105106
import static org.mockito.Mockito.when;
107+
import org.mockito.ArgumentCaptor;
106108
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
107109
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
108110
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts;
@@ -173,6 +175,23 @@ private OpenSearchSink createObjectUnderTest(OpenSearchSinkConfig openSearchSink
173175
return sink;
174176
}
175177

178+
private OpenSearchSink createObjectUnderTestWithDlqPipeline(OpenSearchSinkConfig openSearchSinkConfig, HeadlessPipeline dlqPipeline, boolean doInitialize) {
179+
sinkContext = mock(SinkContext.class);
180+
when(sinkContext.getTagsTargetKey()).thenReturn(null);
181+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
182+
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
183+
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
184+
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
185+
OpenSearchSink sink = new OpenSearchSink(
186+
pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig);
187+
sink.setFailurePipeline(dlqPipeline);
188+
if (doInitialize) {
189+
sink.doInitialize();
190+
}
191+
sinksToShutdown.add(sink);
192+
return sink;
193+
}
194+
176195
private OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, final Map<String, HeadlessPipeline> forwardPipelineMap, boolean doInitialize) {
177196
sinkContext = mock(SinkContext.class);
178197
testTagsTargetKey = RandomStringUtils.randomAlphabetic(5);
@@ -906,6 +925,47 @@ void testOutputForwardsCreatedDocumentsToAPipeline() throws IOException, Interru
906925
verify(sinkContext).forwardRecords(any(), eq(null), eq(null));
907926
}
908927

928+
@Test
929+
@Timeout(value = 50, unit = TimeUnit.SECONDS)
930+
void testOutputFailedDocumentsToDLQPipeline() throws IOException, InterruptedException {
931+
HeadlessPipeline dlqPipeline = mock(HeadlessPipeline.class);
932+
final String testIndexAlias = "test-alias";
933+
final String testTemplateFile = Objects.requireNonNull(
934+
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
935+
final String testIdField = "someId";
936+
final String testId = "foo";
937+
final String testId2 = "foo2";
938+
final Record<Event> testRecord1 = new Record(JacksonEvent.builder()
939+
.withEventType(EventType.TRACE.toString())
940+
.withData(Map.of(testIdField, testId, "name", "value")).build());
941+
final Record<Event> testRecord2 = new Record(JacksonEvent.builder()
942+
.withEventType(EventType.TRACE.toString())
943+
.withData(Map.of(testIdField, testId, "name", Map.of("key", "value"))).build());
944+
//List<Record<Event>> testRecords = Arrays.asList(testRecord1, testRecord2);
945+
List<Record<Event>> testRecords = Collections.singletonList(testRecord1);
946+
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
947+
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
948+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
949+
final OpenSearchSink sink = createObjectUnderTestWithDlqPipeline(openSearchSinkConfig, dlqPipeline, true);
950+
sink.output(testRecords);
951+
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
952+
assertThat(retSources.size(), equalTo(1));
953+
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
954+
955+
// verify metrics
956+
final List<Measurement> bulkRequestLatencies = MetricsTestUtil.getMeasurementList(
957+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
958+
.add(OpenSearchSink.BULKREQUEST_LATENCY).toString());
959+
assertThat(bulkRequestLatencies.size(), equalTo(3));
960+
// COUNT
961+
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
962+
testRecords = Collections.singletonList(testRecord2);
963+
sink.output(testRecords);
964+
ArgumentCaptor<Collection<Record<Event>>> captor = ArgumentCaptor.forClass(Collection.class);
965+
verify(dlqPipeline).sendEvents(captor.capture());
966+
assertThat(captor.getValue().size(), equalTo(1));
967+
}
968+
909969
@Test
910970
@Timeout(value = 50, unit = TimeUnit.SECONDS)
911971
void testOutputCustomIndex() throws IOException, InterruptedException {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
157157
private volatile boolean initialized;
158158
private final SinkContext sinkContext;
159159
private final ExpressionEvaluator expressionEvaluator;
160-
private final boolean useEventInBulkOperation;
160+
private boolean useEventInBulkOperation;
161161

162162
private FailedBulkOperationConverter failedBulkOperationConverter;
163163
private DataStreamDetector dataStreamDetector;
@@ -191,7 +191,6 @@ public OpenSearchSink(final PluginSetting pluginSetting,
191191
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
192192
this.expressionEvaluator = expressionEvaluator;
193193
this.pipeline = pipelineDescription.getPipelineName();
194-
this.useEventInBulkOperation = (getFailurePipeline() != null || sinkContext.getForwardToPipelines().size() > 0);
195194
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
196195
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
197196
invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS);
@@ -234,6 +233,9 @@ public OpenSearchSink(final PluginSetting pluginSetting,
234233
public void doInitialize() {
235234
try {
236235
doInitializeInternal();
236+
// getFailurePipeline() does not return valid value in the constructor. Earliest it can be used
237+
// is in doInitialize()
238+
useEventInBulkOperation = (getFailurePipeline() != null || sinkContext.getForwardToPipelines().size() > 0);
237239
} catch (IOException e) {
238240
LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage());
239241
this.shutdown();

0 commit comments

Comments
 (0)