Skip to content

Commit d666fbf

Browse files
committed
Add support for forwarding successful records from sinks using SinkContext
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent d1527e5 commit d666fbf

16 files changed

Lines changed: 398 additions & 45 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@
3131
public class SinkModel extends PluginModel {
3232

3333
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
34-
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
34+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings));
35+
}
36+
37+
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
38+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginSettings));
3539
}
3640

3741
private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
@@ -56,6 +60,9 @@ public List<String> getExcludeKeys() {
5660
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
5761
}
5862

63+
public List<String> getForwardPipelineNames() {
64+
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardToPipelineNames;
65+
}
5966

6067
/**
6168
* Gets the tags target key associated with this Sink.
@@ -75,17 +82,19 @@ public static class SinkModelBuilder {
7582

7683
private final List<String> includeKeys;
7784
private final List<String> excludeKeys;
85+
private final List<String> forwardToPipelineNames;
7886

7987
private SinkModelBuilder(final PluginModel pluginModel) {
8088
this.pluginModel = pluginModel;
8189
this.routes = Collections.emptyList();
8290
this.tagsTargetKey = null;
8391
this.includeKeys = Collections.emptyList();
8492
this.excludeKeys = Collections.emptyList();
93+
this.forwardToPipelineNames = Collections.emptyList();
8594
}
8695

8796
public SinkModel build() {
88-
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings());
97+
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginModel.getPluginSettings());
8998
}
9099
}
91100

@@ -111,16 +120,21 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
111120
@JsonProperty("exclude_keys")
112121
private final List<String> excludeKeys;
113122

123+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
124+
@JsonProperty("forward_to")
125+
private final List<String> forwardToPipelineNames;
126+
114127
@JsonCreator
115-
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys) {
116-
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>());
128+
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final List<String> forwardToPipelineNames) {
129+
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, new HashMap<>());
117130
}
118131

119-
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
132+
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
120133
super(pluginSettings);
121134
this.routes = routes != null ? routes : Collections.emptyList();
122135
this.includeKeys = includeKeys != null ? validateKeys(includeKeys, "include_keys") : Collections.emptyList();
123136
this.excludeKeys = excludeKeys != null ? validateKeys(excludeKeys, "exclude_keys") : Collections.emptyList();
137+
this.forwardToPipelineNames = forwardToPipelineNames != null ? forwardToPipelineNames : Collections.emptyList();
124138
this.tagsTargetKey = tagsTargetKey;
125139
validateConfiguration();
126140
}
@@ -146,7 +160,7 @@ private static List<String> validateKeys(List<String> input, String tag) {
146160

147161
static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
148162
SinkModelDeserializer() {
149-
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
163+
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null, null));
150164
}
151165
}
152-
}
166+
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@
55

66
package org.opensearch.dataprepper.model.sink;
77

8+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
812
import java.util.Collection;
13+
import java.util.HashMap;
914
import java.util.List;
15+
import java.util.Map;
1016

1117
/**
1218
* Data Prepper Sink Context class. This the class for keeping global
@@ -15,20 +21,62 @@
1521
public class SinkContext {
1622
private final String tagsTargetKey;
1723
private final Collection<String> routes;
18-
1924
private final List<String> includeKeys;
2025
private final List<String> excludeKeys;
21-
26+
private Map<String, HeadlessPipeline> forwardToPipelines;
2227

2328
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys) {
2429
this.tagsTargetKey = tagsTargetKey;
2530
this.routes = routes;
2631
this.includeKeys = includeKeys;
2732
this.excludeKeys = excludeKeys;
33+
this.forwardToPipelines = new HashMap<>();
34+
}
35+
36+
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, List<String> forwardPipelineNames) {
37+
this.tagsTargetKey = tagsTargetKey;
38+
this.routes = routes;
39+
this.includeKeys = includeKeys;
40+
this.excludeKeys = excludeKeys;
41+
this.forwardToPipelines = new HashMap<>();
42+
if (forwardPipelineNames != null) {
43+
for (final String forwardPipelineName: forwardPipelineNames) {
44+
this.forwardToPipelines.put(forwardPipelineName, null);
45+
}
46+
}
2847
}
2948

3049
public SinkContext(String tagsTargetKey) {
31-
this(tagsTargetKey, null, null, null);
50+
this(tagsTargetKey, null, null, null, null);
51+
}
52+
53+
public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines) {
54+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
55+
final String key = entry.getKey();
56+
final HeadlessPipeline pipeline = pipelines.get(key);
57+
if (pipeline != null) {
58+
forwardToPipelines.put(key, pipeline);
59+
} else {
60+
throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key));
61+
}
62+
}
63+
}
64+
65+
public boolean forwardRecords(final Collection<Record<Event>> records) {
66+
if (forwardToPipelines.size() == 0) {
67+
return false;
68+
}
69+
70+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
71+
if (entry.getValue() == null) {
72+
return false;
73+
}
74+
}
75+
76+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
77+
entry.getValue().sendEvents(records);
78+
}
79+
return true;
3280
}
3381

3482
/**
@@ -56,5 +104,9 @@ public List<String> getIncludeKeys() {
56104
public List<String> getExcludeKeys() {
57105
return excludeKeys;
58106
}
107+
108+
public Map<String, HeadlessPipeline> getForwardToPipelines() {
109+
return forwardToPipelines;
110+
}
59111
}
60112

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ void sinkModel_with_exclude_keys() {
170170

171171
}
172172

173+
@Test
174+
void sinkModel_with_forward_pipelines() {
175+
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
176+
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), Arrays.asList("forward-pipeline1", "forward-pipeline2"), pluginSettings);
177+
178+
assertThat(sinkModel.getForwardPipelineNames(), equalTo(Arrays.asList("forward-pipeline1", "forward-pipeline2")));
179+
}
180+
173181
@Test
174182
void sinkModel_with_invalid_exclude_keys() {
175183
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
@@ -213,12 +221,13 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
213221
assertThat(actualSinkModel.getExcludeKeys(), notNullValue());
214222
assertThat(actualSinkModel.getExcludeKeys(), empty());
215223
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
216-
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
224+
assertThat(actualSinkModel.getForwardPipelineNames(), notNullValue());
225+
assertThat(actualSinkModel.getForwardPipelineNames(), empty());
217226

218227
}
219228
}
220229

221230
private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
222231
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
223232
}
224-
}
233+
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@
88
import org.apache.commons.lang3.RandomStringUtils;
99
import org.junit.jupiter.api.Test;
1010

11-
import java.util.Collections;
12-
import java.util.List;
11+
import org.opensearch.dataprepper.model.record.Record;
12+
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1314

15+
import static org.junit.jupiter.api.Assertions.assertThrows;
1416
import static org.hamcrest.MatcherAssert.assertThat;
1517
import static org.hamcrest.Matchers.equalTo;
18+
import static org.mockito.Mockito.mock;
19+
import static org.mockito.ArgumentMatchers.eq;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
1622

23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Map;
1727

1828
public class SinkContextTest {
1929
private SinkContext sinkContext;
@@ -24,11 +34,13 @@ public void testSinkContextBasic() {
2434
final List<String> testRoutes = Collections.emptyList();
2535
final List<String> testIncludeKeys = Collections.emptyList();
2636
final List<String> testExcludeKeys = Collections.emptyList();
27-
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys);
37+
final List<String> testForwardToPipelineNames = Collections.emptyList();
38+
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
2839
assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
2940
assertThat(sinkContext.getRoutes(), equalTo(testRoutes));
3041
assertThat(sinkContext.getIncludeKeys(), equalTo(testIncludeKeys));
3142
assertThat(sinkContext.getExcludeKeys(), equalTo(testExcludeKeys));
43+
assertThat(sinkContext.getForwardToPipelines(), equalTo(Collections.emptyMap()));
3244

3345
}
3446

@@ -43,5 +55,56 @@ public void testSinkContextWithTagsOnly() {
4355

4456
}
4557

58+
@Test
59+
public void testForwardToPipelinesWithPipelineMap() {
60+
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
61+
final List<String> testRoutes = Collections.emptyList();
62+
final List<String> testIncludeKeys = Collections.emptyList();
63+
final List<String> testExcludeKeys = Collections.emptyList();
64+
final List<String> testForwardToPipelineNames = List.of("forward-pipeline1", "forward-pipeline2");
65+
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
66+
Map<String, HeadlessPipeline> forwardToPipelines = sinkContext.getForwardToPipelines();
67+
assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(null));
68+
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null));
69+
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
70+
HeadlessPipeline forwardPipeline2 = mock(HeadlessPipeline.class);
71+
sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1, "forward-pipeline2", forwardPipeline2));
72+
forwardToPipelines = sinkContext.getForwardToPipelines();
73+
assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(forwardPipeline1));
74+
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(forwardPipeline2));
75+
Collection<Record<Event>> records = mock(Collection.class);
76+
assertThat(sinkContext.forwardRecords(records), equalTo(true));
77+
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
78+
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
79+
}
80+
81+
@Test
82+
public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
83+
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
84+
final List<String> testRoutes = Collections.emptyList();
85+
final List<String> testIncludeKeys = Collections.emptyList();
86+
final List<String> testExcludeKeys = Collections.emptyList();
87+
final List<String> testForwardToPipelineNames = List.of("forward-pipeline1", "forward-pipeline2");
88+
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
89+
Map<String, HeadlessPipeline> forwardToPipelines = sinkContext.getForwardToPipelines();
90+
assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(null));
91+
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null));
92+
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
93+
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
94+
Collection<Record<Event>> records = mock(Collection.class);
95+
assertThat(sinkContext.forwardRecords(records), equalTo(false));
96+
}
97+
98+
@Test
99+
public void testWithNoForwardToPipelines() {
100+
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
101+
final List<String> testRoutes = Collections.emptyList();
102+
final List<String> testIncludeKeys = Collections.emptyList();
103+
final List<String> testExcludeKeys = Collections.emptyList();
104+
final List<String> testForwardToPipelineNames = Collections.emptyList();
105+
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
106+
Collection<Record<Event>> records = mock(Collection.class);
107+
assertThat(sinkContext.forwardRecords(records), equalTo(false));
108+
}
46109
}
47110

0 commit comments

Comments
 (0)