Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Map;

public class SinkForwardConfig {
@JsonProperty("pipelines")
List<String> pipelineNames;

@JsonProperty("with_metadata")
Map<String, Object> withMetadata;

@JsonProperty("with_data")
Map<String, Object> withData;
Comment on lines +18 to +22

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the use case for this?

It's just like an add_entries?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. These will be specific to sink and can only be determined by the sink - for example, documentId after a document is successfully stored in opensearch. The values of the map may need to support reserved words like documentId which needs to be implemented


@JsonCreator
public SinkForwardConfig() {
}

@JsonCreator
public SinkForwardConfig(
@JsonProperty("pipelines") final List<String> pipelineNames,
@JsonProperty("with_data") final Map<String, Object> withData,
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
this.pipelineNames = pipelineNames;
this.withData = withData;
this.withMetadata = withMetadata;
}

public List<String> getPipelineNames() {
return pipelineNames;
}

public Map<String, Object> getWithMetadata() {
return withMetadata;
}

public Map<String, Object> getWithData() {
return withData;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
public class SinkModel extends PluginModel {

public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, null, pluginSettings));
}

public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -56,6 +60,9 @@ public List<String> getExcludeKeys() {
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
}

public SinkForwardConfig getForwardConfig() {
return this.<SinkInternalJsonModel>getInternalJsonModel().forwardConfig;
}

/**
* Gets the tags target key associated with this Sink.
Expand All @@ -75,17 +82,19 @@ public static class SinkModelBuilder {

private final List<String> includeKeys;
private final List<String> excludeKeys;
private final SinkForwardConfig forwardConfig;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
this.includeKeys = Collections.emptyList();
this.excludeKeys = Collections.emptyList();
this.forwardConfig = null;
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, pluginModel.getPluginSettings());
}
}

Expand All @@ -111,16 +120,21 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("exclude_keys")
private final List<String> excludeKeys;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("forward_to")
private final SinkForwardConfig forwardConfig;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys) {
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>());
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("forward_to") final SinkForwardConfig forwardConfig) {
this(routes, tagsTargetKey, includeKeys, excludeKeys, forwardConfig, new HashMap<>());
}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final SinkForwardConfig forwardConfig, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : Collections.emptyList();
this.includeKeys = includeKeys != null ? validateKeys(includeKeys, "include_keys") : Collections.emptyList();
this.excludeKeys = excludeKeys != null ? validateKeys(excludeKeys, "exclude_keys") : Collections.emptyList();
this.forwardConfig = forwardConfig;
this.tagsTargetKey = tagsTargetKey;
validateConfiguration();
}
Expand All @@ -146,7 +160,7 @@ private static List<String> validateKeys(List<String> input, String tag) {

static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null, null));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Data Prepper Sink Context class. This the class for keeping global
Expand All @@ -15,20 +22,82 @@
public class SinkContext {
private final String tagsTargetKey;
private final Collection<String> routes;

private final List<String> includeKeys;
private final List<String> excludeKeys;

private Map<String, HeadlessPipeline> forwardToPipelines;

public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys) {
this.tagsTargetKey = tagsTargetKey;
this.routes = routes;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.forwardToPipelines = new HashMap<>();
}

public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<String> forwardPipelineNames) {
this.tagsTargetKey = tagsTargetKey;
this.routes = routes;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.forwardToPipelines = new HashMap<>();
if (forwardPipelineNames != null) {
for (final String forwardPipelineName: forwardPipelineNames) {
this.forwardToPipelines.put(forwardPipelineName, null);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is value null here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, there are no "pipeline"s attached to to a given pipeline name. It's possible that the pipeline name is incorrect. The code below will match the names with the pipelines.

}
}
}

public SinkContext(String tagsTargetKey) {
this(tagsTargetKey, null, null, null);
this(tagsTargetKey, null, null, null, null);
}

public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines) {
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
final String key = entry.getKey();
final HeadlessPipeline pipeline = pipelines.get(key);
if (pipeline != null) {
forwardToPipelines.put(key, pipeline);
} else {
throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only catch this at runtime? We could validate on startup if a pipeline configured in forward_to doesn't exist right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. We can add additional validation. But the order of events may make it difficult. The sink gets instantiated first and then the pipelines are instantiated so at the time of sink construction we do not know if a pipeline is present or not.

}
}
}

public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
if (forwardToPipelines.size() == 0) {
return false;
}

for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
if (entry.getValue() == null) {
return false;
}
}
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();

if (records.size() == 0) {
return true;
}

records.forEach((record) -> {
Event event = record.getData();
if (withData != null && !withData.isEmpty()) {
for (Map.Entry<String, Object> entry: withData.entrySet()) {
event.put(entry.getKey(), entry.getValue());
}
}
if (withMetadata != null && !withMetadata.isEmpty()) {
EventMetadata metadata = event.getMetadata();
for (Map.Entry<String, Object> entry: withMetadata.entrySet()) {
metadata.setAttribute(entry.getKey(), entry.getValue());
}
}
});

for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
entry.getValue().sendEvents(records);
}
return true;
}

/**
Expand Down Expand Up @@ -56,5 +125,9 @@ public List<String> getIncludeKeys() {
public List<String> getExcludeKeys() {
return excludeKeys;
}

public Map<String, HeadlessPipeline> getForwardToPipelines() {
return forwardToPipelines;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.InternalEventHandle;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SinkForwardRecordsContext {
List<Record<Event>> records;
boolean forwardPipelinesPresent;

public SinkForwardRecordsContext(SinkContext sinkContext) {
forwardPipelinesPresent = (sinkContext != null && sinkContext.getForwardToPipelines().size() > 0);
records = new ArrayList<>();
}

public void addRecord(Record<Event> record) {
if (!forwardPipelinesPresent)
return;
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
if (eventHandle != null) {
eventHandle.acquireReference();
}
records.add(record);
}

public void addRecords(Collection<Record<Event>> newRecords) {
if (!forwardPipelinesPresent)
return;
newRecords.forEach((record) -> {
Event event = record.getData();
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
if (eventHandle != null) {
eventHandle.acquireReference();
}
});
records.addAll(newRecords);
}

public List<Record<Event>> getRecords() {
return records;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.configuration;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.List;
import java.util.Map;

public class SinkForwardConfigTest {

@Test
void testDefaults() {
final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig();
assertThat(sinkForwardConfig.getPipelineNames(), nullValue());
assertThat(sinkForwardConfig.getWithData(), nullValue());
assertThat(sinkForwardConfig.getWithMetadata(), nullValue());
}

@Test
void testCustomValues() {
List<String> pipelines = mock(List.class);
Map<String, Object> withData = mock(Map.class);
Map<String, Object> withMetadata = mock(Map.class);
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ void sinkModel_with_exclude_keys() {

}

@Test
void sinkModel_with_forward_pipelines() {
SinkForwardConfig sinkForwardConfig = mock(SinkForwardConfig.class);
List<String> forwardPipelineList = List.of("forward-pipeline1", "forward-pipeline2");
when(sinkForwardConfig.getPipelineNames()).thenReturn(forwardPipelineList);
final Map<String, Object> pluginSettings = new LinkedHashMap<>();

final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), sinkForwardConfig, pluginSettings);

assertThat(sinkModel.getForwardConfig().getPipelineNames(), equalTo(forwardPipelineList));
}

@Test
void sinkModel_with_invalid_exclude_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
Expand Down Expand Up @@ -213,12 +225,12 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getExcludeKeys(), notNullValue());
assertThat(actualSinkModel.getExcludeKeys(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
assertThat(actualSinkModel.getForwardConfig(), nullValue());

}
}

private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
}
Loading
Loading