Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import java.time.Instant;
import java.time.Duration;
Expand All @@ -38,6 +39,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> {
private final Timer latencyTimer;
private final Timer readTimer;
private final Timer checkpointTimer;
private HeadlessPipeline failurePipeline;

public AbstractBuffer(final PluginSetting pluginSetting) {
this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName());
Expand All @@ -62,6 +64,15 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN
this.checkpointTimer = pluginMetrics.timer(MetricNames.CHECKPOINT_TIME_ELAPSED);
}

@Override
public void setFailurePipeline(final HeadlessPipeline failurePipeline) {
this.failurePipeline = failurePipeline;
}

public HeadlessPipeline getFailurePipeline() {
return failurePipeline;
}

/**
* Records metrics for ingress, time elapsed, and timeouts, while calling the doWrite method
* to perform the actual write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -138,4 +139,14 @@ default boolean isWrittenOffHeapOnly() {
*/
default void shutdown() {
}

/**
* Sets default failure pipeline of a source

* @param failurePipeline failure pipeline
* @since 2.12
*/
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public PipelineModel(
@JsonProperty("sink") final List<SinkModel> sinks,
@JsonProperty("workers") final Integer workers,
@JsonProperty("delay") final Integer delay) {
checkArgument(Objects.nonNull(source), "Source must not be null");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is source not required anymore? Doesn't even DLQ pipeline have a pipeline source?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PIpelineModel, it does not.

@kkondaka , Do we have validations elsewhere on this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graytaylor0 DLQ pipelines do not have source because source/processor/buffer/sink can end events to DLQ pipeline. So, the source is the new "HeadlessPipelineSource" that I added. This source is created automatically for a DLQ pipeline. It is not configurable.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable. Yes, there are validations that fail if a source is not specified.

checkArgument(Objects.nonNull(sinks), "Sinks must not be null");
checkArgument(sinks.size() > 0, "PipelineModel must include at least 1 sink");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.pipeline;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;

public interface HeadlessPipeline {
/**
* sets flag to indicate if acknowledgements are enabled
*
* @param acknowledgementsEnabled flag indicating acknowledgements are enabled
* @since 2.13
*/
void setAcknowledgementsEnabled(final boolean acknowledgementsEnabled);

/**
* sends events to the headless pipeline
*
* @param events records to be sent to headless pipeline
* @since 2.13
*/
void sendEvents(Collection<Record<Event>> events);

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import java.util.Collection;

Expand All @@ -26,6 +27,7 @@ public abstract class AbstractProcessor<InputRecord extends Record<?>, OutputRec
private final Counter recordsInCounter;
private final Counter recordsOutCounter;
private final Timer timeElapsedTimer;
private HeadlessPipeline failurePipeline;

public AbstractProcessor(final PluginSetting pluginSetting) {
pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand Down Expand Up @@ -63,4 +65,13 @@ public Collection<OutputRecord> execute(final Collection<InputRecord> records) {
* @return Processed records
*/
public abstract Collection<OutputRecord> doExecute(Collection<InputRecord> records);

@Override
public void setFailurePipeline(HeadlessPipeline failurePipeline) {
this.failurePipeline = failurePipeline;
}

public HeadlessPipeline getFailurePipeline() {
return failurePipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.processor;

import org.opensearch.dataprepper.model.plugin.PluginComponentType;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
Expand Down Expand Up @@ -58,4 +59,14 @@ default boolean holdsEvents() {
* Final shutdown call to clean up any resources that need to be closed.
*/
void shutdown();

/**
* Sets default failure pipeline of a source

* @param failurePipeline failure pipeline
* @since 2.12
*/
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;

Expand All @@ -29,6 +30,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;
private HeadlessPipeline failurePipeline;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand Down Expand Up @@ -69,6 +71,15 @@ public void output(Collection<T> records) {
timeElapsedTimer.record(() -> doOutput(records));
}

@Override
public void setFailurePipeline(final HeadlessPipeline failurePipeline) {
this.failurePipeline = failurePipeline;
}

public HeadlessPipeline getFailurePipeline() {
return failurePipeline;
}

/**
* This method should implement the output logic
* @param records Records to be output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.plugin.PluginComponentType;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
Expand Down Expand Up @@ -48,4 +49,13 @@ public interface Sink<T extends Record<?>> {
default void updateLatencyMetrics(final Collection<T> events) {
}

/**
* Sets default failure pipeline of a source

* @param failurePipeline failure pipeline
* @since 2.12
*/
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.codec.HasByteDecoder;

/**
Expand Down Expand Up @@ -39,4 +40,13 @@ default boolean areAcknowledgementsEnabled() {
return false;
}

/**
* Sets default failure pipeline of a source

* @param failurePipeline failure pipeline
* @since 2.12
*/
default void setFailurePipeline(final HeadlessPipeline failurePipeline) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand All @@ -31,6 +32,9 @@
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static org.mockito.Mockito.mock;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -181,6 +185,14 @@ public void testWriteBytes() throws TimeoutException {
assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10));
}

@Test
void testGetAndSetFailurePipeline() {
final AbstractBuffer<Record<String>> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting);
HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class);
abstractBuffer.setFailurePipeline(failurePipeline);
assertThat(abstractBuffer.getFailurePipeline(), sameInstance(failurePipeline));
}

@Test
public void testWriteTimeoutMetric() throws TimeoutException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import java.time.Duration;
import java.util.Optional;
Expand All @@ -16,7 +17,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doCallRealMethod;

class BufferTest {

Expand Down Expand Up @@ -71,4 +74,11 @@ void testWriteBytes() {

}

@Test
void testSetFailurePipeline() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class);
doCallRealMethod().when(buffer).setFailurePipeline(failurePipeline);
buffer.setFailurePipeline(failurePipeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,6 @@ void testPipelineModelWithValidProcessorConfig() {
assertEquals(expectedPreppersPluginModel, pipelineModel.getProcessors());
}

@Test
void testPipelineModelWithNullSourceThrowsException() {
final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel(
null,
validBufferPluginModel(),
validPreppersPluginModel(),
validPipelineRouter(),
validSinksPluginModel(),
TEST_WORKERS,
TEST_READ_BATCH_DELAY
));

final String expected = "Source must not be null";

assertTrue(exception.getMessage().contains(expected));
}

@Test
void testPipelineModelWithNullSinksThrowsException() {
final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import static org.mockito.Mockito.mock;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -93,6 +97,20 @@ public void testMetricsWithPluginMetricsConstructor() {
0.2));
}

@Test
void testGetAndSetFailurePipeline() {
final String processorName = "testProcessor";
final String pipelineName = "testPipeline";
MetricsTestUtil.initMetrics();

PluginSetting pluginSetting = new PluginSetting(processorName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractProcessor<Record<String>, Record<String>> processor = new ProcessorImpl(pluginSetting);
HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class);
processor.setFailurePipeline(failurePipeline);
assertThat(processor.getFailurePipeline(), sameInstance(failurePipeline));
}

public static class ProcessorImpl extends AbstractProcessor<Record<String>, Record<String>> {
public ProcessorImpl(PluginSetting pluginSetting) {
super(pluginSetting);
Expand All @@ -113,6 +131,7 @@ public Collection<Record<String>> doExecute(Collection<Record<String>> records)
.collect(Collectors.toList());
}


@Override
public void prepareForShutdown() {

Expand All @@ -128,4 +147,4 @@ public void shutdown() {

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package org.opensearch.dataprepper.model.processor;

import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doCallRealMethod;

public class ProcessorTest {

Expand All @@ -19,5 +21,13 @@ public void testDefault() {
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}

@Test
public void testSetFailurePipeline() {
Processor processor = mock(Processor.class);
HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class);
doCallRealMethod().when(processor).setFailurePipeline(failurePipeline);
processor.setFailurePipeline(failurePipeline);
}
}

Loading
Loading