diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index fc562a2a62..c0a9712dfc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Instant; import java.time.Duration; @@ -38,6 +39,7 @@ public abstract class AbstractBuffer> implements Buffer { private final Timer latencyTimer; private final Timer readTimer; private final Timer checkpointTimer; + private HeadlessPipeline failurePipeline; public AbstractBuffer(final PluginSetting pluginSetting) { this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName()); @@ -62,6 +64,15 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN this.checkpointTimer = pluginMetrics.timer(MetricNames.CHECKPOINT_TIME_ELAPSED); } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + /** * Records metrics for ingress, time elapsed, and timeouts, while calling the doWrite method * to perform the actual write diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index 6fde22ce67..5367757937 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.util.Collection; @@ -138,4 +139,14 @@ default boolean isWrittenOffHeapOnly() { */ default void shutdown() { } + + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java index 7af56175a0..005ba53632 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java @@ -77,7 +77,6 @@ public PipelineModel( @JsonProperty("sink") final List sinks, @JsonProperty("workers") final Integer workers, @JsonProperty("delay") final Integer delay) { - checkArgument(Objects.nonNull(source), "Source must not be null"); checkArgument(Objects.nonNull(sinks), "Sinks must not be null"); checkArgument(sinks.size() > 0, "PipelineModel must include at least 1 sink"); diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java new file mode 100644 index 0000000000..c47eca24bd --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.pipeline; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; + +public interface HeadlessPipeline { + /** + * sets flag to indicate if acknowledgements are enabled + * + * @param acknowledgementsEnabled flag indicating acknowledgements are enabled + * @since 2.13 + */ + void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled); + + /** + * sends events to the headless pipeline + * + * @param events records to be sent to headless pipeline + * @since 2.13 + */ + void sendEvents(Collection> events); + +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java index 9ec6a689da..86ba5d25a2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.util.Collection; @@ -26,6 +27,7 @@ public abstract class AbstractProcessor, OutputRec private final Counter recordsInCounter; private final Counter recordsOutCounter; private final Timer timeElapsedTimer; + private HeadlessPipeline failurePipeline; public AbstractProcessor(final PluginSetting pluginSetting) { pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -63,4 +65,13 @@ public Collection execute(final Collection records) { * @return Processed records */ public abstract Collection doExecute(Collection records); + + @Override + public void setFailurePipeline(HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 4b66491b4f..6979ba2bcd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.processor; import org.opensearch.dataprepper.model.plugin.PluginComponentType; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -58,4 +59,14 @@ default boolean holdsEvents() { * Final shutdown call to clean up any resources that need to be closed. */ void shutdown(); + + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 26dd7e98a6..527a83cb60 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -29,6 +30,7 @@ public abstract class AbstractSink> implements Sink { private int maxRetries; private int waitTimeMs; private SinkThread sinkThread; + private HeadlessPipeline failurePipeline; public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) { this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -69,6 +71,15 @@ public void output(Collection records) { timeElapsedTimer.record(() -> doOutput(records)); } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + /** * This method should implement the output logic * @param records Records to be output diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 0f8793c0ff..0f22649191 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.plugin.PluginComponentType; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -48,4 +49,13 @@ public interface Sink> { default void updateLatencyMetrics(final Collection events) { } + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java index 90f0870c78..fb62af42fe 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.codec.HasByteDecoder; /** @@ -39,4 +40,13 @@ default boolean areAcknowledgementsEnabled() { return false; } + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { + } + } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java index f60e42ca6f..7e1a36e669 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.util.AbstractMap; import java.util.ArrayList; @@ -31,6 +32,9 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -181,6 +185,14 @@ public void testWriteBytes() throws TimeoutException { assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10)); } + @Test + void testGetAndSetFailurePipeline() { + final AbstractBuffer> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + abstractBuffer.setFailurePipeline(failurePipeline); + assertThat(abstractBuffer.getFailurePipeline(), sameInstance(failurePipeline)); + } + @Test public void testWriteTimeoutMetric() throws TimeoutException { // Given diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java index 0d9aa51296..a1a65a4f97 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.util.Optional; @@ -16,7 +17,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doCallRealMethod; class BufferTest { @@ -71,4 +74,11 @@ void testWriteBytes() { } + @Test + void testSetFailurePipeline() { + final Buffer> buffer = createObjectUnderTest(); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + doCallRealMethod().when(buffer).setFailurePipeline(failurePipeline); + buffer.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java index e6a53b3496..e5b3f53fb0 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java @@ -107,23 +107,6 @@ void testPipelineModelWithValidProcessorConfig() { assertEquals(expectedPreppersPluginModel, pipelineModel.getProcessors()); } - @Test - void testPipelineModelWithNullSourceThrowsException() { - final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( - null, - validBufferPluginModel(), - validPreppersPluginModel(), - validPipelineRouter(), - validSinksPluginModel(), - TEST_WORKERS, - TEST_READ_BATCH_DELAY - )); - - final String expected = "Source must not be null"; - - assertTrue(exception.getMessage().contains(expected)); - } - @Test void testPipelineModelWithNullSinksThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java index c938abf465..6ccf3e8f4e 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java @@ -14,6 +14,10 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; import java.util.Arrays; import java.util.Collection; @@ -93,6 +97,20 @@ public void testMetricsWithPluginMetricsConstructor() { 0.2)); } + @Test + void testGetAndSetFailurePipeline() { + final String processorName = "testProcessor"; + final String pipelineName = "testPipeline"; + MetricsTestUtil.initMetrics(); + + PluginSetting pluginSetting = new PluginSetting(processorName, Collections.emptyMap()); + pluginSetting.setPipelineName(pipelineName); + AbstractProcessor, Record> processor = new ProcessorImpl(pluginSetting); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + processor.setFailurePipeline(failurePipeline); + assertThat(processor.getFailurePipeline(), sameInstance(failurePipeline)); + } + public static class ProcessorImpl extends AbstractProcessor, Record> { public ProcessorImpl(PluginSetting pluginSetting) { super(pluginSetting); @@ -113,6 +131,7 @@ public Collection> doExecute(Collection> records) .collect(Collectors.toList()); } + @Override public void prepareForShutdown() { @@ -128,4 +147,4 @@ public void shutdown() { } } -} \ No newline at end of file +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java index 2fec941c4f..5d9d396cdf 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java @@ -5,11 +5,13 @@ package org.opensearch.dataprepper.model.processor; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doCallRealMethod; public class ProcessorTest { @@ -19,5 +21,13 @@ public void testDefault() { when(processor.holdsEvents()).thenCallRealMethod(); assertThat(processor.holdsEvents(), equalTo(false)); } + + @Test + public void testSetFailurePipeline() { + Processor processor = mock(Processor.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + doCallRealMethod().when(processor).setFailurePipeline(failurePipeline); + processor.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index 1026cb6083..a6b2081928 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.time.Instant; @@ -30,6 +31,7 @@ import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.number.OrderingComparison.greaterThan; import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; import static org.hamcrest.number.OrderingComparison.lessThan; @@ -87,6 +89,16 @@ void testMetrics() { abstractSink.shutdown(); } + @Test + public void testGetAndSetFailurePipeline() { + AbstractSink> abstractSink = new AbstractSinkImpl(pluginSetting); + abstractSink.initialize(); + assertEquals(abstractSink.isReady(), true); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + abstractSink.setFailurePipeline(failurePipeline); + assertThat(abstractSink.getFailurePipeline(), sameInstance(failurePipeline)); + } + @Test void testSinkNotReady() throws InterruptedException { AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java index 5f66a623aa..2d58e6a3d6 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -6,7 +6,10 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doCallRealMethod; import java.util.Collection; import java.util.Collections; @@ -27,6 +30,10 @@ public void shutdown() { public void initialize() { } + @Override + public void setFailurePipeline(HeadlessPipeline failurePipeline) { + } + @Override public void output(Collection> records) { } @@ -40,4 +47,12 @@ public void testSinkUpdateLatencyMetrics() { sink = new SinkTestClass(); sink.updateLatencyMetrics(Collections.emptyList()); } + + @Test + public void testSetFailurePipeline() { + Sink testSink = mock(Sink.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + doCallRealMethod().when(testSink).setFailurePipeline(failurePipeline); + testSink.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java index efcde97b00..6395757564 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java @@ -7,11 +7,13 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doCallRealMethod; public class SourceTest { @Test @@ -20,4 +22,12 @@ void testAreAcknowledgementsEnabled() { when(objectUnderTest.areAcknowledgementsEnabled()).thenCallRealMethod(); assertFalse(objectUnderTest.areAcknowledgementsEnabled()); } + + @Test + void testSetFailurePipeline() { + Source> objectUnderTest = mock(Source.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + doCallRealMethod().when(objectUnderTest).setFailurePipeline(failurePipeline); + objectUnderTest.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index d0ea3d4c78..5624fba79d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; +import org.opensearch.dataprepper.core.pipeline.HeadlessPipelineSource; import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl; import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner; import org.opensearch.dataprepper.core.pipeline.router.Router; @@ -127,24 +128,41 @@ private void buildPipelineFromConfiguration( final Map pipelineMap) { final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName); LOG.info("Building pipeline [{}] from provided configuration", pipelineName); + final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { - final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); - final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, - pipelineMap, pipelineConfigurationMap); - final Source source = pipelineSource.orElseGet(() -> { - try { - return pluginFactory.loadPlugin(Source.class, sourceSetting); - } catch (Exception e) { + Source source; + if (!pipelineName.equals(failurePipelineName)) { + final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); + if (sourceSetting == null) { + Exception e = new IllegalArgumentException(String.format("{}: Source must not be null", pipelineName)); final PluginError pluginError = PluginError.builder() .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) .pipelineName(pipelineName) - .pluginName(sourceSetting.getName()) + .pluginName("UNKNOWN") .exception(e) .build(); pluginErrorCollector.collectPluginError(pluginError); - return null; + return; } - }); + final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, + pipelineMap, pipelineConfigurationMap); + source = pipelineSource.orElseGet(() -> { + try { + return pluginFactory.loadPlugin(Source.class, sourceSetting); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(sourceSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } + }); + } else { + source = new HeadlessPipelineSource(failurePipelineName, ""); + } LOG.info("Building buffer for the pipeline [{}]", pipelineName); Buffer pipelineDefinedBuffer = null; @@ -227,6 +245,7 @@ private void buildPipelineFromConfiguration( dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(), getPeerForwarderDrainTimeout(dataPrepperConfiguration)); + if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) { // Check if there are any processors with @SingleThread annotation boolean hasSingleThreadedProcessors = processorSets.stream() @@ -259,6 +278,18 @@ private void buildPipelineFromConfiguration( "pipelines", pipelineName, ex); processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap); } + final Pipeline failurePipeline = pipelineMap.get(failurePipelineName); + boolean acknowledgementsEnabled = false; + if (failurePipeline != null) { + for (Map.Entry pipelineEntry : pipelineMap.entrySet()) { + if (!(pipelineEntry.getKey().equals(failurePipelineName))) { + pipelineEntry.getValue().setFailurePipeline(failurePipeline); + acknowledgementsEnabled = acknowledgementsEnabled || pipelineEntry.getValue().areAcknowledgementsEnabled(); + + } + } + failurePipeline.setAcknowledgementsEnabled(acknowledgementsEnabled); + } } @@ -350,7 +381,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkC } private Optional getPipelineNameIfPipelineType(final PluginSetting pluginSetting) { - if (PIPELINE_TYPE.equals(pluginSetting.getName()) && + if (pluginSetting != null && PIPELINE_TYPE.equals(pluginSetting.getName()) && pluginSetting.getAttributeFromSettings(ATTRIBUTE_NAME) != null) { //Validator marked valid config with type as pipeline will have attribute name return Optional.of((String) pluginSetting.getAttributeFromSettings(ATTRIBUTE_NAME)); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 67b34ac3fd..259c121547 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -35,6 +35,7 @@ */ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer, ExperimentalConfigurationContainer { static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); + public static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq_pipeline"; private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory"; @@ -60,6 +61,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC private Duration sinkShutdownTimeout; private ExperimentalConfiguration experimental; private PipelineExtensions pipelineExtensions; + private String failurePipelineName = DEFAULT_FAILURE_PIPELINE_NAME; public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration(); @@ -143,6 +145,10 @@ public int getServerPort() { return serverPort; } + public String getFailurePipelineName() { + return failurePipelineName; + } + public boolean ssl() { return ssl; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java new file mode 100644 index 0000000000..47eaab1294 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; + +public class HeadlessPipelineSource implements Source>, HeadlessPipeline { + private static final Logger LOG = LoggerFactory.getLogger(HeadlessPipelineSource.class); + private static final String NUMBER_OF_SUCCESSFUL_EVENTS_COUNTER = "numberOfEventsSuccessful"; + private static final String NUMBER_OF_FAILED_EVENTS_COUNTER = "numberOfEventsFailed"; + public static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; + private Buffer buffer; + private AtomicBoolean isStopRequested; + private PluginMetrics pluginMetrics; + private final Counter numberOfEventsSuccessful; + private final Counter numberOfEventsFailed; + private boolean acknowledgementsEnabled; + + public HeadlessPipelineSource(final String componentName, final String scope) { + pluginMetrics = PluginMetrics.fromNames(componentName, scope); + isStopRequested = new AtomicBoolean(false); + numberOfEventsSuccessful = pluginMetrics.counter(NUMBER_OF_SUCCESSFUL_EVENTS_COUNTER); + numberOfEventsFailed = pluginMetrics.counter(NUMBER_OF_FAILED_EVENTS_COUNTER); + acknowledgementsEnabled = false; + } + + @Override + public void start(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void stop() { + isStopRequested.set(true); + } + + @Override + public void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled) { + this.acknowledgementsEnabled = acknowledgementsEnabled; + } + + @VisibleForTesting + public boolean getAcknowledgementsEnabled() { + return acknowledgementsEnabled; + } + + @VisibleForTesting + long getNumberOfSuccessfulEvents() { + return (long)numberOfEventsSuccessful.count(); + } + + @VisibleForTesting + long getNumberOfFailedEvents() { + return (long)numberOfEventsFailed.count(); + } + + @Override + public void sendEvents(Collection> records) { + while (true) { + try { + buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); + numberOfEventsSuccessful.increment(records.size()); + break; + } catch (Exception e) { + LOG.error(NOISY, "Failed to write to failure pipeline.", e); + if (acknowledgementsEnabled) { + /* If acknowledgements enabled, better to retry here than retrying from the beginning */ + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + } + } else { + numberOfEventsFailed.increment(records.size()); + for (final Record record: records) { + ((Event)record.getData()).getEventHandle().release(false); + } + break; + } + } + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index d513361540..0a6c7f68fe 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -20,7 +20,9 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -52,7 +54,7 @@ * {@link Processor} and outputs the transformed (or original) data to {@link Sink}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class Pipeline { +public class Pipeline implements HeadlessPipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); private final ProcessorRegistry singleThreadUnsafeProcessorRegistry; @@ -65,6 +67,7 @@ public class Pipeline { private final Router router; private final SourceCoordinatorFactory sourceCoordinatorFactory; private final int processorThreads; + private HeadlessPipeline failurePipeline; private final int readBatchTimeoutInMillis; private final Duration processorShutdownTimeout; private final Duration sinkShutdownTimeout; @@ -121,6 +124,7 @@ public Pipeline( this.processorSets = processorSets; this.sinks = sinks; this.router = router; + this.failurePipeline = null; this.sourceCoordinatorFactory = sourceCoordinatorFactory; this.processorThreads = processorThreads; this.eventFactory = eventFactory; @@ -161,6 +165,18 @@ public Buffer getBuffer() { return this.buffer; } + public void setFailurePipeline(HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + this.source.setFailurePipeline(failurePipeline); + this.buffer.setFailurePipeline(failurePipeline); + processorSets.forEach(processorSet -> processorSet.forEach(processor -> processor.setFailurePipeline(failurePipeline))); + this.getSinks().forEach(sink -> sink.setFailurePipeline(failurePipeline)); + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + /** * @return {@link Sink} of this pipeline. */ @@ -287,6 +303,18 @@ public void execute() { } } + public void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled) { + if (getSource() instanceof HeadlessPipelineSource) { + ((HeadlessPipelineSource)getSource()).setAcknowledgementsEnabled(acknowledgementsEnabled); + } + } + + public void sendEvents(Collection> records) { + if (getSource() instanceof HeadlessPipelineSource) { + ((HeadlessPipelineSource)getSource()).sendEvents(records); + } + } + public synchronized void shutdown() { shutdown(DataPrepperShutdownOptions.defaultOptions()); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 08bcfab642..b6be161a4a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,10 @@ public void output(final Collection records) { } } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + } + @Override public void shutdown() { //TODO: Cleanup resources diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java index 6389aa6696..3473b2c0f0 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java @@ -108,8 +108,10 @@ Collection runProcessorsAndProcessAcknowledgements(List processors, C processAcknowledgements(inputEvents, records); } } catch (final Exception e) { - LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); - if (inputEvents != null) { + if (pipeline.getFailurePipeline() != null) { + pipeline.getFailurePipeline().sendEvents(records); + } else if (inputEvents != null) { + LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); processAcknowledgements(inputEvents, Collections.emptyList()); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index 8c3f9a8e5a..47d1d57d63 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -24,8 +24,10 @@ public class TestDataProvider { public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml"; public static final String INCORRECT_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/incorrect_source_multiple_pipeline_configuration.yml"; public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml"; + public static final String MISSING_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_source_multiple_pipeline_configuration.yml"; public static final String MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_pipeline_multiple_pipeline_configuration.yml"; public static final String VALID_MULTIPLE_SINKS_CONFIG_FILE = "src/test/resources/valid_multiple_sinks.yml"; + public static final String VALID_MULTIPLE_SINKS_WITH_FAILURE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml"; public static final String COMPATIBLE_VERSION_CONFIG_FILE = "src/test/resources/compatible_version.yml"; public static final String VALID_MULTIPLE_PROCESSERS_CONFIG_FILE = "src/test/resources/valid_multiple_processors.yml"; public static final String VALID_DATA_PREPPER_CONFIG_FILE = "src/test/resources/valid_data_prepper_config.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 141b13efb3..f11220002a 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer; import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.HeadlessPipelineSource; import org.opensearch.dataprepper.core.pipeline.router.RouterFactory; import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.core.validation.PluginErrorCollector; @@ -60,10 +61,12 @@ import java.util.UUID; import java.util.stream.Stream; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -73,7 +76,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.core.parser.PipelineTransformer.CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT; @@ -145,7 +147,6 @@ void setUp() { void tearDown() { verify(dataPrepperConfiguration).getEventConfiguration(); verify(dataPrepperConfiguration).getExperimental(); - verifyNoMoreInteractions(dataPrepperConfiguration); } private PipelineTransformer createObjectUnderTest(final String pipelineConfigurationFileLocation) { @@ -341,6 +342,16 @@ void parseConfiguration_with_missing_pipeline_name_should_throw() { verify(dataPrepperConfiguration).getPipelineExtensions(); } + @Test + void parseConfiguration_with_missing_source_should_fail() { + final PipelineTransformer pipelineTransformer = + createObjectUnderTest(TestDataProvider.MISSING_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE); + + final Map connectedPipelines =pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel); + assertThat(connectedPipelines.size(), equalTo(0)); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); + } + @Test void parseConfiguration_with_missing_pipeline_name_in_multiple_pipelines_should_throw() { final PipelineTransformer pipelineTransformer = @@ -362,6 +373,28 @@ void testMultipleSinks() { verify(dataPrepperConfiguration).getPipelineExtensions(); } + @Test + void testMultipleSinksWithFailurePipeline() { + when(dataPrepperConfiguration.getFailurePipelineName()).thenReturn(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME); + when(expressionEvaluator.isValidExpressionStatement("/value == raw")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("/value == service")).thenReturn(true); + mockDataPrepperConfigurationAccesses(); + final PipelineTransformer pipelineTransformer = + createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_SINKS_WITH_FAILURE_PIPELINE_CONFIG_FILE); + final Map pipelineMap = pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel); + assertThat(pipelineMap.size(), equalTo(4)); + verifyDataPrepperConfigurationAccesses(pipelineMap.size()); + Pipeline failurePipeline = pipelineMap.get(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME); + assertTrue(failurePipeline != null); + for (Map.Entry entry : pipelineMap.entrySet()) { + if (!entry.getKey().equals(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME)) { + assertThat(entry.getValue().getFailurePipeline(), sameInstance(failurePipeline)); + } + } + assertTrue(failurePipeline.getSource() instanceof HeadlessPipelineSource); + assertThat(((HeadlessPipelineSource)failurePipeline.getSource()).getAcknowledgementsEnabled(), equalTo(false)); + } + @Test void testMultipleProcessors() { mockDataPrepperConfigurationAccesses(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java new file mode 100644 index 0000000000..6bc0df0740 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import java.util.concurrent.TimeUnit; + +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import org.mockito.Mock; + +import static org.awaitility.Awaitility.await; +import org.awaitility.core.ConditionTimeoutException; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +class HeadlessPipelineSourceTest { + HeadlessPipelineSource headlessPipelineSource; + + @Mock + private Buffer buffer; + + private HeadlessPipelineSource createObjectUnderTest() { + final String name = UUID.randomUUID().toString(); + final String scope = UUID.randomUUID().toString(); + return new HeadlessPipelineSource(name, scope); + } + + @BeforeEach + void setup() { + buffer = mock(Buffer.class); + } + + @AfterEach + void teardown() { + if (headlessPipelineSource != null) { + headlessPipelineSource.stop(); + } + } + + @Test + public void testSendEvents() throws Exception { + doNothing().when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.start(buffer); + Collection> records = Collections.emptyList(); + headlessPipelineSource.sendEvents(records); + verify(buffer).writeAll(eq(records), eq(HeadlessPipelineSource.DEFAULT_WRITE_TIMEOUT)); + } + + @Test + public void testSendEventsException() throws Exception { + doThrow(RuntimeException.class).when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.start(buffer); + Record record = mock(Record.class); + Event event = mock(Event.class); + EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + Collection> records = Collections.singletonList(record); + headlessPipelineSource.sendEvents(records); + verify(eventHandle).release(eq(false)); + + } + + @Test + public void testSendEventsExceptionWithAcknowledgements() throws Exception { + doThrow(RuntimeException.class).when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.setAcknowledgementsEnabled(true); + headlessPipelineSource.start(buffer); + Record record = mock(Record.class); + Event event = mock(Event.class); + EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + Collection> records = Collections.singletonList(record); + assertThrows(ConditionTimeoutException.class, () -> { + await().atMost(2000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + headlessPipelineSource.sendEvents(records); + }); + }); + assertThat(headlessPipelineSource.getNumberOfSuccessfulEvents(), equalTo(0L)); + assertThat(headlessPipelineSource.getNumberOfFailedEvents(), equalTo(0L)); + verify(eventHandle, never()).release(eq(false)); + + } + +} + + diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java index 525ae0d2cb..7255e1f0b4 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java @@ -146,6 +146,7 @@ public void setup() { spanRecordList = Collections.singletonList(SPAN_RECORD); spanBuffer = new BlockingBuffer<>("Pipeline1"); sput = new PipelineConnector<>(); + sput.setFailurePipeline(null); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java index b7fec88f7c..434eab2d50 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java @@ -22,10 +22,12 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.AbstractBuffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; @@ -78,6 +80,7 @@ class PipelineTests { private static final int TEST_READ_BATCH_TIMEOUT = 500; private static final int TEST_PROCESSOR_THREADS = 1; private static final String TEST_PIPELINE_NAME = "test-pipeline"; + private static final String FAILURE_PIPELINE_NAME = "failure-pipeline"; private Router router; private SourceCoordinatorFactory sourceCoordinatorFactory; @@ -680,4 +683,45 @@ void isForceStopReadingBuffers_returns_true_if_bufferReadTimeout_is_exceeded() t Thread.sleep(2); assertThat(testPipeline.isForceStopReadingBuffers(), is(true)); } + + @Test + void testFailurePipeline() throws InterruptedException { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>())); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.singletonList(Collections.singletonList(testProcessor)), + Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + HeadlessPipelineSource headlessPipelineSource = mock(HeadlessPipelineSource.class); + Pipeline failurePipeline = new Pipeline(FAILURE_PIPELINE_NAME, headlessPipelineSource, new BlockingBuffer(FAILURE_PIPELINE_NAME), + Collections.emptyList(), + Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + + testPipeline.setFailurePipeline(failurePipeline); + assertThat(((AbstractBuffer)testPipeline.getBuffer()).getFailurePipeline(), equalTo(failurePipeline)); + assertThat(((TestSource)testPipeline.getSource()).getFailurePipeline(), equalTo(failurePipeline)); + + Collection sinks = testPipeline.getSinks(); + List> processorSets = testPipeline.getProcessorSets(); + processorSets.forEach(processorSet -> processorSet.forEach(processor -> { + assertThat(((TestProcessor)processor).getFailurePipeline(), equalTo(failurePipeline)); + })); + assertThat(sinks.size(), equalTo(1)); + for (Sink sink: sinks) { + assertThat(((TestSink)sink).getFailurePipeline(), equalTo(failurePipeline)); + } + failurePipeline.setAcknowledgementsEnabled(false); + verify(headlessPipelineSource).setAcknowledgementsEnabled(false); + testPipeline.setAcknowledgementsEnabled(false); + Collection> records = mock(Collection.class); + failurePipeline.sendEvents(records); + verify(headlessPipelineSource).sendEvents(records); + + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java index f0ae429bb9..fd596f3702 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -17,6 +18,7 @@ @DataPrepperPlugin(name = "test_processor", pluginType = Processor.class) public class TestProcessor implements Processor, Record> { public boolean isShutdown = false; + private HeadlessPipeline failurePipeline; public TestProcessor(final PluginSetting pluginSetting) {} @@ -35,6 +37,15 @@ public boolean isReadyForShutdown() { return true; } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void shutdown() { isShutdown = true; diff --git a/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml new file mode 100644 index 0000000000..37a59f7eb4 --- /dev/null +++ b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml @@ -0,0 +1,13 @@ +# this configuration file is solely for testing formatting +test-pipeline-1: + buffer: + bounded_blocking: #to check non object nodes for plugins + sink: + - pipeline: + name: "test-pipeline-2" +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + sink: + - file: diff --git a/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml b/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml new file mode 100644 index 0000000000..eeb748a15e --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml @@ -0,0 +1,37 @@ +entry-pipeline: + source: + random: + route: + - "raw" : "/value == raw" + - "service" : "/value == service" + sink: + - pipeline: + name: "raw-pipeline" + routes: + - raw + - pipeline: + name: "service-map-pipeline" + routes: + - service +raw-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - string_converter: + upper_case: true + sink: + - stdout: +service-map-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - string_converter: + upper_case: true + sink: + - stdout: + +dlq_pipeline: + sink: + - stdout: diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java index 6b048f629c..4215409770 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java @@ -76,7 +76,9 @@ public Integer getReadBatchDelay() { } public void updateCommonPipelineConfiguration(final String pipelineName) { - updatePluginSetting(sourcePluginSetting, pipelineName); + if (sourcePluginSetting != null) { + updatePluginSetting(sourcePluginSetting, pipelineName); + } updatePluginSetting(bufferPluginSetting, pipelineName); processorPluginSettings.forEach(processorPluginSettings -> updatePluginSetting(processorPluginSettings, pipelineName)); @@ -92,7 +94,7 @@ private void updatePluginSetting( private PluginSetting getSourceFromPluginModel(final PluginModel pluginModel) { if (pluginModel == null) { - throw new IllegalArgumentException("Invalid configuration, source is a required component"); + return null; } return getPluginSettingFromPluginModel(pluginModel); } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java index 8baaad331e..f1f21d52a3 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java @@ -112,19 +112,6 @@ void testOnlySourceAndSink() { assertThat(pipelineConfiguration.getReadBatchDelay(), CoreMatchers.is(TestConfigurationProvider.DEFAULT_READ_BATCH_DELAY)); } - @Test - void testNoSourceConfiguration() { - final PipelineModel pipelineModel = mock(PipelineModel.class); - when(pipelineModel.getProcessors()).thenReturn(processors); - when(pipelineModel.getSinks()).thenReturn(sinks); - when(pipelineModel.getWorkers()).thenReturn(TestConfigurationProvider.TEST_WORKERS); - when(pipelineModel.getReadBatchDelay()).thenReturn(TestConfigurationProvider.TEST_DELAY); - - final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); - - assertThat(actual.getMessage(), equalTo("Invalid configuration, source is a required component")); - } - @Test void testNullProcessorConfiguration() { final PipelineModel pipelineModel = mock(PipelineModel.class); diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 3424e7ec5f..28a08e03b7 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -116,6 +116,7 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS final DataPrepperPlugin pluginAnnotation = pluginClass.getAnnotation(DataPrepperPlugin.class); final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); + final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting); final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java index 1e2742a0ba..f0e840c79b 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.sink.Sink; import java.time.Duration; @@ -23,6 +24,7 @@ public class TestSink implements Sink> { private final boolean failSinkForTest; private boolean ready; private Instant readyTime; + private HeadlessPipeline failurePipeline; public TestSink() { this.failSinkForTest = false; @@ -51,6 +53,15 @@ public void output(Collection> records) { records.stream().collect(Collectors.toCollection(() -> collectedRecords)); } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void shutdown() { isShutdown = true; diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java index 2ad29f2650..dfa09ef742 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; @@ -22,6 +23,7 @@ public class TestSource implements Source> { .map(Record::new).collect(Collectors.toList()); private boolean isStopRequested; private boolean failSourceForTest; + private HeadlessPipeline failurePipeline; public TestSource() { this.isStopRequested = false; @@ -48,6 +50,15 @@ public void start(Buffer> buffer) { } } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void stop() { isStopRequested = true;