Skip to content

Commit 3e71902

Browse files
authored
Add support for forwarding successful records from sinks using SinkContext (#5994)
* Add support for forwarding successful records from sinks using SinkContext Signed-off-by: Kondaka <krishkdk@amazon.com> * Modified to addressed modified forward_to config Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed build failures Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed build failures Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixing code to address failing tests Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments and fixed acknowledgements issue Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 06503f6 commit 3e71902

21 files changed

Lines changed: 731 additions & 39 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
public class SinkForwardConfig {
15+
@JsonProperty("pipelines")
16+
List<String> pipelineNames;
17+
18+
@JsonProperty("with_metadata")
19+
Map<String, Object> withMetadata;
20+
21+
@JsonProperty("with_data")
22+
Map<String, Object> withData;
23+
24+
@JsonCreator
25+
public SinkForwardConfig() {
26+
}
27+
28+
@JsonCreator
29+
public SinkForwardConfig(
30+
@JsonProperty("pipelines") final List<String> pipelineNames,
31+
@JsonProperty("with_data") final Map<String, Object> withData,
32+
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
33+
this.pipelineNames = pipelineNames;
34+
this.withData = withData;
35+
this.withMetadata = withMetadata;
36+
}
37+
38+
public List<String> getPipelineNames() {
39+
return pipelineNames;
40+
}
41+
42+
public Map<String, Object> getWithMetadata() {
43+
return withMetadata;
44+
}
45+
46+
public Map<String, Object> getWithData() {
47+
return withData;
48+
}
49+
}
50+

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@
3131
public class SinkModel extends PluginModel {
3232

3333
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
34-
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
34+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings));
35+
}
36+
37+
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
38+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginSettings));
3539
}
3640

3741
private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
@@ -56,6 +60,9 @@ public List<String> getExcludeKeys() {
5660
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
5761
}
5862

63+
public SinkForwardConfig getForwardConfig() {
64+
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardConfig;
65+
}
5966

6067
/**
6168
* Gets the tags target key associated with this Sink.
@@ -75,17 +82,19 @@ public static class SinkModelBuilder {
7582

7683
private final List<String> includeKeys;
7784
private final List<String> excludeKeys;
85+
private final SinkForwardConfig forwardConfig;
7886

7987
private SinkModelBuilder(final PluginModel pluginModel) {
8088
this.pluginModel = pluginModel;
8189
this.routes = Collections.emptyList();
8290
this.tagsTargetKey = null;
8391
this.includeKeys = Collections.emptyList();
8492
this.excludeKeys = Collections.emptyList();
93+
this.forwardConfig = null;
8594
}
8695

8796
public SinkModel build() {
88-
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings());
97+
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginModel.getPluginSettings());
8998
}
9099
}
91100

@@ -111,16 +120,21 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
111120
@JsonProperty("exclude_keys")
112121
private final List<String> excludeKeys;
113122

123+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
124+
@JsonProperty("forward_to")
125+
private final SinkForwardConfig forwardConfig;
126+
114127
@JsonCreator
115-
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys) {
116-
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>());
128+
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final SinkForwardConfig forwardConfig) {
129+
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, new HashMap<>());
117130
}
118131

119-
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
132+
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
120133
super(pluginSettings);
121134
this.routes = routes != null ? routes : Collections.emptyList();
122135
this.includeKeys = includeKeys != null ? validateKeys(includeKeys, "include_keys") : Collections.emptyList();
123136
this.excludeKeys = excludeKeys != null ? validateKeys(excludeKeys, "exclude_keys") : Collections.emptyList();
137+
this.forwardConfig = forwardConfig;
124138
this.tagsTargetKey = tagsTargetKey;
125139
validateConfiguration();
126140
}
@@ -146,7 +160,7 @@ private static List<String> validateKeys(List<String> input, String tag) {
146160

147161
static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
148162
SinkModelDeserializer() {
149-
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
163+
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null, null));
150164
}
151165
}
152-
}
166+
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,15 @@
55

66
package org.opensearch.dataprepper.model.sink;
77

8+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.event.EventMetadata;
12+
813
import java.util.Collection;
14+
import java.util.HashMap;
915
import java.util.List;
16+
import java.util.Map;
1017

1118
/**
1219
* Data Prepper Sink Context class. This the class for keeping global
@@ -15,20 +22,82 @@
1522
public class SinkContext {
1623
private final String tagsTargetKey;
1724
private final Collection<String> routes;
18-
1925
private final List<String> includeKeys;
2026
private final List<String> excludeKeys;
21-
27+
private Map<String, HeadlessPipeline> forwardToPipelines;
2228

2329
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys) {
2430
this.tagsTargetKey = tagsTargetKey;
2531
this.routes = routes;
2632
this.includeKeys = includeKeys;
2733
this.excludeKeys = excludeKeys;
34+
this.forwardToPipelines = new HashMap<>();
35+
}
36+
37+
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<String> forwardPipelineNames) {
38+
this.tagsTargetKey = tagsTargetKey;
39+
this.routes = routes;
40+
this.includeKeys = includeKeys;
41+
this.excludeKeys = excludeKeys;
42+
this.forwardToPipelines = new HashMap<>();
43+
if (forwardPipelineNames != null) {
44+
for (final String forwardPipelineName: forwardPipelineNames) {
45+
this.forwardToPipelines.put(forwardPipelineName, null);
46+
}
47+
}
2848
}
2949

3050
public SinkContext(String tagsTargetKey) {
31-
this(tagsTargetKey, null, null, null);
51+
this(tagsTargetKey, null, null, null, null);
52+
}
53+
54+
public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines) {
55+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
56+
final String key = entry.getKey();
57+
final HeadlessPipeline pipeline = pipelines.get(key);
58+
if (pipeline != null) {
59+
forwardToPipelines.put(key, pipeline);
60+
} else {
61+
throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key));
62+
}
63+
}
64+
}
65+
66+
public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
67+
if (forwardToPipelines.size() == 0) {
68+
return false;
69+
}
70+
71+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
72+
if (entry.getValue() == null) {
73+
return false;
74+
}
75+
}
76+
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
77+
78+
if (records.size() == 0) {
79+
return true;
80+
}
81+
82+
records.forEach((record) -> {
83+
Event event = record.getData();
84+
if (withData != null && !withData.isEmpty()) {
85+
for (Map.Entry<String, Object> entry: withData.entrySet()) {
86+
event.put(entry.getKey(), entry.getValue());
87+
}
88+
}
89+
if (withMetadata != null && !withMetadata.isEmpty()) {
90+
EventMetadata metadata = event.getMetadata();
91+
for (Map.Entry<String, Object> entry: withMetadata.entrySet()) {
92+
metadata.setAttribute(entry.getKey(), entry.getValue());
93+
}
94+
}
95+
});
96+
97+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
98+
entry.getValue().sendEvents(records);
99+
}
100+
return true;
32101
}
33102

34103
/**
@@ -56,5 +125,9 @@ public List<String> getIncludeKeys() {
56125
public List<String> getExcludeKeys() {
57126
return excludeKeys;
58127
}
128+
129+
public Map<String, HeadlessPipeline> getForwardToPipelines() {
130+
return forwardToPipelines;
131+
}
59132
}
60133

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.sink;
7+
8+
import org.opensearch.dataprepper.model.record.Record;
9+
import org.opensearch.dataprepper.model.event.Event;
10+
import org.opensearch.dataprepper.model.event.InternalEventHandle;
11+
12+
import java.util.ArrayList;
13+
import java.util.Collection;
14+
import java.util.List;
15+
16+
public class SinkForwardRecordsContext {
17+
List<Record<Event>> records;
18+
boolean forwardPipelinesPresent;
19+
20+
public SinkForwardRecordsContext(SinkContext sinkContext) {
21+
forwardPipelinesPresent = (sinkContext != null && sinkContext.getForwardToPipelines().size() > 0);
22+
records = new ArrayList<>();
23+
}
24+
25+
public void addRecord(Record<Event> record) {
26+
if (!forwardPipelinesPresent)
27+
return;
28+
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
29+
if (eventHandle != null) {
30+
eventHandle.acquireReference();
31+
}
32+
records.add(record);
33+
}
34+
35+
public void addRecords(Collection<Record<Event>> newRecords) {
36+
if (!forwardPipelinesPresent)
37+
return;
38+
newRecords.forEach((record) -> {
39+
Event event = record.getData();
40+
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
41+
if (eventHandle != null) {
42+
eventHandle.acquireReference();
43+
}
44+
});
45+
records.addAll(newRecords);
46+
}
47+
48+
public List<Record<Event>> getRecords() {
49+
return records;
50+
}
51+
}
52+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.CoreMatchers.nullValue;
12+
import static org.mockito.Mockito.mock;
13+
import static org.hamcrest.MatcherAssert.assertThat;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class SinkForwardConfigTest {
19+
20+
@Test
21+
void testDefaults() {
22+
final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig();
23+
assertThat(sinkForwardConfig.getPipelineNames(), nullValue());
24+
assertThat(sinkForwardConfig.getWithData(), nullValue());
25+
assertThat(sinkForwardConfig.getWithMetadata(), nullValue());
26+
}
27+
28+
@Test
29+
void testCustomValues() {
30+
List<String> pipelines = mock(List.class);
31+
Map<String, Object> withData = mock(Map.class);
32+
Map<String, Object> withMetadata = mock(Map.class);
33+
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
34+
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
35+
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
36+
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
37+
}
38+
}
39+

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ void sinkModel_with_exclude_keys() {
170170

171171
}
172172

173+
@Test
174+
void sinkModel_with_forward_pipelines() {
175+
SinkForwardConfig sinkForwardConfig = mock(SinkForwardConfig.class);
176+
List<String> forwardPipelineList = List.of("forward-pipeline1", "forward-pipeline2");
177+
when(sinkForwardConfig.getPipelineNames()).thenReturn(forwardPipelineList);
178+
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
179+
180+
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), sinkForwardConfig, pluginSettings);
181+
182+
assertThat(sinkModel.getForwardConfig().getPipelineNames(), equalTo(forwardPipelineList));
183+
}
184+
173185
@Test
174186
void sinkModel_with_invalid_exclude_keys() {
175187
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
@@ -213,12 +225,12 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
213225
assertThat(actualSinkModel.getExcludeKeys(), notNullValue());
214226
assertThat(actualSinkModel.getExcludeKeys(), empty());
215227
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
216-
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
228+
assertThat(actualSinkModel.getForwardConfig(), nullValue());
217229

218230
}
219231
}
220232

221233
private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
222234
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
223235
}
224-
}
236+
}

0 commit comments

Comments
 (0)