Skip to content

Commit fd78dc1

Browse files
committed
Initial implementation of Pipeline DLQ
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 76b06ef commit fd78dc1

27 files changed

Lines changed: 178 additions & 44 deletions

File tree

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model;
7+
8+
import org.opensearch.dataprepper.model.source.Source;
9+
10+
public interface PipelineIf {
11+
Source getSource();
12+
}
13+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.failures;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
11+
import java.util.Collection;
12+
public interface FailurePipeline {
13+
void sendFailedEvents(Collection<Record<Event>> events);
14+
}
15+
16+

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import io.micrometer.core.instrument.Timer;
1010
import org.opensearch.dataprepper.metrics.MetricNames;
1111
import org.opensearch.dataprepper.metrics.PluginMetrics;
12+
import org.opensearch.dataprepper.model.PipelineIf;
13+
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
1214
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1315
import org.opensearch.dataprepper.model.record.Record;
1416
import org.opensearch.dataprepper.model.event.Event;
@@ -64,16 +66,16 @@ public void initialize() {
6466
* @param records the records to write to the sink.
6567
*/
6668
@Override
67-
public void output(Collection<T> records) {
69+
public void output(Collection<T> records, final PipelineIf failurePipeline) {
6870
recordsInCounter.increment(records.size()*1.0);
69-
timeElapsedTimer.record(() -> doOutput(records));
71+
timeElapsedTimer.record(() -> doOutput(records, failurePipeline));
7072
}
7173

7274
/**
7375
* This method should implement the output logic
7476
* @param records Records to be output
7577
*/
76-
public abstract void doOutput(Collection<T> records);
78+
public abstract void doOutput(Collection<T> records, final PipelineIf failurePipeline);
7779

7880
@Override
7981
public void shutdown() {

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.model.sink;
77

88
import org.opensearch.dataprepper.model.record.Record;
9+
import org.opensearch.dataprepper.model.PipelineIf;
910

1011
import java.util.Collection;
1112

@@ -20,7 +21,11 @@ public interface Sink<T extends Record<?>> {
2021
*
2122
* @param records the records to write to the sink.
2223
*/
23-
void output(Collection<T> records);
24+
default void output(Collection<T> records) {
25+
output(records, null);
26+
}
27+
28+
void output(Collection<T> records, PipelineIf failurePipeline);
2429

2530
/**
2631
* Prepare sink for shutdown, by cleaning up resources and threads.

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.model.sink;
77

88
import org.opensearch.dataprepper.model.record.Record;
9+
import org.opensearch.dataprepper.model.PipelineIf;
910
import org.junit.jupiter.api.Test;
1011

1112
import java.util.Collection;
@@ -28,7 +29,7 @@ public void initialize() {
2829
}
2930

3031
@Override
31-
public void output(Collection<Record<?>> records) {
32+
public void output(Collection<Record<?>> records, PipelineIf failurePipeline) {
3233
}
3334

3435
};

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
99
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1010
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
11+
import org.opensearch.dataprepper.model.PipelineIf;
1112
import org.opensearch.dataprepper.model.event.Event;
1213
import org.opensearch.dataprepper.model.event.EventHandle;
1314
import org.opensearch.dataprepper.model.record.Record;
@@ -34,7 +35,7 @@ public InMemorySink(final InMemoryConfig inMemoryConfig,
3435
}
3536

3637
@Override
37-
public void output(final Collection<Record<Event>> records) {
38+
public void output(final Collection<Record<Event>> records, final PipelineIf failurePipeline) {
3839
inMemorySinkAccessor.addEvents(testingKey, records);
3940
boolean result = inMemorySinkAccessor.getResult();
4041
records.stream().forEach((record) -> {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.dataprepper.DataPrepperShutdownListener;
1010
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
1111
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
12+
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
1213
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderServer;
1314
import org.opensearch.dataprepper.core.pipeline.Pipeline;
1415
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
1212
import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator;
1313
import org.opensearch.dataprepper.core.pipeline.Pipeline;
14+
import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource;
1415
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
1516
import org.opensearch.dataprepper.core.pipeline.router.Router;
1617
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
@@ -59,6 +60,7 @@ public class PipelineTransformer {
5960
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.";
6061
private static final String PIPELINE_TYPE = "pipeline";
6162
private static final String ATTRIBUTE_NAME = "name";
63+
//private static final String FAILURE_PIPELINE_NAME = "dlq";
6264
private final PipelinesDataFlowModel pipelinesDataFlowModel;
6365
private final RouterFactory routerFactory;
6466
private final DataPrepperConfiguration dataPrepperConfiguration;
@@ -128,24 +130,30 @@ private void buildPipelineFromConfiguration(
128130
final Map<String, Pipeline> pipelineMap) {
129131
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName);
130132
LOG.info("Building pipeline [{}] from provided configuration", pipelineName);
133+
final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName();
131134
try {
135+
Source source;
132136
final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting();
133-
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
134-
pipelineMap, pipelineConfigurationMap);
135-
final Source source = pipelineSource.orElseGet(() -> {
136-
try {
137-
return pluginFactory.loadPlugin(Source.class, sourceSetting);
138-
} catch (Exception e) {
139-
final PluginError pluginError = PluginError.builder()
140-
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
141-
.pipelineName(pipelineName)
142-
.pluginName(sourceSetting.getName())
143-
.exception(e)
144-
.build();
145-
pluginErrorCollector.collectPluginError(pluginError);
146-
return null;
147-
}
148-
});
137+
if (!pipelineName.equals(failurePipelineName)) {
138+
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
139+
pipelineMap, pipelineConfigurationMap);
140+
source = pipelineSource.orElseGet(() -> {
141+
try {
142+
return pluginFactory.loadPlugin(Source.class, sourceSetting);
143+
} catch (Exception e) {
144+
final PluginError pluginError = PluginError.builder()
145+
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
146+
.pipelineName(pipelineName)
147+
.pluginName(sourceSetting.getName())
148+
.exception(e)
149+
.build();
150+
pluginErrorCollector.collectPluginError(pluginError);
151+
return null;
152+
}
153+
});
154+
} else {
155+
source = new FailurePipelineSource();
156+
}
149157

150158
LOG.info("Building buffer for the pipeline [{}]", pipelineName);
151159
Buffer pipelineDefinedBuffer = null;
@@ -234,7 +242,14 @@ private void buildPipelineFromConfiguration(
234242
"pipelines", pipelineName, ex);
235243
processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap);
236244
}
237-
245+
final Pipeline failurePipeline = pipelineMap.get(failurePipelineName);
246+
if (failurePipeline != null) {
247+
for (Map.Entry<String, Pipeline> pipelineEntry : pipelineMap.entrySet()) {
248+
if (!(pipelineEntry.getKey().equals(failurePipelineName))) {
249+
pipelineEntry.getValue().setFailurePipeline(failurePipeline);
250+
}
251+
}
252+
}
238253
}
239254

240255
private List<IdentifiedComponent<Processor>> newProcessor(final PluginSetting pluginSetting) {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
3535
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);
3636

37+
static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq";
3738
private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";
3839

3940
static final int MAX_TAGS_NUMBER = 3;
@@ -213,6 +214,8 @@ public void setMetricTagFilters(final List<MetricTagFilter> metricTagFilters) {
213214
}
214215
}
215216

217+
public String getFailurePipelineName() { return DEFAULT_FAILURE_PIPELINE_NAME; }
218+
216219
public Duration getProcessorShutdownTimeout() {
217220
return processorShutdownTimeout;
218221
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.core.pipeline;
7+
8+
import org.opensearch.dataprepper.model.source.Source;
9+
import org.opensearch.dataprepper.model.failures.FailurePipeline;
10+
import org.opensearch.dataprepper.model.record.Record;
11+
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.buffer.Buffer;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.util.Collection;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
19+
public class FailurePipelineSource implements Source<Record<Event>>, FailurePipeline {
20+
private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class);
21+
private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE;
22+
private Buffer buffer;
23+
private AtomicBoolean isStopRequested;
24+
25+
public FailurePipelineSource() {
26+
isStopRequested = new AtomicBoolean(false);
27+
}
28+
29+
@Override
30+
public void start(Buffer buffer) {
31+
this.buffer = buffer;
32+
}
33+
34+
@Override
35+
public void stop() {
36+
isStopRequested.set(true);
37+
}
38+
39+
@Override
40+
public void sendFailedEvents(Collection<Record<Event>> records) {
41+
try {
42+
buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT);
43+
} catch (Exception e) {
44+
LOG.error("Failed to write to failure pipeline");
45+
}
46+
}
47+
48+
}

0 commit comments

Comments
 (0)