Skip to content

Commit 08166df

Browse files
kkondakasimonelbaz
authored andcommitted
Add forward_to support to opensearch sink (opensearch-project#6349)
* Add forward_to support to opensearch sink Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added integration test Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 655fb17 commit 08166df

11 files changed

Lines changed: 237 additions & 46 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.model.configuration;
77

8+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
89
import com.fasterxml.jackson.annotation.JsonCreator;
910
import com.fasterxml.jackson.annotation.JsonProperty;
1011

@@ -30,6 +31,9 @@ public SinkForwardConfig(
3031
@JsonProperty("pipelines") final List<String> pipelineNames,
3132
@JsonProperty("with_data") final Map<String, Object> withData,
3233
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
34+
if (pipelineNames.size() != 1) {
35+
throw new InvalidPluginConfigurationException("Supports only one forwarding pipeline");
36+
}
3337
this.pipelineNames = pipelineNames;
3438
this.withData = withData;
3539
this.withMetadata = withMetadata;
@@ -46,5 +50,6 @@ public Map<String, Object> getWithMetadata() {
4650
public Map<String, Object> getWithData() {
4751
return withData;
4852
}
53+
4954
}
5055

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecords
9797
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
9898
entry.getValue().sendEvents(records);
9999
}
100+
sinkForwardRecordsContext.clearRecords();
100101
return true;
101102
}
102103

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.opensearch.dataprepper.model.record.Record;
99
import org.opensearch.dataprepper.model.event.Event;
10-
import org.opensearch.dataprepper.model.event.InternalEventHandle;
1110

1211
import java.util.ArrayList;
1312
import java.util.Collection;
@@ -25,28 +24,21 @@ public SinkForwardRecordsContext(SinkContext sinkContext) {
2524
public void addRecord(Record<Event> record) {
2625
if (!forwardPipelinesPresent)
2726
return;
28-
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
29-
if (eventHandle != null) {
30-
eventHandle.acquireReference();
31-
}
3227
records.add(record);
3328
}
3429

3530
public void addRecords(Collection<Record<Event>> newRecords) {
3631
if (!forwardPipelinesPresent)
3732
return;
38-
newRecords.forEach((record) -> {
39-
Event event = record.getData();
40-
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
41-
if (eventHandle != null) {
42-
eventHandle.acquireReference();
43-
}
44-
});
4533
records.addAll(newRecords);
4634
}
4735

4836
public List<Record<Event>> getRecords() {
4937
return records;
5038
}
39+
40+
public void clearRecords() {
41+
records.clear();
42+
}
5143
}
5244

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
package org.opensearch.dataprepper.model.configuration;
77

8+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
89
import org.junit.jupiter.api.Test;
910

1011
import static org.hamcrest.CoreMatchers.equalTo;
1112
import static org.hamcrest.CoreMatchers.nullValue;
1213
import static org.mockito.Mockito.mock;
1314
import static org.hamcrest.MatcherAssert.assertThat;
15+
import static org.junit.jupiter.api.Assertions.assertThrows;
1416

1517
import java.util.List;
1618
import java.util.Map;
@@ -26,14 +28,27 @@ void testDefaults() {
2628
}
2729

2830
@Test
29-
void testCustomValues() {
30-
List<String> pipelines = mock(List.class);
31+
void pipelines_lsit_with_one_pipeline_succeeds() {
32+
List<String> pipelines = List.of("pipeline1");
3133
Map<String, Object> withData = mock(Map.class);
3234
Map<String, Object> withMetadata = mock(Map.class);
3335
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
3436
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
3537
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
3638
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
3739
}
40+
41+
@Test
42+
void pipelines_list_with_two_or_more_pipelines_throws_exception() {
43+
List<String> pipelines = List.of("pipeline1", "pipeline2");
44+
Map<String, Object> withData = mock(Map.class);
45+
Map<String, Object> withMetadata = mock(Map.class);
46+
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(pipelines, withData, withMetadata));
47+
}
48+
49+
@Test
50+
void empty_pipelines_list_throws_exception() {
51+
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(List.of(), Map.of(), Map.of()));
52+
}
3853
}
3954

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,21 @@ public void testForwardToPipelinesWithPipelineMap() {
9393
verify(forwardPipeline2, times(0)).sendEvents(eq(records));
9494
sinkForwardRecordsContext.addRecords(records);
9595
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
96-
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
97-
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
96+
verify(forwardPipeline1, times(1)).sendEvents(any());
97+
verify(forwardPipeline2, times(1)).sendEvents(any());
9898
verify(event, times(1)).put(any(String.class), any(Object.class));
9999
verify(event, times(1)).getMetadata();
100100
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
101+
records = Collections.singletonList(record);
102+
sinkForwardRecordsContext.addRecords(records);
101103
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true));
102-
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
103-
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
104+
verify(forwardPipeline1, times(2)).sendEvents(any());
105+
verify(forwardPipeline2, times(2)).sendEvents(any());
106+
records = Collections.singletonList(record);
107+
sinkForwardRecordsContext.addRecords(records);
104108
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true));
105-
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
106-
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
109+
verify(forwardPipeline1, times(3)).sendEvents(any());
110+
verify(forwardPipeline2, times(3)).sendEvents(any());
107111
}
108112

109113
@Test
@@ -148,6 +152,7 @@ public void testWithNoForwardToPipelines() {
148152
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
149153
sinkForwardRecordsContext.addRecords(List.of(record));
150154
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
155+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
151156
}
152157
}
153158

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import org.opensearch.dataprepper.model.record.Record;
1111
import org.opensearch.dataprepper.model.event.Event;
12-
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
1312
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1413

1514
import static org.mockito.Mockito.mock;
1615
import static org.hamcrest.MatcherAssert.assertThat;
1716
import static org.hamcrest.Matchers.equalTo;
18-
import static org.mockito.Mockito.doNothing;
1917
import static org.mockito.Mockito.when;
2018

2119
import java.util.List;
@@ -30,16 +28,9 @@ public void testSinkForwardRecordContextBasic() {
3028
SinkContext sinkContext = mock(SinkContext.class);
3129
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
3230
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
33-
Event event = mock(Event.class);
34-
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
35-
doNothing().when(eventHandle).acquireReference();
3631
Record<Event> record1 = mock(Record.class);
3732
Record<Event> record2 = mock(Record.class);
3833
Record<Event> record3 = mock(Record.class);
39-
when(record1.getData()).thenReturn(event);
40-
when(record2.getData()).thenReturn(event);
41-
when(record3.getData()).thenReturn(event);
42-
when(event.getEventHandle()).thenReturn(eventHandle);
4334
sinkForwardRecordsContext.addRecord(record1);
4435
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
4536
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
@@ -52,19 +43,28 @@ public void testSinkForwardRecordContextWithForwardingPipelines() {
5243
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
5344
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
5445
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
55-
Event event = mock(Event.class);
56-
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
57-
doNothing().when(eventHandle).acquireReference();
5846
Record<Event> record1 = mock(Record.class);
5947
Record<Event> record2 = mock(Record.class);
6048
Record<Event> record3 = mock(Record.class);
61-
when(record1.getData()).thenReturn(event);
62-
when(record2.getData()).thenReturn(event);
63-
when(record3.getData()).thenReturn(event);
64-
when(event.getEventHandle()).thenReturn(eventHandle);
6549
sinkForwardRecordsContext.addRecord(record1);
6650
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
6751
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
6852
assertThat(records.size(), equalTo(3));
53+
sinkForwardRecordsContext.clearRecords();
54+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
55+
}
56+
57+
@Test
58+
public void testSinkForwardRecordContextClearRecords() {
59+
SinkContext sinkContext = mock(SinkContext.class);
60+
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
61+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
62+
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
63+
Record<Event> record1 = mock(Record.class);
64+
Record<Event> record2 = mock(Record.class);
65+
sinkForwardRecordsContext.addRecords(List.of(record1, record2));
66+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(2));
67+
sinkForwardRecordsContext.clearRecords();
68+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
6969
}
7070
}

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
3939
import org.opensearch.dataprepper.metrics.MetricNames;
4040
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
41+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
4142
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
4243
import org.opensearch.dataprepper.model.configuration.PluginSetting;
4344
import org.opensearch.dataprepper.model.event.Event;
@@ -94,6 +95,7 @@
9495
import static org.junit.jupiter.api.Assertions.assertTrue;
9596
import static org.junit.jupiter.params.provider.Arguments.arguments;
9697
import static org.mockito.ArgumentMatchers.any;
98+
import static org.mockito.ArgumentMatchers.eq;
9799
import static org.mockito.Mockito.mock;
98100
import static org.mockito.Mockito.verify;
99101
import static org.mockito.Mockito.when;
@@ -151,21 +153,25 @@ public class OpenSearchSinkIT {
151153
private PluginConfigObservable pluginConfigObservable;
152154

153155
public OpenSearchSink createObjectUnderTest(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) {
156+
sinkContext = mock(SinkContext.class);
157+
when(sinkContext.getTagsTargetKey()).thenReturn(null);
158+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
154159
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
155160
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
156161
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
157162
OpenSearchSink sink = new OpenSearchSink(
158-
pluginSetting, null, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig);
163+
pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig);
159164
if (doInitialize) {
160165
sink.doInitialize();
161166
}
162167
return sink;
163168
}
164169

165-
public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) {
170+
public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, final Map<String, HeadlessPipeline> forwardPipelineMap, boolean doInitialize) {
166171
sinkContext = mock(SinkContext.class);
167172
testTagsTargetKey = RandomStringUtils.randomAlphabetic(5);
168173
when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey);
174+
when(sinkContext.getForwardToPipelines()).thenReturn(forwardPipelineMap);
169175
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
170176
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
171177
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
@@ -846,6 +852,36 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
846852
}
847853
}
848854

855+
@Test
856+
public void testOutputForwardsCreatedDocumentsToAPipeline() throws IOException, InterruptedException {
857+
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
858+
Map<String, HeadlessPipeline> forwardPipelineMap = Map.of("fwd_pipeline1", forwardPipeline1);
859+
final String testIndexAlias = "test-alias";
860+
final String testTemplateFile = Objects.requireNonNull(
861+
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
862+
final String testIdField = "someId";
863+
final String testId = "foo";
864+
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
865+
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
866+
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
867+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
868+
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, forwardPipelineMap, true);
869+
sink.output(testRecords);
870+
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
871+
assertThat(retSources.size(), equalTo(1));
872+
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
873+
sink.shutdown();
874+
875+
// verify metrics
876+
final List<Measurement> bulkRequestLatencies = MetricsTestUtil.getMeasurementList(
877+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
878+
.add(OpenSearchSink.BULKREQUEST_LATENCY).toString());
879+
assertThat(bulkRequestLatencies.size(), equalTo(3));
880+
// COUNT
881+
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
882+
verify(sinkContext).forwardRecords(any(), eq(null), eq(null));
883+
}
884+
849885
@Test
850886
public void testOutputCustomIndex() throws IOException, InterruptedException {
851887
final String testIndexAlias = "test-alias";
@@ -1255,7 +1291,7 @@ public void testEventOutputWithTags() throws IOException, InterruptedException {
12551291
final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));
12561292

12571293
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
1258-
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, true);
1294+
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, Map.of(), true);
12591295
sink.output(testRecords);
12601296

12611297
final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);

0 commit comments

Comments
 (0)