Skip to content

Commit 2ed73b9

Browse files
committed
Modified to addressed modified forward_to config
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 1963d6f commit 2ed73b9

6 files changed

Lines changed: 155 additions & 22 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
public class SinkForwardConfig {
15+
@JsonProperty("pipelines")
16+
List<String> pipelineNames;
17+
18+
@JsonProperty("with_metadata")
19+
Map<String, Object> withMetadata;
20+
21+
@JsonProperty("with_data")
22+
Map<String, Object> withData;
23+
24+
@JsonCreator
25+
public SinkForwardConfig() {
26+
}
27+
28+
@JsonCreator
29+
public SinkForwardConfig(
30+
@JsonProperty("pipelines") final List<String> pipelineNames,
31+
@JsonProperty("with_data") final Map<String, Object> withData,
32+
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
33+
this.pipelineNames = pipelineNames;
34+
this.withData = withData;
35+
this.withMetadata = withMetadata;
36+
}
37+
38+
public List<String> getPipelineNames() {
39+
return pipelineNames;
40+
}
41+
42+
public Map<String, Object> getWithMetadata() {
43+
return withMetadata;
44+
}
45+
46+
public Map<String, Object> getWithData() {
47+
return withData;
48+
}
49+
}
50+

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ public SinkModel(final String pluginName, final List<String> routes, final Strin
3333
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings));
3434
}
3535

36-
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
37-
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginSettings));
36+
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
37+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginSettings));
3838
}
3939

4040
private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
@@ -59,8 +59,8 @@ public List<String> getExcludeKeys() {
5959
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
6060
}
6161

62-
public List<String> getForwardPipelineNames() {
63-
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardToPipelineNames;
62+
public SinkForwardConfig getForwardConfig() {
63+
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardConfig;
6464
}
6565

6666
/**
@@ -81,19 +81,19 @@ public static class SinkModelBuilder {
8181

8282
private final List<String> includeKeys;
8383
private final List<String> excludeKeys;
84-
private final List<String> forwardToPipelineNames;
84+
private final SinkForwardConfig forwardConfig;
8585

8686
private SinkModelBuilder(final PluginModel pluginModel) {
8787
this.pluginModel = pluginModel;
8888
this.routes = Collections.emptyList();
8989
this.tagsTargetKey = null;
9090
this.includeKeys = Collections.emptyList();
9191
this.excludeKeys = Collections.emptyList();
92-
this.forwardToPipelineNames = Collections.emptyList();
92+
this.forwardConfig = null;
9393
}
9494

9595
public SinkModel build() {
96-
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginModel.getPluginSettings());
96+
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginModel.getPluginSettings());
9797
}
9898
}
9999

@@ -121,19 +121,19 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
121121

122122
@JsonInclude(JsonInclude.Include.NON_EMPTY)
123123
@JsonProperty("forward_to")
124-
private final List<String> forwardToPipelineNames;
124+
private final SinkForwardConfig forwardConfig;
125125

126126
@JsonCreator
127-
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final List<String> forwardToPipelineNames) {
128-
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, new HashMap<>());
127+
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final SinkForwardConfig forwardConfig) {
128+
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, new HashMap<>());
129129
}
130130

131-
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
131+
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
132132
super(pluginSettings);
133133
this.routes = routes != null ? routes : Collections.emptyList();
134134
this.includeKeys = includeKeys != null ? includeKeys : Collections.emptyList();
135135
this.excludeKeys = excludeKeys != null ? excludeKeys : Collections.emptyList();
136-
this.forwardToPipelineNames = forwardToPipelineNames != null ? forwardToPipelineNames : Collections.emptyList();
136+
this.forwardConfig = forwardConfig;
137137
this.tagsTargetKey = tagsTargetKey;
138138
validateConfiguration();
139139
validateKeys();

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
99
import org.opensearch.dataprepper.model.record.Record;
1010
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.event.EventMetadata;
1112

1213
import java.util.Collection;
1314
import java.util.HashMap;
@@ -33,7 +34,7 @@ public SinkContext(String tagsTargetKey, Collection<String> routes, List<String>
3334
this.forwardToPipelines = new HashMap<>();
3435
}
3536

36-
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, List<String> forwardPipelineNames) {
37+
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<String> forwardPipelineNames) {
3738
this.tagsTargetKey = tagsTargetKey;
3839
this.routes = routes;
3940
this.includeKeys = includeKeys;
@@ -62,7 +63,7 @@ public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines)
6263
}
6364
}
6465

65-
public boolean forwardRecords(final Collection<Record<Event>> records) {
66+
public boolean forwardRecords(final Collection<Record<Event>> records, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
6667
if (forwardToPipelines.size() == 0) {
6768
return false;
6869
}
@@ -73,6 +74,25 @@ public boolean forwardRecords(final Collection<Record<Event>> records) {
7374
}
7475
}
7576

77+
if (records == null) {
78+
return true;
79+
}
80+
81+
records.forEach((record) -> {
82+
Event event = record.getData();
83+
if (withData != null && !withData.isEmpty()) {
84+
for (Map.Entry<String, Object> entry: withData.entrySet()) {
85+
event.put(entry.getKey(), entry.getValue());
86+
}
87+
}
88+
if (withMetadata != null && !withMetadata.isEmpty()) {
89+
EventMetadata metadata = event.getMetadata();
90+
for (Map.Entry<String, Object> entry: withMetadata.entrySet()) {
91+
metadata.setAttribute(entry.getKey(), entry.getValue());
92+
}
93+
}
94+
});
95+
7696
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
7797
entry.getValue().sendEvents(records);
7898
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.CoreMatchers.nullValue;
12+
import static org.mockito.Mockito.mock;
13+
import static org.hamcrest.MatcherAssert.assertThat;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class SinkForwardConfigTest {
19+
20+
@Test
21+
void testDefaults() {
22+
final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig();
23+
assertThat(sinkForwardConfig.getPipelineNames(), nullValue());
24+
assertThat(sinkForwardConfig.getWithData(), nullValue());
25+
assertThat(sinkForwardConfig.getWithMetadata(), nullValue());
26+
}
27+
28+
@Test
29+
void testCustomValues() {
30+
List<String> pipelines = mock(List.class);
31+
Map<String, Object> withData = mock(Map.class);
32+
Map<String, Object> withMetadata = mock(Map.class);
33+
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
34+
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
35+
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
36+
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
37+
}
38+
}
39+

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,14 @@ void sinkModel_with_exclude_keys() {
171171

172172
@Test
173173
void sinkModel_with_forward_pipelines() {
174+
SinkForwardConfig sinkForwardConfig = mock(SinkForwardConfig.class);
175+
List<String> forwardPipelineList = List.of("forward-pipeline1", "forward-pipeline2");
176+
when(sinkForwardConfig.getPipelineNames()).thenReturn(forwardPipelineList);
174177
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
175-
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), Arrays.asList("forward-pipeline1", "forward-pipeline2"), pluginSettings);
178+
179+
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), sinkForwardConfig, pluginSettings);
176180

177-
assertThat(sinkModel.getForwardPipelineNames(), equalTo(Arrays.asList("forward-pipeline1", "forward-pipeline2")));
181+
assertThat(sinkModel.getForwardConfig().getPipelineNames(), equalTo(forwardPipelineList));
178182
}
179183

180184
@Test
@@ -220,8 +224,7 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
220224
assertThat(actualSinkModel.getExcludeKeys(), notNullValue());
221225
assertThat(actualSinkModel.getExcludeKeys(), empty());
222226
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
223-
assertThat(actualSinkModel.getForwardPipelineNames(), notNullValue());
224-
assertThat(actualSinkModel.getForwardPipelineNames(), empty());
227+
assertThat(actualSinkModel.getForwardConfig(), nullValue());
225228

226229
}
227230
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010

1111
import org.opensearch.dataprepper.model.record.Record;
1212
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.event.EventMetadata;
1314
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1415

1516
import static org.junit.jupiter.api.Assertions.assertThrows;
1617
import static org.hamcrest.MatcherAssert.assertThat;
1718
import static org.hamcrest.Matchers.equalTo;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.when;
2023
import static org.mockito.Mockito.times;
2124
import static org.mockito.Mockito.verify;
2225

@@ -72,10 +75,28 @@ public void testForwardToPipelinesWithPipelineMap() {
7275
forwardToPipelines = sinkContext.getForwardToPipelines();
7376
assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(forwardPipeline1));
7477
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(forwardPipeline2));
75-
Collection<Record<Event>> records = mock(Collection.class);
76-
assertThat(sinkContext.forwardRecords(records), equalTo(true));
78+
Record<Event> record = mock(Record.class);
79+
EventMetadata eventMetadata = mock(EventMetadata.class);
80+
Event event = mock(Event.class);
81+
when(record.getData()).thenReturn(event);
82+
when(event.getMetadata()).thenReturn(eventMetadata);
83+
Collection<Record<Event>> records = Collections.singletonList(record);
84+
85+
assertThat(sinkContext.forwardRecords(records, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
7786
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
7887
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
88+
verify(event, times(1)).put(any(String.class), any(Object.class));
89+
verify(event, times(1)).getMetadata();
90+
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
91+
assertThat(sinkContext.forwardRecords(records, null, null), equalTo(true));
92+
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
93+
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
94+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(true));
95+
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
96+
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
97+
assertThat(sinkContext.forwardRecords(null, Map.of(), Map.of()), equalTo(true));
98+
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
99+
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
79100
}
80101

81102
@Test
@@ -92,7 +113,7 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
92113
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
93114
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
94115
Collection<Record<Event>> records = mock(Collection.class);
95-
assertThat(sinkContext.forwardRecords(records), equalTo(false));
116+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
96117
}
97118

98119
@Test
@@ -104,7 +125,7 @@ public void testWithNoForwardToPipelines() {
104125
final List<String> testForwardToPipelineNames = Collections.emptyList();
105126
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
106127
Collection<Record<Event>> records = mock(Collection.class);
107-
assertThat(sinkContext.forwardRecords(records), equalTo(false));
128+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
108129
}
109130
}
110131

0 commit comments

Comments
 (0)