Skip to content

Commit f134755

Browse files
committed
Addressed comments. Added more tests for 100% code coverage
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 0f74631 commit f134755

36 files changed

Lines changed: 454 additions & 119 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1414
import org.opensearch.dataprepper.model.record.Record;
1515
import org.opensearch.dataprepper.model.event.Event;
16-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
16+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1717

1818
import java.time.Instant;
1919
import java.time.Duration;
@@ -39,7 +39,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> {
3939
private final Timer latencyTimer;
4040
private final Timer readTimer;
4141
private final Timer checkpointTimer;
42-
private FailurePipeline failurePipeline;
42+
private HeadlessPipeline failurePipeline;
4343

4444
public AbstractBuffer(final PluginSetting pluginSetting) {
4545
this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName());
@@ -65,11 +65,11 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN
6565
}
6666

6767
@Override
68-
public void setFailurePipeline(final FailurePipeline failurePipeline) {
68+
public void setFailurePipeline(final HeadlessPipeline failurePipeline) {
6969
this.failurePipeline = failurePipeline;
7070
}
7171

72-
public FailurePipeline getFailurePipeline() {
72+
public HeadlessPipeline getFailurePipeline() {
7373
return failurePipeline;
7474
}
7575

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import org.opensearch.dataprepper.model.CheckpointState;
99
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
1010
import org.opensearch.dataprepper.model.record.Record;
11-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
11+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1212

1313
import java.time.Duration;
1414
import java.util.Collection;
@@ -146,7 +146,7 @@ default void shutdown() {
146146
* @param failurePipeline failure pipeline
147147
* @since 2.12
148148
*/
149-
default void setFailurePipeline(final FailurePipeline failurePipeline) {
149+
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
150150
}
151151

152152
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public PipelineModel(
7777
@JsonProperty("sink") final List<SinkModel> sinks,
7878
@JsonProperty("workers") final Integer workers,
7979
@JsonProperty("delay") final Integer delay) {
80-
checkArgument(Objects.nonNull(source), "Source must not be null");
8180
checkArgument(Objects.nonNull(sinks), "Sinks must not be null");
8281
checkArgument(sinks.size() > 0, "PipelineModel must include at least 1 sink");
8382

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java

Lines changed: 0 additions & 14 deletions
This file was deleted.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.pipeline;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
11+
import java.util.Collection;
12+
13+
public interface HeadlessPipeline {
14+
/**
15+
* sets flag to indicate if acknowledgements are enabled
16+
*
17+
* @param acknowledgementsEnabled flag indicating acknowledgements are enabled
18+
* @since 2.13
19+
*/
20+
void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled);
21+
22+
/**
23+
* sends events to the headless pipeline
24+
*
25+
* @param records records to be sent to headless pipeline
26+
* @since 2.13
27+
*/
28+
void sendEvents(Collection<Record<Event>> events);
29+
30+
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.opensearch.dataprepper.metrics.PluginMetrics;
1212
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1313
import org.opensearch.dataprepper.model.record.Record;
14-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
14+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1515

1616
import java.util.Collection;
1717

@@ -27,7 +27,7 @@ public abstract class AbstractProcessor<InputRecord extends Record<?>, OutputRec
2727
private final Counter recordsInCounter;
2828
private final Counter recordsOutCounter;
2929
private final Timer timeElapsedTimer;
30-
private FailurePipeline failurePipeline;
30+
private HeadlessPipeline failurePipeline;
3131

3232
public AbstractProcessor(final PluginSetting pluginSetting) {
3333
pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
@@ -67,11 +67,11 @@ public Collection<OutputRecord> execute(final Collection<InputRecord> records) {
6767
public abstract Collection<OutputRecord> doExecute(Collection<InputRecord> records);
6868

6969
@Override
70-
public void setFailurePipeline(FailurePipeline failurePipeline) {
70+
public void setFailurePipeline(HeadlessPipeline failurePipeline) {
7171
this.failurePipeline = failurePipeline;
7272
}
7373

74-
public FailurePipeline getFailurePipeline() {
74+
public HeadlessPipeline getFailurePipeline() {
7575
return failurePipeline;
7676
}
7777
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package org.opensearch.dataprepper.model.processor;
77

88
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
9-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
9+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1010
import org.opensearch.dataprepper.model.record.Record;
1111

1212
import java.util.Collection;
@@ -66,7 +66,7 @@ default boolean holdsEvents() {
6666
* @param failurePipeline failure pipeline
6767
* @since 2.12
6868
*/
69-
default void setFailurePipeline(final FailurePipeline failurePipeline) {
69+
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
7070
}
7171

7272
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.opensearch.dataprepper.metrics.MetricNames;
1111
import org.opensearch.dataprepper.metrics.PluginMetrics;
1212
import org.opensearch.dataprepper.model.configuration.PluginSetting;
13-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
13+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1414
import org.opensearch.dataprepper.model.record.Record;
1515
import org.opensearch.dataprepper.model.event.Event;
1616

@@ -30,7 +30,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
3030
private int maxRetries;
3131
private int waitTimeMs;
3232
private SinkThread sinkThread;
33-
private FailurePipeline failurePipeline;
33+
private HeadlessPipeline failurePipeline;
3434

3535
public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
3636
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
@@ -72,11 +72,11 @@ public void output(Collection<T> records) {
7272
}
7373

7474
@Override
75-
public void setFailurePipeline(final FailurePipeline failurePipeline) {
75+
public void setFailurePipeline(final HeadlessPipeline failurePipeline) {
7676
this.failurePipeline = failurePipeline;
7777
}
7878

79-
public FailurePipeline getFailurePipeline() {
79+
public HeadlessPipeline getFailurePipeline() {
8080
return failurePipeline;
8181
}
8282

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package org.opensearch.dataprepper.model.sink;
77

88
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
9-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
9+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1010
import org.opensearch.dataprepper.model.record.Record;
1111

1212
import java.util.Collection;
@@ -55,7 +55,7 @@ default void updateLatencyMetrics(final Collection<T> events) {
5555
* @param failurePipeline failure pipeline
5656
* @since 2.12
5757
*/
58-
default void setFailurePipeline(final FailurePipeline failurePipeline) {
58+
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
5959
}
6060

6161
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/Source.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import org.opensearch.dataprepper.model.buffer.Buffer;
99
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
1010
import org.opensearch.dataprepper.model.record.Record;
11-
import org.opensearch.dataprepper.model.failures.FailurePipeline;
11+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1212
import org.opensearch.dataprepper.model.codec.HasByteDecoder;
1313

1414
/**
@@ -46,7 +46,7 @@ default boolean areAcknowledgementsEnabled() {
4646
* @param failurePipeline failure pipeline
4747
* @since 2.12
4848
*/
49-
default void setFailurePipeline(final FailurePipeline failurePipeline) {
49+
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
5050
}
5151

5252
}

0 commit comments

Comments
 (0)