diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java new file mode 100644 index 0000000000..68dbed2280 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Map; + +public class SinkForwardConfig { + @JsonProperty("pipelines") + List pipelineNames; + + @JsonProperty("with_metadata") + Map withMetadata; + + @JsonProperty("with_data") + Map withData; + + @JsonCreator + public SinkForwardConfig() { + } + + @JsonCreator + public SinkForwardConfig( + @JsonProperty("pipelines") final List pipelineNames, + @JsonProperty("with_data") final Map withData, + @JsonProperty("with_metadata") final Map withMetadata) { + this.pipelineNames = pipelineNames; + this.withData = withData; + this.withMetadata = withMetadata; + } + + public List getPipelineNames() { + return pipelineNames; + } + + public Map getWithMetadata() { + return withMetadata; + } + + public Map getWithData() { + return withData; + } +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java index 40f567f607..79512cefc6 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java @@ -31,7 +31,11 @@ public class SinkModel extends PluginModel { public SinkModel(final String pluginName, final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final Map pluginSettings) { - this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings)); + this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings)); + } + + public SinkModel(final String pluginName, final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final SinkForwardConfig forwardConfig, final Map pluginSettings) { + this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginSettings)); } private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) { @@ -56,6 +60,9 @@ public List getExcludeKeys() { return this.getInternalJsonModel().excludeKeys; } + public SinkForwardConfig getForwardConfig() { + return this.getInternalJsonModel().forwardConfig; + } /** * Gets the tags target key associated with this Sink. @@ -75,6 +82,7 @@ public static class SinkModelBuilder { private final List includeKeys; private final List excludeKeys; + private final SinkForwardConfig forwardConfig; private SinkModelBuilder(final PluginModel pluginModel) { this.pluginModel = pluginModel; @@ -82,10 +90,11 @@ private SinkModelBuilder(final PluginModel pluginModel) { this.tagsTargetKey = null; this.includeKeys = Collections.emptyList(); this.excludeKeys = Collections.emptyList(); + this.forwardConfig = null; } public SinkModel build() { - return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings()); + return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginModel.getPluginSettings()); } } @@ -111,16 +120,21 @@ private static class SinkInternalJsonModel extends InternalJsonModel { @JsonProperty("exclude_keys") private final List excludeKeys; + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty("forward_to") + private final SinkForwardConfig forwardConfig; + @JsonCreator - private SinkInternalJsonModel(@JsonProperty("routes") final List routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List includeKeys, @JsonProperty("exclude_keys") final List excludeKeys) { - this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>()); + private SinkInternalJsonModel(@JsonProperty("routes") final List routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List includeKeys, @JsonProperty("exclude_keys") final List excludeKeys, @JsonProperty("forward_to") final SinkForwardConfig forwardConfig) { + this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, new HashMap<>()); } - private SinkInternalJsonModel(final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final Map pluginSettings) { + private SinkInternalJsonModel(final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final SinkForwardConfig forwardConfig, final Map pluginSettings) { super(pluginSettings); this.routes = routes != null ? routes : Collections.emptyList(); this.includeKeys = includeKeys != null ? validateKeys(includeKeys, "include_keys") : Collections.emptyList(); this.excludeKeys = excludeKeys != null ? validateKeys(excludeKeys, "exclude_keys") : Collections.emptyList(); + this.forwardConfig = forwardConfig; this.tagsTargetKey = tagsTargetKey; validateConfiguration(); } @@ -146,7 +160,7 @@ private static List validateKeys(List input, String tag) { static class SinkModelDeserializer extends AbstractPluginModelDeserializer { SinkModelDeserializer() { - super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null)); + super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null, null)); } } -} \ No newline at end of file +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java index 85a8f49ea0..0d3b9485e1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java @@ -5,8 +5,15 @@ package org.opensearch.dataprepper.model.sink; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; + import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Data Prepper Sink Context class. This the class for keeping global @@ -15,20 +22,82 @@ public class SinkContext { private final String tagsTargetKey; private final Collection routes; - private final List includeKeys; private final List excludeKeys; - + private Map forwardToPipelines; public SinkContext(String tagsTargetKey, Collection routes, List includeKeys, List excludeKeys) { this.tagsTargetKey = tagsTargetKey; this.routes = routes; this.includeKeys = includeKeys; this.excludeKeys = excludeKeys; + this.forwardToPipelines = new HashMap<>(); + } + + public SinkContext(String tagsTargetKey, Collection routes, List includeKeys, List excludeKeys, final List forwardPipelineNames) { + this.tagsTargetKey = tagsTargetKey; + this.routes = routes; + this.includeKeys = includeKeys; + this.excludeKeys = excludeKeys; + this.forwardToPipelines = new HashMap<>(); + if (forwardPipelineNames != null) { + for (final String forwardPipelineName: forwardPipelineNames) { + this.forwardToPipelines.put(forwardPipelineName, null); + } + } } public SinkContext(String tagsTargetKey) { - this(tagsTargetKey, null, null, null); + this(tagsTargetKey, null, null, null, null); + } + + public void setForwardToPipelines(final Map pipelines) { + for (Map.Entry entry: forwardToPipelines.entrySet()) { + final String key = entry.getKey(); + final HeadlessPipeline pipeline = pipelines.get(key); + if (pipeline != null) { + forwardToPipelines.put(key, pipeline); + } else { + throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key)); + } + } + } + + public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Map withData, final Map withMetadata) { + if (forwardToPipelines.size() == 0) { + return false; + } + + for (Map.Entry entry: forwardToPipelines.entrySet()) { + if (entry.getValue() == null) { + return false; + } + } + List> records = sinkForwardRecordsContext.getRecords(); + + if (records.size() == 0) { + return true; + } + + records.forEach((record) -> { + Event event = record.getData(); + if (withData != null && !withData.isEmpty()) { + for (Map.Entry entry: withData.entrySet()) { + event.put(entry.getKey(), entry.getValue()); + } + } + if (withMetadata != null && !withMetadata.isEmpty()) { + EventMetadata metadata = event.getMetadata(); + for (Map.Entry entry: withMetadata.entrySet()) { + metadata.setAttribute(entry.getKey(), entry.getValue()); + } + } + }); + + for (Map.Entry entry: forwardToPipelines.entrySet()) { + entry.getValue().sendEvents(records); + } + return true; } /** @@ -56,5 +125,9 @@ public List getIncludeKeys() { public List getExcludeKeys() { return excludeKeys; } + + public Map getForwardToPipelines() { + return forwardToPipelines; + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java new file mode 100644 index 0000000000..f972aad3cb --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.InternalEventHandle; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class SinkForwardRecordsContext { + List> records; + boolean forwardPipelinesPresent; + + public SinkForwardRecordsContext(SinkContext sinkContext) { + forwardPipelinesPresent = (sinkContext != null && sinkContext.getForwardToPipelines().size() > 0); + records = new ArrayList<>(); + } + + public void addRecord(Record record) { + if (!forwardPipelinesPresent) + return; + InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle(); + if (eventHandle != null) { + eventHandle.acquireReference(); + } + records.add(record); + } + + public void addRecords(Collection> newRecords) { + if (!forwardPipelinesPresent) + return; + newRecords.forEach((record) -> { + Event event = record.getData(); + InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle(); + if (eventHandle != null) { + eventHandle.acquireReference(); + } + }); + records.addAll(newRecords); + } + + public List> getRecords() { + return records; + } +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java new file mode 100644 index 0000000000..a9b64daed9 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.configuration; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import java.util.Map; + +public class SinkForwardConfigTest { + + @Test + void testDefaults() { + final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(); + assertThat(sinkForwardConfig.getPipelineNames(), nullValue()); + assertThat(sinkForwardConfig.getWithData(), nullValue()); + assertThat(sinkForwardConfig.getWithMetadata(), nullValue()); + } + + @Test + void testCustomValues() { + List pipelines = mock(List.class); + Map withData = mock(Map.class); + Map withMetadata = mock(Map.class); + SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata); + assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines)); + assertThat(sinkForwardConfig.getWithData(), equalTo(withData)); + assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata)); + } +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java index d5c831e98e..9dd9e450a5 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java @@ -170,6 +170,18 @@ void sinkModel_with_exclude_keys() { } + @Test + void sinkModel_with_forward_pipelines() { + SinkForwardConfig sinkForwardConfig = mock(SinkForwardConfig.class); + List forwardPipelineList = List.of("forward-pipeline1", "forward-pipeline2"); + when(sinkForwardConfig.getPipelineNames()).thenReturn(forwardPipelineList); + final Map pluginSettings = new LinkedHashMap<>(); + + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), sinkForwardConfig, pluginSettings); + + assertThat(sinkModel.getForwardConfig().getPipelineNames(), equalTo(forwardPipelineList)); + } + @Test void sinkModel_with_invalid_exclude_keys() { final Map pluginSettings = new LinkedHashMap<>(); @@ -213,7 +225,7 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() { assertThat(actualSinkModel.getExcludeKeys(), notNullValue()); assertThat(actualSinkModel.getExcludeKeys(), empty()); assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); - assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); + assertThat(actualSinkModel.getForwardConfig(), nullValue()); } } @@ -221,4 +233,4 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() { private static String createStringFromInputStream(final InputStream inputStream) throws IOException { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } -} \ No newline at end of file +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java index cb6fe54e02..72a5f64c6b 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java @@ -8,12 +8,27 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.List; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; public class SinkContextTest { private SinkContext sinkContext; @@ -24,11 +39,13 @@ public void testSinkContextBasic() { final List testRoutes = Collections.emptyList(); final List testIncludeKeys = Collections.emptyList(); final List testExcludeKeys = Collections.emptyList(); - sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys); + final List testForwardToPipelineNames = Collections.emptyList(); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames); assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); assertThat(sinkContext.getRoutes(), equalTo(testRoutes)); assertThat(sinkContext.getIncludeKeys(), equalTo(testIncludeKeys)); assertThat(sinkContext.getExcludeKeys(), equalTo(testExcludeKeys)); + assertThat(sinkContext.getForwardToPipelines(), equalTo(Collections.emptyMap())); } @@ -43,5 +60,94 @@ public void testSinkContextWithTagsOnly() { } + @Test + public void testForwardToPipelinesWithPipelineMap() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testRoutes = Collections.emptyList(); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + final List testForwardToPipelineNames = List.of("forward-pipeline1", "forward-pipeline2"); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames); + Map forwardToPipelines = sinkContext.getForwardToPipelines(); + assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(null)); + assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null)); + HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class); + HeadlessPipeline forwardPipeline2 = mock(HeadlessPipeline.class); + sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1, "forward-pipeline2", forwardPipeline2)); + forwardToPipelines = sinkContext.getForwardToPipelines(); + assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(forwardPipeline1)); + assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(forwardPipeline2)); + Record record = mock(Record.class); + EventMetadata eventMetadata = mock(EventMetadata.class); + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + doNothing().when(eventHandle).acquireReference(); + when(event.getEventHandle()).thenReturn(eventHandle); + when(record.getData()).thenReturn(event); + when(event.getMetadata()).thenReturn(eventMetadata); + Collection> records = Collections.singletonList(record); + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true)); + verify(forwardPipeline1, times(0)).sendEvents(eq(records)); + verify(forwardPipeline2, times(0)).sendEvents(eq(records)); + sinkForwardRecordsContext.addRecords(records); + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true)); + verify(forwardPipeline1, times(1)).sendEvents(eq(records)); + verify(forwardPipeline2, times(1)).sendEvents(eq(records)); + verify(event, times(1)).put(any(String.class), any(Object.class)); + verify(event, times(1)).getMetadata(); + verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class)); + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true)); + verify(forwardPipeline1, times(2)).sendEvents(eq(records)); + verify(forwardPipeline2, times(2)).sendEvents(eq(records)); + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true)); + verify(forwardPipeline1, times(3)).sendEvents(eq(records)); + verify(forwardPipeline2, times(3)).sendEvents(eq(records)); + } + + @Test + public void testForwardToPipelinesWithPipelineMapAndFailureCases() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testRoutes = Collections.emptyList(); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + final List testForwardToPipelineNames = List.of("forward-pipeline1", "forward-pipeline2"); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames); + Map forwardToPipelines = sinkContext.getForwardToPipelines(); + assertThat(forwardToPipelines.get("forward-pipeline1"), equalTo(null)); + assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null)); + HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class); + assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1))); + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + Record record = mock(Record.class); + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + doNothing().when(eventHandle).acquireReference(); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + sinkForwardRecordsContext.addRecords(List.of(record)); + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false)); + } + + @Test + public void testWithNoForwardToPipelines() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testRoutes = Collections.emptyList(); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + final List testForwardToPipelineNames = Collections.emptyList(); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames); + Record record = mock(Record.class); + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + doNothing().when(eventHandle).acquireReference(); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + sinkForwardRecordsContext.addRecords(List.of(record)); + assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false)); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java new file mode 100644 index 0000000000..84450db5dc --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.junit.jupiter.api.Test; + +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; + +import static org.mockito.Mockito.mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +public class SinkForwardRecordsContextTest { + + SinkForwardRecordsContext sinkForwardRecordsContext; + + @Test + public void testSinkForwardRecordContextBasic() { + SinkContext sinkContext = mock(SinkContext.class); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); + sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + doNothing().when(eventHandle).acquireReference(); + Record record1 = mock(Record.class); + Record record2 = mock(Record.class); + Record record3 = mock(Record.class); + when(record1.getData()).thenReturn(event); + when(record2.getData()).thenReturn(event); + when(record3.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + sinkForwardRecordsContext.addRecord(record1); + sinkForwardRecordsContext.addRecords(List.of(record2, record3)); + List> records = sinkForwardRecordsContext.getRecords(); + assertThat(records.size(), equalTo(0)); + } + + @Test + public void testSinkForwardRecordContextWithForwardingPipelines() { + SinkContext sinkContext = mock(SinkContext.class); + HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline)); + sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + doNothing().when(eventHandle).acquireReference(); + Record record1 = mock(Record.class); + Record record2 = mock(Record.class); + Record record3 = mock(Record.class); + when(record1.getData()).thenReturn(event); + when(record2.getData()).thenReturn(event); + when(record3.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + sinkForwardRecordsContext.addRecord(record1); + sinkForwardRecordsContext.addRecords(List.of(record2, record3)); + List> records = sinkForwardRecordsContext.getRecords(); + assertThat(records.size(), equalTo(3)); + } +} diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/DLQHeadlessPipelinesIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/DLQHeadlessPipelinesIT.java new file mode 100644 index 0000000000..a60bd9991a --- /dev/null +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/DLQHeadlessPipelinesIT.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.integration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; + +class DLQHeadlessPipelinesIT { + private static final Logger LOG = LoggerFactory.getLogger(ProcessorPipelineIT.class); + private static final String IN_MEMORY_IDENTIFIER_DLQ = "PipelineDLQIT"; + private static final String IN_MEMORY_IDENTIFIER_FORWARD = "ForwardPipelineIT"; + private static final String PIPELINE_DLQ_TEST_CONFIGURATION= "pipeline-dlq.yaml"; + private static final String FORWARD_PIPELINE_TEST_CONFIGURATION= "forward-pipeline.yaml"; + private DataPrepperTestRunner dataPrepperTestRunner; + private InMemorySourceAccessor inMemorySourceAccessor; + private InMemorySinkAccessor inMemorySinkAccessor; + + private void createPipeline(final String pipelineConfiguration) { + dataPrepperTestRunner = DataPrepperTestRunner.builder() + .withPipelinesDirectoryOrFile(pipelineConfiguration) + .build(); + inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); + inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); + dataPrepperTestRunner.start(); + LOG.info("Started test runner."); + } + + @AfterEach + void tearDown() { + LOG.info("Test tear down. Stop the test runner."); + dataPrepperTestRunner.stop(); + } + + @Test + void dlq_pipeline_test() { + createPipeline(PIPELINE_DLQ_TEST_CONFIGURATION); + + final int recordsToCreate = 200; + final List> inputRecords = IntStream.range(0, recordsToCreate) + .mapToObj(i -> UUID.randomUUID().toString()) + .map(JacksonEvent::fromMessage) + .map(Record::new) + .collect(Collectors.toList()); + + LOG.info("Submitting a batch of record."); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER_DLQ, inputRecords); + + await().atMost(400, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ), not(empty())); + }); + + assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ).size(), equalTo(recordsToCreate)); + + final List> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ); + + assertThat(sinkRecords.size(), equalTo(recordsToCreate)); + for (int i = 0; i < sinkRecords.size(); i++) { + final Record inputRecord = inputRecords.get(i); + final Record sinkRecord = sinkRecords.get(i); + assertThat(sinkRecord, notNullValue()); + final Event recordData = sinkRecord.getData(); + assertThat(recordData, notNullValue()); + assertThat( + recordData.get("message", String.class), + equalTo(inputRecord.getData().get("message", String.class))); + assertThat(recordData.get("test1", String.class), + equalTo(null)); + + } + } + +} + diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ForwardingHeadlessPipelinesIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ForwardingHeadlessPipelinesIT.java new file mode 100644 index 0000000000..a6c4830520 --- /dev/null +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ForwardingHeadlessPipelinesIT.java @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.integration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; + +class ForwardingHeadlessPipelinesIT { + private static final Logger LOG = LoggerFactory.getLogger(ProcessorPipelineIT.class); + private static final String IN_MEMORY_IDENTIFIER_DLQ = "PipelineDLQIT"; + private static final String IN_MEMORY_IDENTIFIER_FORWARD = "ForwardPipelineIT"; + private static final String PIPELINE_DLQ_TEST_CONFIGURATION= "pipeline-dlq.yaml"; + private static final String FORWARD_PIPELINE_TEST_CONFIGURATION= "forward-pipeline.yaml"; + private DataPrepperTestRunner dataPrepperTestRunner; + private InMemorySourceAccessor inMemorySourceAccessor; + private InMemorySinkAccessor inMemorySinkAccessor; + + private void createPipeline(final String pipelineConfiguration) { + dataPrepperTestRunner = DataPrepperTestRunner.builder() + .withPipelinesDirectoryOrFile(pipelineConfiguration) + .build(); + inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); + inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); + dataPrepperTestRunner.start(); + LOG.info("Started test runner."); + } + + @AfterEach + void tearDown() { + LOG.info("Test tear down. Stop the test runner."); + dataPrepperTestRunner.stop(); + } + + @Test + void pipeline_forward_test() { + createPipeline(FORWARD_PIPELINE_TEST_CONFIGURATION); + + final int recordsToCreate = 200; + final List> inputRecords = IntStream.range(0, recordsToCreate) + .mapToObj(i -> UUID.randomUUID().toString()) + .map(JacksonEvent::fromMessage) + .map(Record::new) + .collect(Collectors.toList()); + + LOG.info("Submitting a batch of record."); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER_FORWARD, inputRecords); + + await().atMost(400, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_FORWARD), not(empty())); + }); + + assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_FORWARD).size(), equalTo(2*recordsToCreate)); + + final List> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_FORWARD); + + for (int i = 0; i < sinkRecords.size(); i++) { + final Record inputRecord = inputRecords.get(i%recordsToCreate); + final Record sinkRecord = sinkRecords.get(i); + assertThat(sinkRecord, notNullValue()); + final Event recordData = sinkRecord.getData(); + assertThat(recordData, notNullValue()); + assertThat( + recordData.get("message", String.class), + equalTo(inputRecord.getData().get("message", String.class))); + assertThat(recordData.get("test1", String.class), + equalTo("knownUpdatedPrefix1" + i%recordsToCreate)); + + } + } +} diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java index a98b1c56b8..df223dc3d2 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java @@ -12,6 +12,8 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import java.util.Collection; @@ -22,12 +24,15 @@ public class InMemorySink implements Sink> { private final InMemorySinkAccessor inMemorySinkAccessor; private final AcknowledgementSetManager acknowledgementSetManager; private final Boolean acknowledgements; + private final SinkContext sinkContext; @DataPrepperPluginConstructor public InMemorySink(final InMemoryConfig inMemoryConfig, final AcknowledgementSetManager acknowledgementSetManager, + final SinkContext sinkContext, final InMemorySinkAccessor inMemorySinkAccessor) { testingKey = inMemoryConfig.getTestingKey(); + this.sinkContext = sinkContext; this.inMemorySinkAccessor = inMemorySinkAccessor; this.acknowledgementSetManager = acknowledgementSetManager; acknowledgements = inMemoryConfig.getAcknowledgements(); @@ -37,12 +42,15 @@ public InMemorySink(final InMemoryConfig inMemoryConfig, public void output(final Collection> records) { inMemorySinkAccessor.addEvents(testingKey, records); boolean result = inMemorySinkAccessor.getResult(); + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + sinkForwardRecordsContext.addRecords(records); records.stream().forEach((record) -> { EventHandle eventHandle = ((Event)record.getData()).getEventHandle(); if (acknowledgements) { eventHandle.release(result); } }); + sinkContext.forwardRecords(sinkForwardRecordsContext, null, null); } @Override diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessor.java index b0450d06d1..cc602147ad 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessor.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessor.java @@ -18,16 +18,21 @@ public class SimpleProcessor implements Processor, Record> { private final EventKey eventKey1; private final String valuePrefix1; + private final boolean throwException; int count = 0; @DataPrepperPluginConstructor public SimpleProcessor(final SimpleProcessorConfig simpleProcessorConfig) { eventKey1 = simpleProcessorConfig.getKey1(); valuePrefix1 = simpleProcessorConfig.getValuePrefix1(); + throwException = simpleProcessorConfig.getThrowException(); } @Override public Collection> execute(final Collection> records) { + if (throwException) { + throw new RuntimeException("Throwing Exception"); + } for (final Record record : records) { record.getData().put(eventKey1, valuePrefix1 + count); count++; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessorConfig.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessorConfig.java index 932d91c936..c6e26f9eef 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessorConfig.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SimpleProcessorConfig.java @@ -13,6 +13,7 @@ public class SimpleProcessorConfig { @EventKeyConfiguration(EventKeyFactory.EventAction.PUT) private EventKey key1; private String valuePrefix1; + private boolean throwException; public EventKey getKey1() { return key1; @@ -21,4 +22,8 @@ public EventKey getKey1() { public String getValuePrefix1() { return valuePrefix1; } + + public boolean getThrowException() { + return throwException; + } } diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/forward-pipeline.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/forward-pipeline.yaml new file mode 100644 index 0000000000..6594568a5c --- /dev/null +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/forward-pipeline.yaml @@ -0,0 +1,25 @@ +test-forward-pipeline: + delay: 10 + source: + in_memory: + testing_key: ForwardPipelineIT + + processor: + - simple_test: + key1: /test1 + value_prefix1: knownUpdatedPrefix1 + + sink: + - in_memory: + testing_key: ForwardPipelineIT + forward_to: + pipelines: [ "pipeline2"] + +pipeline2: + sink: + - in_memory: + testing_key: ForwardPipelineIT + + + + diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/pipeline-dlq.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/pipeline-dlq.yaml new file mode 100644 index 0000000000..6d921a0218 --- /dev/null +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/pipeline-dlq.yaml @@ -0,0 +1,23 @@ +test-dlq-pipeline: + delay: 10 + source: + in_memory: + testing_key: PipelineDLQIT + + processor: + - simple_test: + key1: /test1 + value_prefix1: knownPrefix1 + throw_exception: true + + sink: + - in_memory: + testing_key: PipelineDLQNone + +dlq_pipeline: + sink: + - in_memory: + testing_key: PipelineDLQIT + + + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index 5624fba79d..aaae9d7d03 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; import org.opensearch.dataprepper.core.pipeline.HeadlessPipelineSource; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl; import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner; import org.opensearch.dataprepper.core.pipeline.router.Router; @@ -72,6 +73,7 @@ public class PipelineTransformer { private final SourceCoordinatorFactory sourceCoordinatorFactory; private final PluginErrorCollector pluginErrorCollector; private final PluginErrorsHandler pluginErrorsHandler; + private final Map sinkToSinkContextMap; private final ExpressionEvaluator expressionEvaluator; @@ -98,6 +100,7 @@ public PipelineTransformer( this.pluginErrorCollector = pluginErrorCollector; this.pluginErrorsHandler = pluginErrorsHandler; this.expressionEvaluator = expressionEvaluator; + this.sinkToSinkContextMap = new LinkedHashMap<>(); } public Map transformConfiguration(final PipelinesDataFlowModel pipelinesDataFlowModel) { @@ -119,6 +122,17 @@ public Map transformConfiguration(final PipelinesDataFlowModel buildPipelineFromConfiguration(pipelineName, pipelineConfigurationMap, pipelineMap); } } + Map headlessPipelines = new LinkedHashMap<>(); + for (Map.Entry pipeline: pipelineMap.entrySet()){ + if (pipeline.getValue().getSource() instanceof HeadlessPipelineSource) { + headlessPipelines.put(pipeline.getKey(), pipeline.getValue()); + } + } + + for (Map.Entry pipelineEntry: pipelineMap.entrySet()) { + final Pipeline pipeline = pipelineEntry.getValue(); + pipeline.getSinks().forEach(sink -> sinkToSinkContextMap.get(sink).setForwardToPipelines(headlessPipelines)); + } return pipelineMap; } @@ -131,19 +145,8 @@ private void buildPipelineFromConfiguration( final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { Source source; - if (!pipelineName.equals(failurePipelineName)) { - final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); - if (sourceSetting == null) { - Exception e = new IllegalArgumentException(String.format("{}: Source must not be null", pipelineName)); - final PluginError pluginError = PluginError.builder() - .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) - .pipelineName(pipelineName) - .pluginName("UNKNOWN") - .exception(e) - .build(); - pluginErrorCollector.collectPluginError(pluginError); - return; - } + final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); + if (sourceSetting != null) { final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, pipelineMap, pipelineConfigurationMap); source = pipelineSource.orElseGet(() -> { @@ -161,16 +164,15 @@ private void buildPipelineFromConfiguration( } }); } else { - source = new HeadlessPipelineSource(failurePipelineName, ""); + source = new HeadlessPipelineSource(pipelineName, ""); } LOG.info("Building buffer for the pipeline [{}]", pipelineName); Buffer pipelineDefinedBuffer = null; final PluginSetting bufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); try { - if (source != null) { - pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, bufferPluginSetting, source.getDecoder()); - } + pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, bufferPluginSetting, + source != null ? source.getDecoder() : null); } catch (Exception e) { final PluginError pluginError = PluginError.builder() .componentType(PipelineModel.BUFFER_PLUGIN_TYPE) @@ -354,6 +356,7 @@ private DataFlowComponent buildRoutedSinkOrConnector(final SinkContextPlug try { final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext()); + sinkToSinkContextMap.put(sink, pluginSetting.getSinkContext()); return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes()); } catch (Exception e) { final PluginError pluginError = PluginError.builder() diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java index 3473b2c0f0..32eadee182 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java @@ -109,6 +109,7 @@ Collection runProcessorsAndProcessAcknowledgements(List processors, C } } catch (final Exception e) { if (pipeline.getFailurePipeline() != null) { + LOG.error("A processor threw an exception. This batch of Events will be sent to DLQ. ", e); pipeline.getFailurePipeline().sendEvents(records); } else if (inputEvents != null) { LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 39b3c8c2d3..99f5c6e306 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -347,9 +347,9 @@ void parseConfiguration_with_missing_source_should_fail() { final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.MISSING_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE); - final Map connectedPipelines =pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel); - assertThat(connectedPipelines.size(), equalTo(0)); - assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); + final RuntimeException actualException = assertThrows(RuntimeException.class, () -> pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel)); + assertThat(actualException.getMessage(), + equalTo("Invalid configuration, expected source test-pipeline-1 for pipeline test-pipeline-2 is missing")); } @Test diff --git a/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml index 37a59f7eb4..ca1995c535 100644 --- a/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml +++ b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml @@ -1,13 +1,12 @@ # this configuration file is solely for testing formatting test-pipeline-1: + source: + stdin: buffer: bounded_blocking: #to check non object nodes for plugins sink: - pipeline: name: "test-pipeline-2" test-pipeline-2: - source: - pipeline: - name: "test-pipeline-1" sink: - file: diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java index 4215409770..ce72f5ea99 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java @@ -137,7 +137,8 @@ private static SinkContextPluginSetting getSinkContextPluginSettingFromSinkModel final Map settingsMap = Optional .ofNullable(sinkModel.getPluginSettings()) .orElseGet(HashMap::new); - return new SinkContextPluginSetting(sinkModel.getPluginName(), settingsMap, new SinkContext(sinkModel.getTagsTargetKey(), sinkModel.getRoutes(), sinkModel.getIncludeKeys(), sinkModel.getExcludeKeys())); + List pipelineNames = sinkModel.getForwardConfig() == null ? null : sinkModel.getForwardConfig().getPipelineNames(); + return new SinkContextPluginSetting(sinkModel.getPluginName(), settingsMap, new SinkContext(sinkModel.getTagsTargetKey(), sinkModel.getRoutes(), sinkModel.getIncludeKeys(), sinkModel.getExcludeKeys(), pipelineNames)); } private Integer getWorkersFromPipelineModel(final PipelineModel pipelineModel) { diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java index ae9f52bbf9..0f8dad4cc7 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +23,7 @@ import java.nio.file.StandardOpenOption; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; @@ -41,6 +43,7 @@ public class FileSink implements Sink> { private boolean isStopRequested; private boolean initialized; private final String tagsTargetKey; + private final SinkContext sinkContext; private final boolean appendMode; @@ -61,16 +64,25 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte initialized = false; lock = new ReentrantLock(true); tagsTargetKey = Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null; + this.sinkContext = sinkContext; } @Override public void output(final Collection> records) { + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); lock.lock(); + Collection> events = new ArrayList<>(); + try { if (isStopRequested) return; for (final Record record : records) { + if (record.getData() instanceof Event) { + Event event = (Event)record.getData(); + sinkForwardRecordsContext.addRecord(new Record<>(event)); + } + try { checkTypeAndWriteObject(record.getData(), writer); } catch (final IOException ex) { @@ -78,6 +90,7 @@ public void output(final Collection> records) { } } + sinkContext.forwardRecords(sinkForwardRecordsContext, null, null); try { writer.flush(); } catch (final IOException ex) {