Skip to content

Commit 97103b3

Browse files
committed
Modified to addressed modified forward_to config
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent d666fbf commit 97103b3

6 files changed

Lines changed: 155 additions & 22 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
public class SinkForwardConfig {
15+
@JsonProperty("pipelines")
16+
List<String> pipelineNames;
17+
18+
@JsonProperty("with_metadata")
19+
Map<String, Object> withMetadata;
20+
21+
@JsonProperty("with_data")
22+
Map<String, Object> withData;
23+
24+
@JsonCreator
25+
public SinkForwardConfig() {
26+
}
27+
28+
@JsonCreator
29+
public SinkForwardConfig(
30+
@JsonProperty("pipelines") final List<String> pipelineNames,
31+
@JsonProperty("with_data") final Map<String, Object> withData,
32+
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
33+
this.pipelineNames = pipelineNames;
34+
this.withData = withData;
35+
this.withMetadata = withMetadata;
36+
}
37+
38+
public List<String> getPipelineNames() {
39+
return pipelineNames;
40+
}
41+
42+
public Map<String, Object> getWithMetadata() {
43+
return withMetadata;
44+
}
45+
46+
public Map<String, Object> getWithData() {
47+
return withData;
48+
}
49+
}
50+

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public SinkModel(final String pluginName, final List<String> routes, final Strin
3434
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings));
3535
}
3636

37-
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
38-
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginSettings));
37+
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
38+
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginSettings));
3939
}
4040

4141
private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
@@ -60,8 +60,8 @@ public List<String> getExcludeKeys() {
6060
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
6161
}
6262

63-
public List<String> getForwardPipelineNames() {
64-
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardToPipelineNames;
63+
public SinkForwardConfig getForwardConfig() {
64+
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardConfig;
6565
}
6666

6767
/**
@@ -82,19 +82,19 @@ public static class SinkModelBuilder {
8282

8383
private final List<String> includeKeys;
8484
private final List<String> excludeKeys;
85-
private final List<String> forwardToPipelineNames;
85+
private final SinkForwardConfig forwardConfig;
8686

8787
private SinkModelBuilder(final PluginModel pluginModel) {
8888
this.pluginModel = pluginModel;
8989
this.routes = Collections.emptyList();
9090
this.tagsTargetKey = null;
9191
this.includeKeys = Collections.emptyList();
9292
this.excludeKeys = Collections.emptyList();
93-
this.forwardToPipelineNames = Collections.emptyList();
93+
this.forwardConfig = null;
9494
}
9595

9696
public SinkModel build() {
97-
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, pluginModel.getPluginSettings());
97+
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginModel.getPluginSettings());
9898
}
9999
}
100100

@@ -122,19 +122,19 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
122122

123123
@JsonInclude(JsonInclude.Include.NON_EMPTY)
124124
@JsonProperty("forward_to")
125-
private final List<String> forwardToPipelineNames;
125+
private final SinkForwardConfig forwardConfig;
126126

127127
@JsonCreator
128-
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final List<String> forwardToPipelineNames) {
129-
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardToPipelineNames, new HashMap<>());
128+
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final SinkForwardConfig forwardConfig) {
129+
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, new HashMap<>());
130130
}
131131

132-
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final List<String> forwardToPipelineNames, final Map<String, Object> pluginSettings) {
132+
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
133133
super(pluginSettings);
134134
this.routes = routes != null ? routes : Collections.emptyList();
135135
this.includeKeys = includeKeys != null ? validateKeys(includeKeys, "include_keys") : Collections.emptyList();
136136
this.excludeKeys = excludeKeys != null ? validateKeys(excludeKeys, "exclude_keys") : Collections.emptyList();
137-
this.forwardToPipelineNames = forwardToPipelineNames != null ? forwardToPipelineNames : Collections.emptyList();
137+
this.forwardConfig = forwardConfig;
138138
this.tagsTargetKey = tagsTargetKey;
139139
validateConfiguration();
140140
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
99
import org.opensearch.dataprepper.model.record.Record;
1010
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.event.EventMetadata;
1112

1213
import java.util.Collection;
1314
import java.util.HashMap;
@@ -33,7 +34,7 @@ public SinkContext(String tagsTargetKey, Collection<String> routes, List<String>
3334
this.forwardToPipelines = new HashMap<>();
3435
}
3536

36-
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, List<String> forwardPipelineNames) {
37+
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<String> forwardPipelineNames) {
3738
this.tagsTargetKey = tagsTargetKey;
3839
this.routes = routes;
3940
this.includeKeys = includeKeys;
@@ -62,7 +63,7 @@ public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines)
6263
}
6364
}
6465

65-
public boolean forwardRecords(final Collection<Record<Event>> records) {
66+
public boolean forwardRecords(final Collection<Record<Event>> records, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
6667
if (forwardToPipelines.size() == 0) {
6768
return false;
6869
}
@@ -73,6 +74,25 @@ public boolean forwardRecords(final Collection<Record<Event>> records) {
7374
}
7475
}
7576

77+
if (records == null) {
78+
return true;
79+
}
80+
81+
records.forEach((record) -> {
82+
Event event = record.getData();
83+
if (withData != null && !withData.isEmpty()) {
84+
for (Map.Entry<String, Object> entry: withData.entrySet()) {
85+
event.put(entry.getKey(), entry.getValue());
86+
}
87+
}
88+
if (withMetadata != null && !withMetadata.isEmpty()) {
89+
EventMetadata metadata = event.getMetadata();
90+
for (Map.Entry<String, Object> entry: withMetadata.entrySet()) {
91+
metadata.setAttribute(entry.getKey(), entry.getValue());
92+
}
93+
}
94+
});
95+
7696
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
7797
entry.getValue().sendEvents(records);
7898
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.configuration;
7+
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.CoreMatchers.nullValue;
12+
import static org.mockito.Mockito.mock;
13+
import static org.hamcrest.MatcherAssert.assertThat;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class SinkForwardConfigTest {
19+
20+
@Test
21+
void testDefaults() {
22+
final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig();
23+
assertThat(sinkForwardConfig.getPipelineNames(), nullValue());
24+
assertThat(sinkForwardConfig.getWithData(), nullValue());
25+
assertThat(sinkForwardConfig.getWithMetadata(), nullValue());
26+
}
27+
28+
@Test
29+
void testCustomValues() {
30+
List<String> pipelines = mock(List.class);
31+
Map<String, Object> withData = mock(Map.class);
32+
Map<String, Object> withMetadata = mock(Map.class);
33+
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
34+
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
35+
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
36+
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
37+
}
38+
}
39+

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,14 @@ void sinkModel_with_exclude_keys() {
172172

173173
@Test
174174
void sinkModel_with_forward_pipelines() {
175+
SinkForwardConfig sinkForwardConfig = mock(SinkForwardConfig.class);
176+
List<String> forwardPipelineList = List.of("forward-pipeline1", "forward-pipeline2");
177+
when(sinkForwardConfig.getPipelineNames()).thenReturn(forwardPipelineList);
175178
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
176-
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), Arrays.asList("forward-pipeline1", "forward-pipeline2"), pluginSettings);
179+
180+
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), sinkForwardConfig, pluginSettings);
177181

178-
assertThat(sinkModel.getForwardPipelineNames(), equalTo(Arrays.asList("forward-pipeline1", "forward-pipeline2")));
182+
assertThat(sinkModel.getForwardConfig().getPipelineNames(), equalTo(forwardPipelineList));
179183
}
180184

181185
@Test
@@ -221,8 +225,7 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
221225
assertThat(actualSinkModel.getExcludeKeys(), notNullValue());
222226
assertThat(actualSinkModel.getExcludeKeys(), empty());
223227
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
224-
assertThat(actualSinkModel.getForwardPipelineNames(), notNullValue());
225-
assertThat(actualSinkModel.getForwardPipelineNames(), empty());
228+
assertThat(actualSinkModel.getForwardConfig(), nullValue());
226229

227230
}
228231
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010

1111
import org.opensearch.dataprepper.model.record.Record;
1212
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.event.EventMetadata;
1314
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1415

1516
import static org.junit.jupiter.api.Assertions.assertThrows;
1617
import static org.hamcrest.MatcherAssert.assertThat;
1718
import static org.hamcrest.Matchers.equalTo;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.when;
2023
import static org.mockito.Mockito.times;
2124
import static org.mockito.Mockito.verify;
2225

@@ -72,10 +75,28 @@ public void testForwardToPipelinesWithPipelineMap() {
7275
forwardToPipelines = sinkContext.getForwardToPipelines();
7376
assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(forwardPipeline1));
7477
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(forwardPipeline2));
75-
Collection<Record<Event>> records = mock(Collection.class);
76-
assertThat(sinkContext.forwardRecords(records), equalTo(true));
78+
Record<Event> record = mock(Record.class);
79+
EventMetadata eventMetadata = mock(EventMetadata.class);
80+
Event event = mock(Event.class);
81+
when(record.getData()).thenReturn(event);
82+
when(event.getMetadata()).thenReturn(eventMetadata);
83+
Collection<Record<Event>> records = Collections.singletonList(record);
84+
85+
assertThat(sinkContext.forwardRecords(records, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
7786
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
7887
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
88+
verify(event, times(1)).put(any(String.class), any(Object.class));
89+
verify(event, times(1)).getMetadata();
90+
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
91+
assertThat(sinkContext.forwardRecords(records, null, null), equalTo(true));
92+
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
93+
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
94+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(true));
95+
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
96+
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
97+
assertThat(sinkContext.forwardRecords(null, Map.of(), Map.of()), equalTo(true));
98+
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
99+
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
79100
}
80101

81102
@Test
@@ -92,7 +113,7 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
92113
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
93114
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
94115
Collection<Record<Event>> records = mock(Collection.class);
95-
assertThat(sinkContext.forwardRecords(records), equalTo(false));
116+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
96117
}
97118

98119
@Test
@@ -104,7 +125,7 @@ public void testWithNoForwardToPipelines() {
104125
final List<String> testForwardToPipelineNames = Collections.emptyList();
105126
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
106127
Collection<Record<Event>> records = mock(Collection.class);
107-
assertThat(sinkContext.forwardRecords(records), equalTo(false));
128+
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
108129
}
109130
}
110131

0 commit comments

Comments
 (0)