Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.configuration;

import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -30,6 +31,9 @@ public SinkForwardConfig(
@JsonProperty("pipelines") final List<String> pipelineNames,
@JsonProperty("with_data") final Map<String, Object> withData,
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
if (pipelineNames.size() != 1) {
throw new InvalidPluginConfigurationException("Supports only one forwarding pipeline");
}
this.pipelineNames = pipelineNames;
this.withData = withData;
this.withMetadata = withMetadata;
Expand All @@ -46,5 +50,6 @@ public Map<String, Object> getWithMetadata() {
public Map<String, Object> getWithData() {
return withData;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecords
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
entry.getValue().sendEvents(records);
}
sinkForwardRecordsContext.clearRecords();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.InternalEventHandle;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -25,28 +24,21 @@ public SinkForwardRecordsContext(SinkContext sinkContext) {
public void addRecord(Record<Event> record) {
if (!forwardPipelinesPresent)
return;
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
if (eventHandle != null) {
eventHandle.acquireReference();
}
records.add(record);
}

public void addRecords(Collection<Record<Event>> newRecords) {
if (!forwardPipelinesPresent)
return;
newRecords.forEach((record) -> {
Event event = record.getData();
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
if (eventHandle != null) {
eventHandle.acquireReference();
}
});
records.addAll(newRecords);
}

public List<Record<Event>> getRecords() {
return records;
}

public void clearRecords() {
records.clear();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

package org.opensearch.dataprepper.model.configuration;

import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Map;
Expand All @@ -26,14 +28,27 @@ void testDefaults() {
}

@Test
void testCustomValues() {
List<String> pipelines = mock(List.class);
void pipelines_lsit_with_one_pipeline_succeeds() {
List<String> pipelines = List.of("pipeline1");
Map<String, Object> withData = mock(Map.class);
Map<String, Object> withMetadata = mock(Map.class);
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
}

@Test
void pipelines_list_with_two_or_more_pipelines_throws_exception() {
List<String> pipelines = List.of("pipeline1", "pipeline2");
Map<String, Object> withData = mock(Map.class);
Map<String, Object> withMetadata = mock(Map.class);
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(pipelines, withData, withMetadata));
}

@Test
void empty_pipelines_list_throws_exception() {
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(List.of(), Map.of(), Map.of()));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,21 @@ public void testForwardToPipelinesWithPipelineMap() {
verify(forwardPipeline2, times(0)).sendEvents(eq(records));
sinkForwardRecordsContext.addRecords(records);
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
verify(forwardPipeline1, times(1)).sendEvents(any());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we loosening these verifies? This doesn't seem right.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We are not loosening the verifies. Since the forwardRecords() clears the records It can't be verified like that anymore. The original records would now be empty and the verification would fail

verify(forwardPipeline2, times(1)).sendEvents(any());
verify(event, times(1)).put(any(String.class), any(Object.class));
verify(event, times(1)).getMetadata();
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
records = Collections.singletonList(record);
sinkForwardRecordsContext.addRecords(records);
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true));
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
verify(forwardPipeline1, times(2)).sendEvents(any());
verify(forwardPipeline2, times(2)).sendEvents(any());
records = Collections.singletonList(record);
sinkForwardRecordsContext.addRecords(records);
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true));
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
verify(forwardPipeline1, times(3)).sendEvents(any());
verify(forwardPipeline2, times(3)).sendEvents(any());
}

@Test
Expand Down Expand Up @@ -148,6 +152,7 @@ public void testWithNoForwardToPipelines() {
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
sinkForwardRecordsContext.addRecords(List.of(record));
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;

import static org.mockito.Mockito.mock;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import java.util.List;
Expand All @@ -30,16 +28,9 @@ public void testSinkForwardRecordContextBasic() {
SinkContext sinkContext = mock(SinkContext.class);
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
Event event = mock(Event.class);
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
doNothing().when(eventHandle).acquireReference();
Record<Event> record1 = mock(Record.class);
Record<Event> record2 = mock(Record.class);
Record<Event> record3 = mock(Record.class);
when(record1.getData()).thenReturn(event);
when(record2.getData()).thenReturn(event);
when(record3.getData()).thenReturn(event);
when(event.getEventHandle()).thenReturn(eventHandle);
sinkForwardRecordsContext.addRecord(record1);
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
Expand All @@ -52,19 +43,28 @@ public void testSinkForwardRecordContextWithForwardingPipelines() {
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
Event event = mock(Event.class);
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
doNothing().when(eventHandle).acquireReference();
Record<Event> record1 = mock(Record.class);
Record<Event> record2 = mock(Record.class);
Record<Event> record3 = mock(Record.class);
when(record1.getData()).thenReturn(event);
when(record2.getData()).thenReturn(event);
when(record3.getData()).thenReturn(event);
when(event.getEventHandle()).thenReturn(eventHandle);
sinkForwardRecordsContext.addRecord(record1);
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
assertThat(records.size(), equalTo(3));
sinkForwardRecordsContext.clearRecords();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be it's own test.

sinkForwardRecordsContext_clearRecords_removes_all_records()

assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
}

@Test
public void testSinkForwardRecordContextClearRecords() {
SinkContext sinkContext = mock(SinkContext.class);
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
Record<Event> record1 = mock(Record.class);
Record<Event> record2 = mock(Record.class);
sinkForwardRecordsContext.addRecords(List.of(record1, record2));
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(2));
sinkForwardRecordsContext.clearRecords();
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -94,6 +95,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -151,21 +153,25 @@ public class OpenSearchSinkIT {
private PluginConfigObservable pluginConfigObservable;

public OpenSearchSink createObjectUnderTest(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) {
sinkContext = mock(SinkContext.class);
when(sinkContext.getTagsTargetKey()).thenReturn(null);
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
OpenSearchSink sink = new OpenSearchSink(
pluginSetting, null, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig);
pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig);
if (doInitialize) {
sink.doInitialize();
}
return sink;
}

public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) {
public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, final Map<String, HeadlessPipeline> forwardPipelineMap, boolean doInitialize) {
sinkContext = mock(SinkContext.class);
testTagsTargetKey = RandomStringUtils.randomAlphabetic(5);
when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey);
when(sinkContext.getForwardToPipelines()).thenReturn(forwardPipelineMap);
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
Expand Down Expand Up @@ -846,6 +852,36 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
}
}

@Test
public void testOutputForwardsCreatedDocumentsToAPipeline() throws IOException, InterruptedException {
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
Map<String, HeadlessPipeline> forwardPipelineMap = Map.of("fwd_pipeline1", forwardPipeline1);
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, forwardPipelineMap, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
assertThat(retSources.size(), equalTo(1));
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
sink.shutdown();

// verify metrics
final List<Measurement> bulkRequestLatencies = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_LATENCY).toString());
assertThat(bulkRequestLatencies.size(), equalTo(3));
// COUNT
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
verify(sinkContext).forwardRecords(any(), eq(null), eq(null));
}

@Test
public void testOutputCustomIndex() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
Expand Down Expand Up @@ -1255,7 +1291,7 @@ public void testEventOutputWithTags() throws IOException, InterruptedException {
final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, true);
final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, Map.of(), true);
sink.output(testRecords);

final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
Expand Down
Loading
Loading