From 0352f9c027e754606d325434bd8b7878a8ad7ccf Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 14 Jul 2025 08:45:23 -0700 Subject: [PATCH 1/8] Modified to set failure pipeline in all pipeline components Signed-off-by: Kondaka --- .../dataprepper/model/buffer/Buffer.java | 21 +++++++++ .../model/failures/FailurePipeline.java | 14 ++++++ .../model/processor/AbstractProcessor.java | 12 +++++ .../model/processor/Processor.java | 21 +++++++++ .../dataprepper/model/sink/AbstractSink.java | 12 +++++ .../dataprepper/model/sink/Sink.java | 19 ++++++++ .../dataprepper/model/source/Source.java | 20 ++++++++ .../core/parser/PipelineTransformer.java | 45 ++++++++++++------ .../model/DataPrepperConfiguration.java | 6 +++ .../core/pipeline/FailurePipelineSource.java | 47 +++++++++++++++++++ .../dataprepper/core/pipeline/Pipeline.java | 24 +++++++++- .../core/pipeline/PipelineConnector.java | 10 ++++ .../core/pipeline/PipelineRunnerImpl.java | 4 +- .../core/parser/PipelineTransformerTests.java | 1 - 14 files changed, 239 insertions(+), 17 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index 6fde22ce67..bc32f32124 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.time.Duration; import java.util.Collection; @@ -138,4 +139,24 @@ default boolean isWrittenOffHeapOnly() { */ default void shutdown() { } + + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final FailurePipeline failurePipeline) { + } + + /** + * Returns default failure pipeline of a source + + * @return FailurePipeline returns failure pipeline + * @since 2.12 + */ + default FailurePipeline getFailurePipeline() { + return null; + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java new file mode 100644 index 0000000000..44980899a5 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.failures; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; +public interface FailurePipeline { + void sendFailedEvents(Collection> events); +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java index 9ec6a689da..9fd18bfce2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.util.Collection; @@ -26,6 +27,7 @@ public abstract class AbstractProcessor, OutputRec private final Counter recordsInCounter; private final Counter recordsOutCounter; private final Timer timeElapsedTimer; + private FailurePipeline failurePipeline; public AbstractProcessor(final PluginSetting pluginSetting) { pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -63,4 +65,14 @@ public Collection execute(final Collection records) { * @return Processed records */ public abstract Collection doExecute(Collection records); + + @Override + public void setFailurePipeline(FailurePipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + @Override + public FailurePipeline getFailurePipeline() { + return failurePipeline; + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 4b66491b4f..ea5f6efc5c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.processor; import org.opensearch.dataprepper.model.plugin.PluginComponentType; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -58,4 +59,24 @@ default boolean holdsEvents() { * Final shutdown call to clean up any resources that need to be closed. */ void shutdown(); + + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final FailurePipeline failurePipeline) { + } + + /** + * Returns default failure pipeline of a source + + * @return FailurePipeline returns failure pipeline + * @since 2.12 + */ + default FailurePipeline getFailurePipeline() { + return null; + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 26dd7e98a6..15942fb383 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -29,6 +30,7 @@ public abstract class AbstractSink> implements Sink { private int maxRetries; private int waitTimeMs; private SinkThread sinkThread; + private FailurePipeline failurePipeline; public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) { this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -69,6 +71,16 @@ public void output(Collection records) { timeElapsedTimer.record(() -> doOutput(records)); } + @Override + public void setFailurePipeline(final FailurePipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + @Override + public FailurePipeline getFailurePipeline() { + return failurePipeline; + } + /** * This method should implement the output logic * @param records Records to be output diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 0f8793c0ff..3797a293b8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.plugin.PluginComponentType; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -48,4 +49,22 @@ public interface Sink> { default void updateLatencyMetrics(final Collection events) { } + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final FailurePipeline failurePipeline) { + } + + /** + * Returns default failure pipeline of a source + + * @return FailurePipeline returns failure pipeline + * @since 2.12 + */ + default FailurePipeline getFailurePipeline() { + return null; + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java index 90f0870c78..b1b07bcecf 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.opensearch.dataprepper.model.codec.HasByteDecoder; /** @@ -39,4 +40,23 @@ default boolean areAcknowledgementsEnabled() { return false; } + /** + * Sets default failure pipeline of a source + + * @param failurePipeline failure pipeline + * @since 2.12 + */ + default void setFailurePipeline(final FailurePipeline failurePipeline) { + } + + /** + * Returns default failure pipeline of a source + + * @return FailurePipeline returns failure pipeline + * @since 2.12 + */ + default FailurePipeline getFailurePipeline() { + return null; + } + } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index d0ea3d4c78..ed71dbf5c8 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; +import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource; import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl; import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner; import org.opensearch.dataprepper.core.pipeline.router.Router; @@ -127,24 +128,30 @@ private void buildPipelineFromConfiguration( final Map pipelineMap) { final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName); LOG.info("Building pipeline [{}] from provided configuration", pipelineName); + final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, pipelineMap, pipelineConfigurationMap); - final Source source = pipelineSource.orElseGet(() -> { - try { - return pluginFactory.loadPlugin(Source.class, sourceSetting); - } catch (Exception e) { - final PluginError pluginError = PluginError.builder() - .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) - .pipelineName(pipelineName) - .pluginName(sourceSetting.getName()) - .exception(e) - .build(); - pluginErrorCollector.collectPluginError(pluginError); - return null; - } - }); + Source source; + if (!pipelineName.equals(failurePipelineName)) { + source = pipelineSource.orElseGet(() -> { + try { + return pluginFactory.loadPlugin(Source.class, sourceSetting); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(sourceSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } + }); + } else { + source = new FailurePipelineSource(); + } LOG.info("Building buffer for the pipeline [{}]", pipelineName); Buffer pipelineDefinedBuffer = null; @@ -227,6 +234,7 @@ private void buildPipelineFromConfiguration( dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(), getPeerForwarderDrainTimeout(dataPrepperConfiguration)); + if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) { // Check if there are any processors with @SingleThread annotation boolean hasSingleThreadedProcessors = processorSets.stream() @@ -259,6 +267,15 @@ private void buildPipelineFromConfiguration( "pipelines", pipelineName, ex); processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap); } + final Pipeline failurePipeline = pipelineMap.get(failurePipelineName); + if (failurePipeline != null) { + for (Map.Entry pipelineEntry : pipelineMap.entrySet()) { + if (!(pipelineEntry.getKey().equals(failurePipelineName))) { + pipelineEntry.getValue().setFailurePipeline(failurePipeline); + + } + } + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 67b34ac3fd..3459590388 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -35,6 +35,7 @@ */ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer, ExperimentalConfigurationContainer { static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); + static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq"; private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory"; @@ -60,6 +61,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC private Duration sinkShutdownTimeout; private ExperimentalConfiguration experimental; private PipelineExtensions pipelineExtensions; + private String failurePipelineName = DEFAULT_FAILURE_PIPELINE_NAME; public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration(); @@ -143,6 +145,10 @@ public int getServerPort() { return serverPort; } + public String getFailurePipelineName() { + return failurePipelineName; + } + public boolean ssl() { return ssl; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java new file mode 100644 index 0000000000..24ba1557a6 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FailurePipelineSource implements Source>, FailurePipeline { + private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class); + private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; + private Buffer buffer; + private AtomicBoolean isStopRequested; + + public FailurePipelineSource() { + isStopRequested = new AtomicBoolean(false); + } + + @Override + public void start(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void stop() { + isStopRequested.set(true); + } + + @Override + public void sendFailedEvents(Collection> records) { + try { + buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); + } catch (Exception e) { + LOG.error("Failed to write to failure pipeline"); + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index d513361540..0d3129de03 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -20,7 +20,9 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -52,7 +54,7 @@ * {@link Processor} and outputs the transformed (or original) data to {@link Sink}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class Pipeline { +public class Pipeline implements FailurePipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); private final ProcessorRegistry singleThreadUnsafeProcessorRegistry; @@ -65,6 +67,7 @@ public class Pipeline { private final Router router; private final SourceCoordinatorFactory sourceCoordinatorFactory; private final int processorThreads; + private FailurePipeline failurePipeline; private final int readBatchTimeoutInMillis; private final Duration processorShutdownTimeout; private final Duration sinkShutdownTimeout; @@ -121,6 +124,7 @@ public Pipeline( this.processorSets = processorSets; this.sinks = sinks; this.router = router; + this.failurePipeline = null; this.sourceCoordinatorFactory = sourceCoordinatorFactory; this.processorThreads = processorThreads; this.eventFactory = eventFactory; @@ -161,6 +165,18 @@ public Buffer getBuffer() { return this.buffer; } + public void setFailurePipeline(FailurePipeline failurePipeline) { + this.failurePipeline = failurePipeline; + this.source.setFailurePipeline(failurePipeline); + this.buffer.setFailurePipeline(failurePipeline); + processorSets.forEach(processorSet -> processorSet.forEach(processor -> processor.setFailurePipeline(failurePipeline))); + this.getSinks().forEach(sink -> sink.setFailurePipeline(failurePipeline)); + } + + public FailurePipeline getFailurePipeline() { + return failurePipeline; + } + /** * @return {@link Sink} of this pipeline. */ @@ -287,6 +303,12 @@ public void execute() { } } + public void sendFailedEvents(Collection> records) { + if (this.source instanceof FailurePipelineSource) { + ((FailurePipelineSource)this.source).sendFailedEvents(records); + } + } + public synchronized void shutdown() { shutdown(DataPrepperShutdownOptions.defaultOptions()); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 08bcfab642..10442c5ab7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,15 @@ public void output(final Collection records) { } } + @Override + public void setFailurePipeline(final FailurePipeline failurePipeline) { + } + + @Override + public FailurePipeline getFailurePipeline() { + return null; + } + @Override public void shutdown() { //TODO: Cleanup resources diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java index 6389aa6696..bc60baa2f7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java @@ -109,7 +109,9 @@ Collection runProcessorsAndProcessAcknowledgements(List processors, C } } catch (final Exception e) { LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); - if (inputEvents != null) { + if (pipeline.getFailurePipeline() != null) { + pipeline.getFailurePipeline().sendFailedEvents(records); + } else if (inputEvents != null) { processAcknowledgements(inputEvents, Collections.emptyList()); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 141b13efb3..40d2422601 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -145,7 +145,6 @@ void setUp() { void tearDown() { verify(dataPrepperConfiguration).getEventConfiguration(); verify(dataPrepperConfiguration).getExperimental(); - verifyNoMoreInteractions(dataPrepperConfiguration); } private PipelineTransformer createObjectUnderTest(final String pipelineConfigurationFileLocation) { From 7f0cf95c6d80dc270482225d6ccc98b53c62c78e Mon Sep 17 00:00:00 2001 From: Kondaka Date: Sun, 20 Jul 2025 14:43:18 -0700 Subject: [PATCH 2/8] Fix failing test case Signed-off-by: Kondaka --- .../model/configuration/PipelineModelTest.java | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java index e6a53b3496..e5b3f53fb0 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelineModelTest.java @@ -107,23 +107,6 @@ void testPipelineModelWithValidProcessorConfig() { assertEquals(expectedPreppersPluginModel, pipelineModel.getProcessors()); } - @Test - void testPipelineModelWithNullSourceThrowsException() { - final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( - null, - validBufferPluginModel(), - validPreppersPluginModel(), - validPipelineRouter(), - validSinksPluginModel(), - TEST_WORKERS, - TEST_READ_BATCH_DELAY - )); - - final String expected = "Source must not be null"; - - assertTrue(exception.getMessage().contains(expected)); - } - @Test void testPipelineModelWithNullSinksThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( From 5656d37e4a9bc99d4c1ccaf65fa5a5bd0bd4daa3 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 21 Jul 2025 13:15:00 -0700 Subject: [PATCH 3/8] Modified tests Signed-off-by: Kondaka --- .../model/buffer/AbstractBuffer.java | 11 ++++++++++ .../dataprepper/model/buffer/Buffer.java | 10 --------- .../model/processor/AbstractProcessor.java | 1 - .../model/processor/Processor.java | 10 --------- .../dataprepper/model/sink/AbstractSink.java | 1 - .../dataprepper/model/sink/Sink.java | 9 -------- .../dataprepper/model/source/Source.java | 10 --------- .../model/buffer/AbstractBufferTest.java | 12 +++++++++++ .../dataprepper/model/buffer/BufferTest.java | 10 +++++++++ .../processor/AbstractProcessorTest.java | 21 ++++++++++++++++++- .../model/processor/ProcessorTest.java | 10 +++++++++ .../model/sink/AbstractSinkTest.java | 12 +++++++++++ .../dataprepper/model/sink/SinkTest.java | 15 +++++++++++++ .../dataprepper/model/source/SourceTest.java | 10 +++++++++ .../core/pipeline/PipelineConnector.java | 5 ----- 15 files changed, 100 insertions(+), 47 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index fc562a2a62..4154cefd03 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.time.Instant; import java.time.Duration; @@ -38,6 +39,7 @@ public abstract class AbstractBuffer> implements Buffer { private final Timer latencyTimer; private final Timer readTimer; private final Timer checkpointTimer; + private FailurePipeline failurePipeline; public AbstractBuffer(final PluginSetting pluginSetting) { this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName()); @@ -62,6 +64,15 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN this.checkpointTimer = pluginMetrics.timer(MetricNames.CHECKPOINT_TIME_ELAPSED); } + @Override + public void setFailurePipeline(final FailurePipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public FailurePipeline getFailurePipeline() { + return failurePipeline; + } + /** * Records metrics for ingress, time elapsed, and timeouts, while calling the doWrite method * to perform the actual write diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index bc32f32124..01084cdf18 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -149,14 +149,4 @@ default void shutdown() { default void setFailurePipeline(final FailurePipeline failurePipeline) { } - /** - * Returns default failure pipeline of a source - - * @return FailurePipeline returns failure pipeline - * @since 2.12 - */ - default FailurePipeline getFailurePipeline() { - return null; - } - } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java index 9fd18bfce2..f6b3a18c30 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java @@ -71,7 +71,6 @@ public void setFailurePipeline(FailurePipeline failurePipeline) { this.failurePipeline = failurePipeline; } - @Override public FailurePipeline getFailurePipeline() { return failurePipeline; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index ea5f6efc5c..78a4ff670e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -69,14 +69,4 @@ default boolean holdsEvents() { default void setFailurePipeline(final FailurePipeline failurePipeline) { } - /** - * Returns default failure pipeline of a source - - * @return FailurePipeline returns failure pipeline - * @since 2.12 - */ - default FailurePipeline getFailurePipeline() { - return null; - } - } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 15942fb383..92deb9cd9d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -76,7 +76,6 @@ public void setFailurePipeline(final FailurePipeline failurePipeline) { this.failurePipeline = failurePipeline; } - @Override public FailurePipeline getFailurePipeline() { return failurePipeline; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 3797a293b8..0630bc927e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -58,13 +58,4 @@ default void updateLatencyMetrics(final Collection events) { default void setFailurePipeline(final FailurePipeline failurePipeline) { } - /** - * Returns default failure pipeline of a source - - * @return FailurePipeline returns failure pipeline - * @since 2.12 - */ - default FailurePipeline getFailurePipeline() { - return null; - } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java index b1b07bcecf..325689bd5a 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java @@ -49,14 +49,4 @@ default boolean areAcknowledgementsEnabled() { default void setFailurePipeline(final FailurePipeline failurePipeline) { } - /** - * Returns default failure pipeline of a source - - * @return FailurePipeline returns failure pipeline - * @since 2.12 - */ - default FailurePipeline getFailurePipeline() { - return null; - } - } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java index f60e42ca6f..0590fd3ae9 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.util.AbstractMap; import java.util.ArrayList; @@ -31,6 +32,9 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -181,6 +185,14 @@ public void testWriteBytes() throws TimeoutException { assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10)); } + @Test + void testGetAndSetFailurePipeline() { + final AbstractBuffer> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + abstractBuffer.setFailurePipeline(failurePipeline); + assertThat(abstractBuffer.getFailurePipeline(), sameInstance(failurePipeline)); + } + @Test public void testWriteTimeoutMetric() throws TimeoutException { // Given diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java index 0d9aa51296..ffdd12f856 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.time.Duration; import java.util.Optional; @@ -16,7 +17,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doCallRealMethod; class BufferTest { @@ -71,4 +74,11 @@ void testWriteBytes() { } + @Test + void testSetFailurePipeline() { + final Buffer> buffer = createObjectUnderTest(); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + doCallRealMethod().when(buffer).setFailurePipeline(failurePipeline); + buffer.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java index c938abf465..4157122f2c 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java @@ -14,6 +14,10 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; import java.util.Arrays; import java.util.Collection; @@ -93,6 +97,20 @@ public void testMetricsWithPluginMetricsConstructor() { 0.2)); } + @Test + void testGetAndSetFailurePipeline() { + final String processorName = "testProcessor"; + final String pipelineName = "testPipeline"; + MetricsTestUtil.initMetrics(); + + PluginSetting pluginSetting = new PluginSetting(processorName, Collections.emptyMap()); + pluginSetting.setPipelineName(pipelineName); + AbstractProcessor, Record> processor = new ProcessorImpl(pluginSetting); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + processor.setFailurePipeline(failurePipeline); + assertThat(processor.getFailurePipeline(), sameInstance(failurePipeline)); + } + public static class ProcessorImpl extends AbstractProcessor, Record> { public ProcessorImpl(PluginSetting pluginSetting) { super(pluginSetting); @@ -113,6 +131,7 @@ public Collection> doExecute(Collection> records) .collect(Collectors.toList()); } + @Override public void prepareForShutdown() { @@ -128,4 +147,4 @@ public void shutdown() { } } -} \ No newline at end of file +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java index 2fec941c4f..0ec662a731 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java @@ -5,11 +5,13 @@ package org.opensearch.dataprepper.model.processor; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doCallRealMethod; public class ProcessorTest { @@ -19,5 +21,13 @@ public void testDefault() { when(processor.holdsEvents()).thenCallRealMethod(); assertThat(processor.holdsEvents(), equalTo(false)); } + + @Test + public void testSetFailurePipeline() { + Processor processor = mock(Processor.class); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + doCallRealMethod().when(processor).setFailurePipeline(failurePipeline); + processor.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index 1026cb6083..dd1b9d9734 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import java.time.Duration; import java.time.Instant; @@ -30,6 +31,7 @@ import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.number.OrderingComparison.greaterThan; import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; import static org.hamcrest.number.OrderingComparison.lessThan; @@ -87,6 +89,16 @@ void testMetrics() { abstractSink.shutdown(); } + @Test + public void testGetAndSetFailurePipeline() { + AbstractSink> abstractSink = new AbstractSinkImpl(pluginSetting); + abstractSink.initialize(); + assertEquals(abstractSink.isReady(), true); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + abstractSink.setFailurePipeline(failurePipeline); + assertThat(abstractSink.getFailurePipeline(), sameInstance(failurePipeline)); + } + @Test void testSinkNotReady() throws InterruptedException { AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java index 5f66a623aa..69a6e9da88 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -6,7 +6,10 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doCallRealMethod; import java.util.Collection; import java.util.Collections; @@ -27,6 +30,10 @@ public void shutdown() { public void initialize() { } + @Override + public void setFailurePipeline(FailurePipeline failurePipeline) { + } + @Override public void output(Collection> records) { } @@ -40,4 +47,12 @@ public void testSinkUpdateLatencyMetrics() { sink = new SinkTestClass(); sink.updateLatencyMetrics(Collections.emptyList()); } + + @Test + public void testSetFailurePipeline() { + Sink testSink = mock(Sink.class); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + doCallRealMethod().when(testSink).setFailurePipeline(failurePipeline); + testSink.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java index efcde97b00..b599319640 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java @@ -7,11 +7,13 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.failures.FailurePipeline; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doCallRealMethod; public class SourceTest { @Test @@ -20,4 +22,12 @@ void testAreAcknowledgementsEnabled() { when(objectUnderTest.areAcknowledgementsEnabled()).thenCallRealMethod(); assertFalse(objectUnderTest.areAcknowledgementsEnabled()); } + + @Test + void testSetFailurePipeline() { + Source> objectUnderTest = mock(Source.class); + FailurePipeline failurePipeline = mock(FailurePipeline.class); + doCallRealMethod().when(objectUnderTest).setFailurePipeline(failurePipeline); + objectUnderTest.setFailurePipeline(failurePipeline); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 10442c5ab7..40be778467 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -100,11 +100,6 @@ public void output(final Collection records) { public void setFailurePipeline(final FailurePipeline failurePipeline) { } - @Override - public FailurePipeline getFailurePipeline() { - return null; - } - @Override public void shutdown() { //TODO: Cleanup resources From a4dd7945a6d118acc637948db90968bba94d077c Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 21 Jul 2025 14:01:48 -0700 Subject: [PATCH 4/8] fixed spotless check errors Signed-off-by: Kondaka --- .../dataprepper/core/parser/PipelineTransformerTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 40d2422601..8a9dd96386 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -73,7 +73,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.core.parser.PipelineTransformer.CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT; From 8a76a05c95a0384e8a5880f5b4b7b38890863ba5 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Wed, 6 Aug 2025 15:17:46 -0700 Subject: [PATCH 5/8] Addressed comments. Added more tests for 100% code coverage Signed-off-by: Kondaka --- .../model/buffer/AbstractBuffer.java | 8 +- .../dataprepper/model/buffer/Buffer.java | 4 +- .../model/configuration/PipelineModel.java | 1 - .../model/failures/FailurePipeline.java | 14 --- .../model/pipeline/HeadlessPipeline.java | 30 +++++ .../model/processor/AbstractProcessor.java | 8 +- .../model/processor/Processor.java | 4 +- .../dataprepper/model/sink/AbstractSink.java | 8 +- .../dataprepper/model/sink/Sink.java | 4 +- .../dataprepper/model/source/Source.java | 4 +- .../model/buffer/AbstractBufferTest.java | 4 +- .../dataprepper/model/buffer/BufferTest.java | 4 +- .../processor/AbstractProcessorTest.java | 4 +- .../model/processor/ProcessorTest.java | 4 +- .../model/sink/AbstractSinkTest.java | 4 +- .../dataprepper/model/sink/SinkTest.java | 6 +- .../dataprepper/model/source/SourceTest.java | 4 +- .../core/parser/PipelineTransformer.java | 15 ++- .../model/DataPrepperConfiguration.java | 2 +- .../core/pipeline/FailurePipelineSource.java | 47 ------- .../core/pipeline/HeadlessPipelineSource.java | 99 +++++++++++++++ .../dataprepper/core/pipeline/Pipeline.java | 22 ++-- .../core/pipeline/PipelineConnector.java | 4 +- .../core/pipeline/PipelineRunnerImpl.java | 4 +- .../dataprepper/TestDataProvider.java | 1 + .../core/parser/PipelineTransformerTests.java | 25 ++++ .../pipeline/HeadlessPipelineSourceTest.java | 115 ++++++++++++++++++ .../core/pipeline/PipelineConnectorTest.java | 1 + .../core/pipeline/PipelineTests.java | 44 +++++++ .../core/pipeline/common/TestProcessor.java | 11 ++ ...inks_with_routes_with_failure_pipeline.yml | 37 ++++++ .../parser/model/PipelineConfiguration.java | 6 +- .../plugin/DefaultPluginFactory.java | 1 + .../dataprepper/plugins/test/TestSink.java | 11 ++ .../dataprepper/plugins/test/TestSource.java | 11 ++ ...AwsSecretsPluginConfigValueTranslator.java | 2 +- 36 files changed, 454 insertions(+), 119 deletions(-) delete mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java delete mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java create mode 100644 data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index 4154cefd03..c0a9712dfc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -13,7 +13,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Instant; import java.time.Duration; @@ -39,7 +39,7 @@ public abstract class AbstractBuffer> implements Buffer { private final Timer latencyTimer; private final Timer readTimer; private final Timer checkpointTimer; - private FailurePipeline failurePipeline; + private HeadlessPipeline failurePipeline; public AbstractBuffer(final PluginSetting pluginSetting) { this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName()); @@ -65,11 +65,11 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN } @Override - public void setFailurePipeline(final FailurePipeline failurePipeline) { + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { this.failurePipeline = failurePipeline; } - public FailurePipeline getFailurePipeline() { + public HeadlessPipeline getFailurePipeline() { return failurePipeline; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index 01084cdf18..5367757937 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -8,7 +8,7 @@ import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.util.Collection; @@ -146,7 +146,7 @@ default void shutdown() { * @param failurePipeline failure pipeline * @since 2.12 */ - default void setFailurePipeline(final FailurePipeline failurePipeline) { + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java index 7af56175a0..005ba53632 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java @@ -77,7 +77,6 @@ public PipelineModel( @JsonProperty("sink") final List sinks, @JsonProperty("workers") final Integer workers, @JsonProperty("delay") final Integer delay) { - checkArgument(Objects.nonNull(source), "Source must not be null"); checkArgument(Objects.nonNull(sinks), "Sinks must not be null"); checkArgument(sinks.size() > 0, "PipelineModel must include at least 1 sink"); diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java deleted file mode 100644 index 44980899a5..0000000000 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.model.failures; - -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; - -import java.util.Collection; -public interface FailurePipeline { - void sendFailedEvents(Collection> events); -} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java new file mode 100644 index 0000000000..b2ce928c04 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.pipeline; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; + +public interface HeadlessPipeline { + /** + * sets flag to indicate if acknowledgements are enabled + * + * @param acknowledgementsEnabled flag indicating acknowledgements are enabled + * @since 2.13 + */ + void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled); + + /** + * sends events to the headless pipeline + * + * @param records records to be sent to headless pipeline + * @since 2.13 + */ + void sendEvents(Collection> events); + +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java index f6b3a18c30..86ba5d25a2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java @@ -11,7 +11,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.util.Collection; @@ -27,7 +27,7 @@ public abstract class AbstractProcessor, OutputRec private final Counter recordsInCounter; private final Counter recordsOutCounter; private final Timer timeElapsedTimer; - private FailurePipeline failurePipeline; + private HeadlessPipeline failurePipeline; public AbstractProcessor(final PluginSetting pluginSetting) { pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -67,11 +67,11 @@ public Collection execute(final Collection records) { public abstract Collection doExecute(Collection records); @Override - public void setFailurePipeline(FailurePipeline failurePipeline) { + public void setFailurePipeline(HeadlessPipeline failurePipeline) { this.failurePipeline = failurePipeline; } - public FailurePipeline getFailurePipeline() { + public HeadlessPipeline getFailurePipeline() { return failurePipeline; } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 78a4ff670e..6979ba2bcd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.model.processor; import org.opensearch.dataprepper.model.plugin.PluginComponentType; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -66,7 +66,7 @@ default boolean holdsEvents() { * @param failurePipeline failure pipeline * @since 2.12 */ - default void setFailurePipeline(final FailurePipeline failurePipeline) { + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 92deb9cd9d..527a83cb60 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -10,7 +10,7 @@ import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -30,7 +30,7 @@ public abstract class AbstractSink> implements Sink { private int maxRetries; private int waitTimeMs; private SinkThread sinkThread; - private FailurePipeline failurePipeline; + private HeadlessPipeline failurePipeline; public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) { this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -72,11 +72,11 @@ public void output(Collection records) { } @Override - public void setFailurePipeline(final FailurePipeline failurePipeline) { + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { this.failurePipeline = failurePipeline; } - public FailurePipeline getFailurePipeline() { + public HeadlessPipeline getFailurePipeline() { return failurePipeline; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 0630bc927e..0f22649191 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.plugin.PluginComponentType; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -55,7 +55,7 @@ default void updateLatencyMetrics(final Collection events) { * @param failurePipeline failure pipeline * @since 2.12 */ - default void setFailurePipeline(final FailurePipeline failurePipeline) { + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java index 325689bd5a..fb62af42fe 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java @@ -8,7 +8,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.plugin.PluginComponentType; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.codec.HasByteDecoder; /** @@ -46,7 +46,7 @@ default boolean areAcknowledgementsEnabled() { * @param failurePipeline failure pipeline * @since 2.12 */ - default void setFailurePipeline(final FailurePipeline failurePipeline) { + default void setFailurePipeline(final HeadlessPipeline failurePipeline) { } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java index 0590fd3ae9..7e1a36e669 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java @@ -16,7 +16,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.util.AbstractMap; import java.util.ArrayList; @@ -188,7 +188,7 @@ public void testWriteBytes() throws TimeoutException { @Test void testGetAndSetFailurePipeline() { final AbstractBuffer> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); abstractBuffer.setFailurePipeline(failurePipeline); assertThat(abstractBuffer.getFailurePipeline(), sameInstance(failurePipeline)); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java index ffdd12f856..a1a65a4f97 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -8,7 +8,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.util.Optional; @@ -77,7 +77,7 @@ void testWriteBytes() { @Test void testSetFailurePipeline() { final Buffer> buffer = createObjectUnderTest(); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); doCallRealMethod().when(buffer).setFailurePipeline(failurePipeline); buffer.setFailurePipeline(failurePipeline); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java index 4157122f2c..6ccf3e8f4e 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/AbstractProcessorTest.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import static org.mockito.Mockito.mock; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; @@ -106,7 +106,7 @@ void testGetAndSetFailurePipeline() { PluginSetting pluginSetting = new PluginSetting(processorName, Collections.emptyMap()); pluginSetting.setPipelineName(pipelineName); AbstractProcessor, Record> processor = new ProcessorImpl(pluginSetting); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); processor.setFailurePipeline(failurePipeline); assertThat(processor.getFailurePipeline(), sameInstance(failurePipeline)); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java index 0ec662a731..5d9d396cdf 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper.model.processor; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -25,7 +25,7 @@ public void testDefault() { @Test public void testSetFailurePipeline() { Processor processor = mock(Processor.class); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); doCallRealMethod().when(processor).setFailurePipeline(failurePipeline); processor.setFailurePipeline(failurePipeline); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index dd1b9d9734..a6b2081928 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -15,7 +15,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import java.time.Duration; import java.time.Instant; @@ -94,7 +94,7 @@ public void testGetAndSetFailurePipeline() { AbstractSink> abstractSink = new AbstractSinkImpl(pluginSetting); abstractSink.initialize(); assertEquals(abstractSink.isReady(), true); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); abstractSink.setFailurePipeline(failurePipeline); assertThat(abstractSink.getFailurePipeline(), sameInstance(failurePipeline)); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java index 69a6e9da88..2d58e6a3d6 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doCallRealMethod; @@ -31,7 +31,7 @@ public void initialize() { } @Override - public void setFailurePipeline(FailurePipeline failurePipeline) { + public void setFailurePipeline(HeadlessPipeline failurePipeline) { } @Override @@ -51,7 +51,7 @@ public void testSinkUpdateLatencyMetrics() { @Test public void testSetFailurePipeline() { Sink testSink = mock(Sink.class); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); doCallRealMethod().when(testSink).setFailurePipeline(failurePipeline); testSink.setFailurePipeline(failurePipeline); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java index b599319640..6395757564 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/SourceTest.java @@ -7,7 +7,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -26,7 +26,7 @@ void testAreAcknowledgementsEnabled() { @Test void testSetFailurePipeline() { Source> objectUnderTest = mock(Source.class); - FailurePipeline failurePipeline = mock(FailurePipeline.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); doCallRealMethod().when(objectUnderTest).setFailurePipeline(failurePipeline); objectUnderTest.setFailurePipeline(failurePipeline); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index ed71dbf5c8..3ddfe9367d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -12,7 +12,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; -import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource; +import org.opensearch.dataprepper.core.pipeline.HeadlessPipelineSource; import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl; import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner; import org.opensearch.dataprepper.core.pipeline.router.Router; @@ -130,11 +130,11 @@ private void buildPipelineFromConfiguration( LOG.info("Building pipeline [{}] from provided configuration", pipelineName); final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { - final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); - final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, - pipelineMap, pipelineConfigurationMap); Source source; if (!pipelineName.equals(failurePipelineName)) { + final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); + final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, + pipelineMap, pipelineConfigurationMap); source = pipelineSource.orElseGet(() -> { try { return pluginFactory.loadPlugin(Source.class, sourceSetting); @@ -150,7 +150,7 @@ private void buildPipelineFromConfiguration( } }); } else { - source = new FailurePipelineSource(); + source = new HeadlessPipelineSource(failurePipelineName, ""); } LOG.info("Building buffer for the pipeline [{}]", pipelineName); @@ -268,13 +268,16 @@ private void buildPipelineFromConfiguration( processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap); } final Pipeline failurePipeline = pipelineMap.get(failurePipelineName); + boolean acknowledgementsEnabled = false; if (failurePipeline != null) { for (Map.Entry pipelineEntry : pipelineMap.entrySet()) { if (!(pipelineEntry.getKey().equals(failurePipelineName))) { pipelineEntry.getValue().setFailurePipeline(failurePipeline); + acknowledgementsEnabled = acknowledgementsEnabled || pipelineEntry.getValue().areAcknowledgementsEnabled(); } } + failurePipeline.setAcknowledgementsEnabled(acknowledgementsEnabled); } } @@ -367,7 +370,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkC } private Optional getPipelineNameIfPipelineType(final PluginSetting pluginSetting) { - if (PIPELINE_TYPE.equals(pluginSetting.getName()) && + if (pluginSetting != null && PIPELINE_TYPE.equals(pluginSetting.getName()) && pluginSetting.getAttributeFromSettings(ATTRIBUTE_NAME) != null) { //Validator marked valid config with type as pipeline will have attribute name return Optional.of((String) pluginSetting.getAttributeFromSettings(ATTRIBUTE_NAME)); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 3459590388..259c121547 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -35,7 +35,7 @@ */ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer, ExperimentalConfigurationContainer { static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); - static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq"; + public static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq_pipeline"; private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory"; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java deleted file mode 100644 index 24ba1557a6..0000000000 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.core.pipeline; - -import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.failures.FailurePipeline; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; - -public class FailurePipelineSource implements Source>, FailurePipeline { - private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class); - private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; - private Buffer buffer; - private AtomicBoolean isStopRequested; - - public FailurePipelineSource() { - isStopRequested = new AtomicBoolean(false); - } - - @Override - public void start(Buffer buffer) { - this.buffer = buffer; - } - - @Override - public void stop() { - isStopRequested.set(true); - } - - @Override - public void sendFailedEvents(Collection> records) { - try { - buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); - } catch (Exception e) { - LOG.error("Failed to write to failure pipeline"); - } - } -} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java new file mode 100644 index 0000000000..ad0ee012b0 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; + +public class HeadlessPipelineSource implements Source>, HeadlessPipeline { + private static final Logger LOG = LoggerFactory.getLogger(HeadlessPipelineSource.class); + private static final String NUMBER_OF_SUCCESSFUL_EVENTS_COUNTER = "numberOfEventsSuccessful"; + private static final String NUMBER_OF_FAILED_EVENTS_COUNTER = "numberOfEventsFailed"; + public static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; + private Buffer buffer; + private AtomicBoolean isStopRequested; + private PluginMetrics pluginMetrics; + private final Counter numberOfEventsSuccessful; + private final Counter numberOfEventsFailed; + private boolean acknowledgementsEnabled; + + public HeadlessPipelineSource(final String componentName, final String scope) { + pluginMetrics = PluginMetrics.fromNames(componentName, scope); + isStopRequested = new AtomicBoolean(false); + numberOfEventsSuccessful = pluginMetrics.counter(NUMBER_OF_SUCCESSFUL_EVENTS_COUNTER); + numberOfEventsFailed = pluginMetrics.counter(NUMBER_OF_FAILED_EVENTS_COUNTER); + acknowledgementsEnabled = false; + } + + @Override + public void start(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void stop() { + isStopRequested.set(true); + } + + @Override + public void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled) { + this.acknowledgementsEnabled = acknowledgementsEnabled; + } + + @VisibleForTesting + public boolean getAcknowledgementsEnabled() { + return acknowledgementsEnabled; + } + + @VisibleForTesting + long getNumberOfSuccessfulEvents() { + return (long)numberOfEventsSuccessful.count(); + } + + @VisibleForTesting + long getNumberOfFailedEvents() { + return (long)numberOfEventsFailed.count(); + } + + @Override + public void sendEvents(Collection> records) { + while (true) { + try { + buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); + numberOfEventsSuccessful.increment(records.size()); + break; + } catch (Exception e) { + LOG.error(NOISY, "Failed to write to failure pipeline"); + if (acknowledgementsEnabled) { + /* If acknowledgements enabled, better to retry here than retrying from the beginning */ + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + } + } else { + numberOfEventsFailed.increment(records.size()); + for (final Record record: records) { + ((Event)record.getData()).getEventHandle().release(false); + } + break; + } + } + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index 0d3129de03..0a6c7f68fe 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -22,7 +22,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.sink.Sink; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -54,7 +54,7 @@ * {@link Processor} and outputs the transformed (or original) data to {@link Sink}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class Pipeline implements FailurePipeline { +public class Pipeline implements HeadlessPipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); private final ProcessorRegistry singleThreadUnsafeProcessorRegistry; @@ -67,7 +67,7 @@ public class Pipeline implements FailurePipeline { private final Router router; private final SourceCoordinatorFactory sourceCoordinatorFactory; private final int processorThreads; - private FailurePipeline failurePipeline; + private HeadlessPipeline failurePipeline; private final int readBatchTimeoutInMillis; private final Duration processorShutdownTimeout; private final Duration sinkShutdownTimeout; @@ -165,7 +165,7 @@ public Buffer getBuffer() { return this.buffer; } - public void setFailurePipeline(FailurePipeline failurePipeline) { + public void setFailurePipeline(HeadlessPipeline failurePipeline) { this.failurePipeline = failurePipeline; this.source.setFailurePipeline(failurePipeline); this.buffer.setFailurePipeline(failurePipeline); @@ -173,7 +173,7 @@ public void setFailurePipeline(FailurePipeline failurePipeline) { this.getSinks().forEach(sink -> sink.setFailurePipeline(failurePipeline)); } - public FailurePipeline getFailurePipeline() { + public HeadlessPipeline getFailurePipeline() { return failurePipeline; } @@ -303,9 +303,15 @@ public void execute() { } } - public void sendFailedEvents(Collection> records) { - if (this.source instanceof FailurePipelineSource) { - ((FailurePipelineSource)this.source).sendFailedEvents(records); + public void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled) { + if (getSource() instanceof HeadlessPipelineSource) { + ((HeadlessPipelineSource)getSource()).setAcknowledgementsEnabled(acknowledgementsEnabled); + } + } + + public void sendEvents(Collection> records) { + if (getSource() instanceof HeadlessPipelineSource) { + ((HeadlessPipelineSource)getSource()).sendEvents(records); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 40be778467..b6be161a4a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -9,7 +9,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,7 +97,7 @@ public void output(final Collection records) { } @Override - public void setFailurePipeline(final FailurePipeline failurePipeline) { + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { } @Override diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java index bc60baa2f7..3473b2c0f0 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java @@ -108,10 +108,10 @@ Collection runProcessorsAndProcessAcknowledgements(List processors, C processAcknowledgements(inputEvents, records); } } catch (final Exception e) { - LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); if (pipeline.getFailurePipeline() != null) { - pipeline.getFailurePipeline().sendFailedEvents(records); + pipeline.getFailurePipeline().sendEvents(records); } else if (inputEvents != null) { + LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); processAcknowledgements(inputEvents, Collections.emptyList()); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index 8c3f9a8e5a..b0b6885d92 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -26,6 +26,7 @@ public class TestDataProvider { public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml"; public static final String MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_pipeline_multiple_pipeline_configuration.yml"; public static final String VALID_MULTIPLE_SINKS_CONFIG_FILE = "src/test/resources/valid_multiple_sinks.yml"; + public static final String VALID_MULTIPLE_SINKS_WITH_FAILURE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml"; public static final String COMPATIBLE_VERSION_CONFIG_FILE = "src/test/resources/compatible_version.yml"; public static final String VALID_MULTIPLE_PROCESSERS_CONFIG_FILE = "src/test/resources/valid_multiple_processors.yml"; public static final String VALID_DATA_PREPPER_CONFIG_FILE = "src/test/resources/valid_data_prepper_config.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 8a9dd96386..9c79fd8f6d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer; import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.HeadlessPipelineSource; import org.opensearch.dataprepper.core.pipeline.router.RouterFactory; import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.core.validation.PluginErrorCollector; @@ -60,10 +61,12 @@ import java.util.UUID; import java.util.stream.Stream; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -360,6 +363,28 @@ void testMultipleSinks() { verify(dataPrepperConfiguration).getPipelineExtensions(); } + @Test + void testMultipleSinksWithFailurePipeline() { + when(dataPrepperConfiguration.getFailurePipelineName()).thenReturn(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME); + when(expressionEvaluator.isValidExpressionStatement("/value == raw")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("/value == service")).thenReturn(true); + mockDataPrepperConfigurationAccesses(); + final PipelineTransformer pipelineTransformer = + createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_SINKS_WITH_FAILURE_PIPELINE_CONFIG_FILE); + final Map pipelineMap = pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel); + assertThat(pipelineMap.size(), equalTo(4)); + verifyDataPrepperConfigurationAccesses(pipelineMap.size()); + Pipeline failurePipeline = pipelineMap.get(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME); + assertTrue(failurePipeline != null); + for (Map.Entry entry : pipelineMap.entrySet()) { + if (!entry.getKey().equals(DataPrepperConfiguration.DEFAULT_FAILURE_PIPELINE_NAME)) { + assertThat(entry.getValue().getFailurePipeline(), sameInstance(failurePipeline)); + } + } + assertTrue(failurePipeline.getSource() instanceof HeadlessPipelineSource); + assertThat(((HeadlessPipelineSource)failurePipeline.getSource()).getAcknowledgementsEnabled(), equalTo(false)); + } + @Test void testMultipleProcessors() { mockDataPrepperConfigurationAccesses(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java new file mode 100644 index 0000000000..9f2107bd8f --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import java.util.concurrent.TimeUnit; + +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import org.mockito.Mock; + +import static org.awaitility.Awaitility.await; +import org.awaitility.core.ConditionTimeoutException; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +class HeadlessPipelineSourceTest { + HeadlessPipelineSource headlessPipelineSource; + + @Mock + private Buffer buffer; + + private HeadlessPipelineSource createObjectUnderTest() { + final String name = UUID.randomUUID().toString(); + final String scope = UUID.randomUUID().toString(); + return new HeadlessPipelineSource(name, scope); + } + + @BeforeEach + void setup() { + buffer = mock(Buffer.class); + } + + @AfterEach + void teardown() { + if (headlessPipelineSource != null) { + headlessPipelineSource.stop(); + } + } + + @Test + public void testSendEvents() throws Exception { + doNothing().when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.start(buffer); + Collection> records = Collections.emptyList(); + headlessPipelineSource.sendEvents(records); + verify(buffer).writeAll(eq(records), eq(HeadlessPipelineSource.DEFAULT_WRITE_TIMEOUT)); + } + + @Test + public void testSendEventsException() throws Exception { + doThrow(RuntimeException.class).when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.start(buffer); + Record record = mock(Record.class); + Event event = mock(Event.class); + EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + Collection> records = Collections.singletonList(record); + headlessPipelineSource.sendEvents(records); + verify(eventHandle).release(eq(false)); + + } + + @Test + //@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS) + public void testSendEventsExceptionWithAcknowledgements() throws Exception { + doThrow(RuntimeException.class).when(buffer).writeAll(any(Collection.class), any(Integer.class)); + headlessPipelineSource = createObjectUnderTest(); + headlessPipelineSource.setAcknowledgementsEnabled(true); + headlessPipelineSource.start(buffer); + Record record = mock(Record.class); + Event event = mock(Event.class); + EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + when(event.getEventHandle()).thenReturn(eventHandle); + Collection> records = Collections.singletonList(record); + assertThrows(ConditionTimeoutException.class, () -> { + await().atMost(2000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + headlessPipelineSource.sendEvents(records); + }); + }); + assertThat(headlessPipelineSource.getNumberOfSuccessfulEvents(), equalTo(0L)); + assertThat(headlessPipelineSource.getNumberOfFailedEvents(), equalTo(0L)); + verify(eventHandle, never()).release(eq(false)); + + } + +} + + diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java index 525ae0d2cb..7255e1f0b4 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineConnectorTest.java @@ -146,6 +146,7 @@ public void setup() { spanRecordList = Collections.singletonList(SPAN_RECORD); spanBuffer = new BlockingBuffer<>("Pipeline1"); sput = new PipelineConnector<>(); + sput.setFailurePipeline(null); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java index b7fec88f7c..158e971ac2 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java @@ -22,10 +22,12 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.AbstractBuffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; @@ -78,6 +80,7 @@ class PipelineTests { private static final int TEST_READ_BATCH_TIMEOUT = 500; private static final int TEST_PROCESSOR_THREADS = 1; private static final String TEST_PIPELINE_NAME = "test-pipeline"; + private static final String FAILURE_PIPELINE_NAME = "failure-pipeline"; private Router router; private SourceCoordinatorFactory sourceCoordinatorFactory; @@ -680,4 +683,45 @@ void isForceStopReadingBuffers_returns_true_if_bufferReadTimeout_is_exceeded() t Thread.sleep(2); assertThat(testPipeline.isForceStopReadingBuffers(), is(true)); } + + @Test + void testFailurePipeline() throws InterruptedException { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>())); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.singletonList(Collections.singletonList(testProcessor)), + Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + HeadlessPipelineSource headlessPipelineSource = mock(HeadlessPipelineSource.class); + Pipeline failurePipeline = new Pipeline(FAILURE_PIPELINE_NAME, headlessPipelineSource, new BlockingBuffer(FAILURE_PIPELINE_NAME), + Collections.emptyList(), + Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + + testPipeline.setFailurePipeline(failurePipeline); + assertThat(((AbstractBuffer)testPipeline.getBuffer()).getFailurePipeline(), equalTo(failurePipeline)); + assertThat(((TestSource)testPipeline.getSource()).getFailurePipeline(), equalTo(failurePipeline)); + + Collection sinks = testPipeline.getSinks(); + List> processorSets = testPipeline.getProcessorSets(); + processorSets.forEach(processorSet -> processorSet.forEach(processor -> { + assertThat(((TestProcessor)processor).getFailurePipeline(), equalTo(failurePipeline)); + })); + for (Sink sink: sinks) { + assertThat(((TestSink)sink).getFailurePipeline(), equalTo(failurePipeline)); + } + failurePipeline.setAcknowledgementsEnabled(false); + verify(headlessPipelineSource).setAcknowledgementsEnabled(false); + testPipeline.setAcknowledgementsEnabled(false); + Collection> records = mock(Collection.class); + failurePipeline.sendEvents(records); + verify(headlessPipelineSource).sendEvents(records); + //assertThat(testPipeline.areAcknowledgementsEnabled(), equalTo(false)); + + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java index f0ae429bb9..fd596f3702 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/common/TestProcessor.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -17,6 +18,7 @@ @DataPrepperPlugin(name = "test_processor", pluginType = Processor.class) public class TestProcessor implements Processor, Record> { public boolean isShutdown = false; + private HeadlessPipeline failurePipeline; public TestProcessor(final PluginSetting pluginSetting) {} @@ -35,6 +37,15 @@ public boolean isReadyForShutdown() { return true; } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void shutdown() { isShutdown = true; diff --git a/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml b/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml new file mode 100644 index 0000000000..eeb748a15e --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml @@ -0,0 +1,37 @@ +entry-pipeline: + source: + random: + route: + - "raw" : "/value == raw" + - "service" : "/value == service" + sink: + - pipeline: + name: "raw-pipeline" + routes: + - raw + - pipeline: + name: "service-map-pipeline" + routes: + - service +raw-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - string_converter: + upper_case: true + sink: + - stdout: +service-map-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - string_converter: + upper_case: true + sink: + - stdout: + +dlq_pipeline: + sink: + - stdout: diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java index 6b048f629c..4215409770 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfiguration.java @@ -76,7 +76,9 @@ public Integer getReadBatchDelay() { } public void updateCommonPipelineConfiguration(final String pipelineName) { - updatePluginSetting(sourcePluginSetting, pipelineName); + if (sourcePluginSetting != null) { + updatePluginSetting(sourcePluginSetting, pipelineName); + } updatePluginSetting(bufferPluginSetting, pipelineName); processorPluginSettings.forEach(processorPluginSettings -> updatePluginSetting(processorPluginSettings, pipelineName)); @@ -92,7 +94,7 @@ private void updatePluginSetting( private PluginSetting getSourceFromPluginModel(final PluginModel pluginModel) { if (pluginModel == null) { - throw new IllegalArgumentException("Invalid configuration, source is a required component"); + return null; } return getPluginSettingFromPluginModel(pluginModel); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 3424e7ec5f..28a08e03b7 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -116,6 +116,7 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS final DataPrepperPlugin pluginAnnotation = pluginClass.getAnnotation(DataPrepperPlugin.class); final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); + final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting); final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java index 1e2742a0ba..f0e840c79b 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.sink.Sink; import java.time.Duration; @@ -23,6 +24,7 @@ public class TestSink implements Sink> { private final boolean failSinkForTest; private boolean ready; private Instant readyTime; + private HeadlessPipeline failurePipeline; public TestSink() { this.failSinkForTest = false; @@ -51,6 +53,15 @@ public void output(Collection> records) { records.stream().collect(Collectors.toCollection(() -> collectedRecords)); } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void shutdown() { isShutdown = true; diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java index 2ad29f2650..dfa09ef742 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSource.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; @@ -22,6 +23,7 @@ public class TestSource implements Source> { .map(Record::new).collect(Collectors.toList()); private boolean isStopRequested; private boolean failSourceForTest; + private HeadlessPipeline failurePipeline; public TestSource() { this.isStopRequested = false; @@ -48,6 +50,15 @@ public void start(Buffer> buffer) { } } + @Override + public void setFailurePipeline(final HeadlessPipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public HeadlessPipeline getFailurePipeline() { + return failurePipeline; + } + @Override public void stop() { isStopRequested = true; diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java index adeb498bfb..617e7873b3 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java @@ -27,7 +27,7 @@ public AwsSecretsPluginConfigValueTranslator(final SecretsSupplier secretsSuppli @Override public Object translate(final String value) { - final Matcher matcher = SECRETS_REF_PATTERN.matcher(value); +x final Matcher matcher = SECRETS_REF_PATTERN.matcher(value); if (matcher.matches()) { final String secretId = matcher.group(SECRET_CONFIGURATION_ID_GROUP); final String secretKey = matcher.group(SECRET_KEY_GROUP); From 6d0bd921e396bb8b5ad05e406243990ed214e775 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 11 Aug 2025 08:53:12 -0700 Subject: [PATCH 6/8] Fixed javadoc error Signed-off-by: Kondaka --- .../opensearch/dataprepper/model/pipeline/HeadlessPipeline.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java index b2ce928c04..c47eca24bd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/pipeline/HeadlessPipeline.java @@ -22,7 +22,7 @@ public interface HeadlessPipeline { /** * sends events to the headless pipeline * - * @param records records to be sent to headless pipeline + * @param events records to be sent to headless pipeline * @since 2.13 */ void sendEvents(Collection> events); From b9970e0b75f2108852a0343cfa16ea7ed4b6185f Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 11 Aug 2025 09:11:14 -0700 Subject: [PATCH 7/8] Removed unnecessary change to AwsSecretsPluginConfigValueTranslator.java Signed-off-by: Kondaka --- .../plugins/aws/AwsSecretsPluginConfigValueTranslator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java index 617e7873b3..adeb498bfb 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslator.java @@ -27,7 +27,7 @@ public AwsSecretsPluginConfigValueTranslator(final SecretsSupplier secretsSuppli @Override public Object translate(final String value) { -x final Matcher matcher = SECRETS_REF_PATTERN.matcher(value); + final Matcher matcher = SECRETS_REF_PATTERN.matcher(value); if (matcher.matches()) { final String secretId = matcher.group(SECRET_CONFIGURATION_ID_GROUP); final String secretKey = matcher.group(SECRET_KEY_GROUP); From fcdb2f32eb213f4ea9bf62d63aa5071bf7daa8c1 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 11 Aug 2025 13:04:13 -0700 Subject: [PATCH 8/8] Addressed review comments Signed-off-by: Kondaka --- .../core/parser/PipelineTransformer.java | 11 +++++++++++ .../core/pipeline/HeadlessPipelineSource.java | 2 +- .../opensearch/dataprepper/TestDataProvider.java | 1 + .../core/parser/PipelineTransformerTests.java | 10 ++++++++++ .../core/pipeline/HeadlessPipelineSourceTest.java | 1 - .../dataprepper/core/pipeline/PipelineTests.java | 2 +- ...ssing_source_multiple_pipeline_configuration.yml | 13 +++++++++++++ .../parser/model/PipelineConfigurationTests.java | 13 ------------- 8 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index 3ddfe9367d..5624fba79d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -133,6 +133,17 @@ private void buildPipelineFromConfiguration( Source source; if (!pipelineName.equals(failurePipelineName)) { final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); + if (sourceSetting == null) { + Exception e = new IllegalArgumentException(String.format("{}: Source must not be null", pipelineName)); + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName("UNKNOWN") + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return; + } final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, pipelineMap, pipelineConfigurationMap); source = pipelineSource.orElseGet(() -> { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java index ad0ee012b0..47eaab1294 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSource.java @@ -79,7 +79,7 @@ public void sendEvents(Collection> records) { numberOfEventsSuccessful.increment(records.size()); break; } catch (Exception e) { - LOG.error(NOISY, "Failed to write to failure pipeline"); + LOG.error(NOISY, "Failed to write to failure pipeline.", e); if (acknowledgementsEnabled) { /* If acknowledgements enabled, better to retry here than retrying from the beginning */ try { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index b0b6885d92..47d1d57d63 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -24,6 +24,7 @@ public class TestDataProvider { public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml"; public static final String INCORRECT_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/incorrect_source_multiple_pipeline_configuration.yml"; public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml"; + public static final String MISSING_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_source_multiple_pipeline_configuration.yml"; public static final String MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_pipeline_multiple_pipeline_configuration.yml"; public static final String VALID_MULTIPLE_SINKS_CONFIG_FILE = "src/test/resources/valid_multiple_sinks.yml"; public static final String VALID_MULTIPLE_SINKS_WITH_FAILURE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_sinks_with_routes_with_failure_pipeline.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 9c79fd8f6d..f11220002a 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -342,6 +342,16 @@ void parseConfiguration_with_missing_pipeline_name_should_throw() { verify(dataPrepperConfiguration).getPipelineExtensions(); } + @Test + void parseConfiguration_with_missing_source_should_fail() { + final PipelineTransformer pipelineTransformer = + createObjectUnderTest(TestDataProvider.MISSING_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE); + + final Map connectedPipelines =pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel); + assertThat(connectedPipelines.size(), equalTo(0)); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); + } + @Test void parseConfiguration_with_missing_pipeline_name_in_multiple_pipelines_should_throw() { final PipelineTransformer pipelineTransformer = diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java index 9f2107bd8f..6bc0df0740 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/HeadlessPipelineSourceTest.java @@ -86,7 +86,6 @@ public void testSendEventsException() throws Exception { } @Test - //@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS) public void testSendEventsExceptionWithAcknowledgements() throws Exception { doThrow(RuntimeException.class).when(buffer).writeAll(any(Collection.class), any(Integer.class)); headlessPipelineSource = createObjectUnderTest(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java index 158e971ac2..434eab2d50 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java @@ -712,6 +712,7 @@ void testFailurePipeline() throws InterruptedException { processorSets.forEach(processorSet -> processorSet.forEach(processor -> { assertThat(((TestProcessor)processor).getFailurePipeline(), equalTo(failurePipeline)); })); + assertThat(sinks.size(), equalTo(1)); for (Sink sink: sinks) { assertThat(((TestSink)sink).getFailurePipeline(), equalTo(failurePipeline)); } @@ -721,7 +722,6 @@ void testFailurePipeline() throws InterruptedException { Collection> records = mock(Collection.class); failurePipeline.sendEvents(records); verify(headlessPipelineSource).sendEvents(records); - //assertThat(testPipeline.areAcknowledgementsEnabled(), equalTo(false)); } } diff --git a/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml new file mode 100644 index 0000000000..37a59f7eb4 --- /dev/null +++ b/data-prepper-core/src/test/resources/missing_source_multiple_pipeline_configuration.yml @@ -0,0 +1,13 @@ +# this configuration file is solely for testing formatting +test-pipeline-1: + buffer: + bounded_blocking: #to check non object nodes for plugins + sink: + - pipeline: + name: "test-pipeline-2" +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + sink: + - file: diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java index 8baaad331e..f1f21d52a3 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/model/PipelineConfigurationTests.java @@ -112,19 +112,6 @@ void testOnlySourceAndSink() { assertThat(pipelineConfiguration.getReadBatchDelay(), CoreMatchers.is(TestConfigurationProvider.DEFAULT_READ_BATCH_DELAY)); } - @Test - void testNoSourceConfiguration() { - final PipelineModel pipelineModel = mock(PipelineModel.class); - when(pipelineModel.getProcessors()).thenReturn(processors); - when(pipelineModel.getSinks()).thenReturn(sinks); - when(pipelineModel.getWorkers()).thenReturn(TestConfigurationProvider.TEST_WORKERS); - when(pipelineModel.getReadBatchDelay()).thenReturn(TestConfigurationProvider.TEST_DELAY); - - final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); - - assertThat(actual.getMessage(), equalTo("Invalid configuration, source is a required component")); - } - @Test void testNullProcessorConfiguration() { final PipelineModel pipelineModel = mock(PipelineModel.class);