-
Notifications
You must be signed in to change notification settings - Fork 325
Add support for forwarding successful records from sinks using SinkContext #5994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
811a132
ff8a07f
fb60535
e50a5d6
2cbd1af
9466560
020f6fd
ef7aaa3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.model.configuration; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| public class SinkForwardConfig { | ||
| @JsonProperty("pipelines") | ||
| List<String> pipelineNames; | ||
|
|
||
| @JsonProperty("with_metadata") | ||
| Map<String, Object> withMetadata; | ||
|
|
||
| @JsonProperty("with_data") | ||
| Map<String, Object> withData; | ||
|
|
||
| @JsonCreator | ||
| public SinkForwardConfig() { | ||
| } | ||
|
|
||
| @JsonCreator | ||
| public SinkForwardConfig( | ||
| @JsonProperty("pipelines") final List<String> pipelineNames, | ||
| @JsonProperty("with_data") final Map<String, Object> withData, | ||
| @JsonProperty("with_metadata") final Map<String, Object> withMetadata) { | ||
| this.pipelineNames = pipelineNames; | ||
| this.withData = withData; | ||
| this.withMetadata = withMetadata; | ||
| } | ||
|
|
||
| public List<String> getPipelineNames() { | ||
| return pipelineNames; | ||
| } | ||
|
|
||
| public Map<String, Object> getWithMetadata() { | ||
| return withMetadata; | ||
| } | ||
|
|
||
| public Map<String, Object> getWithData() { | ||
| return withData; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,15 @@ | |
|
|
||
| package org.opensearch.dataprepper.model.sink; | ||
|
|
||
| import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.EventMetadata; | ||
|
|
||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * Data Prepper Sink Context class. This the class for keeping global | ||
|
|
@@ -15,20 +22,82 @@ | |
| public class SinkContext { | ||
| private final String tagsTargetKey; | ||
| private final Collection<String> routes; | ||
|
|
||
| private final List<String> includeKeys; | ||
| private final List<String> excludeKeys; | ||
|
|
||
| private Map<String, HeadlessPipeline> forwardToPipelines; | ||
|
|
||
| public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys) { | ||
| this.tagsTargetKey = tagsTargetKey; | ||
| this.routes = routes; | ||
| this.includeKeys = includeKeys; | ||
| this.excludeKeys = excludeKeys; | ||
| this.forwardToPipelines = new HashMap<>(); | ||
| } | ||
|
|
||
| public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<String> forwardPipelineNames) { | ||
| this.tagsTargetKey = tagsTargetKey; | ||
| this.routes = routes; | ||
| this.includeKeys = includeKeys; | ||
| this.excludeKeys = excludeKeys; | ||
| this.forwardToPipelines = new HashMap<>(); | ||
| if (forwardPipelineNames != null) { | ||
| for (final String forwardPipelineName: forwardPipelineNames) { | ||
| this.forwardToPipelines.put(forwardPipelineName, null); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is value null here?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initially, there are no "pipeline"s attached to to a given pipeline name. It's possible that the pipeline name is incorrect. The code below will match the names with the pipelines. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| public SinkContext(String tagsTargetKey) { | ||
| this(tagsTargetKey, null, null, null); | ||
| this(tagsTargetKey, null, null, null, null); | ||
| } | ||
|
|
||
| public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines) { | ||
| for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) { | ||
| final String key = entry.getKey(); | ||
| final HeadlessPipeline pipeline = pipelines.get(key); | ||
| if (pipeline != null) { | ||
| forwardToPipelines.put(key, pipeline); | ||
| } else { | ||
| throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we only catch this at runtime? We could validate on startup if a pipeline configured in
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. We can add additional validation. But the order of events may make it difficult. The sink gets instantiated first and then the pipelines are instantiated so at the time of sink construction we do not know if a pipeline is present or not. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Map<String, Object> withData, final Map<String, Object> withMetadata) { | ||
| if (forwardToPipelines.size() == 0) { | ||
| return false; | ||
| } | ||
|
|
||
| for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) { | ||
| if (entry.getValue() == null) { | ||
| return false; | ||
| } | ||
| } | ||
| List<Record<Event>> records = sinkForwardRecordsContext.getRecords(); | ||
|
|
||
| if (records.size() == 0) { | ||
| return true; | ||
| } | ||
|
|
||
| records.forEach((record) -> { | ||
| Event event = record.getData(); | ||
| if (withData != null && !withData.isEmpty()) { | ||
| for (Map.Entry<String, Object> entry: withData.entrySet()) { | ||
| event.put(entry.getKey(), entry.getValue()); | ||
| } | ||
| } | ||
| if (withMetadata != null && !withMetadata.isEmpty()) { | ||
| EventMetadata metadata = event.getMetadata(); | ||
| for (Map.Entry<String, Object> entry: withMetadata.entrySet()) { | ||
| metadata.setAttribute(entry.getKey(), entry.getValue()); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) { | ||
| entry.getValue().sendEvents(records); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -56,5 +125,9 @@ public List<String> getIncludeKeys() { | |
| public List<String> getExcludeKeys() { | ||
| return excludeKeys; | ||
| } | ||
|
|
||
| public Map<String, HeadlessPipeline> getForwardToPipelines() { | ||
| return forwardToPipelines; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.model.sink; | ||
|
|
||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.InternalEventHandle; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
|
|
||
| public class SinkForwardRecordsContext { | ||
| List<Record<Event>> records; | ||
| boolean forwardPipelinesPresent; | ||
|
|
||
| public SinkForwardRecordsContext(SinkContext sinkContext) { | ||
| forwardPipelinesPresent = (sinkContext != null && sinkContext.getForwardToPipelines().size() > 0); | ||
| records = new ArrayList<>(); | ||
| } | ||
|
|
||
| public void addRecord(Record<Event> record) { | ||
| if (!forwardPipelinesPresent) | ||
| return; | ||
| InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle(); | ||
| if (eventHandle != null) { | ||
| eventHandle.acquireReference(); | ||
| } | ||
| records.add(record); | ||
| } | ||
|
|
||
| public void addRecords(Collection<Record<Event>> newRecords) { | ||
| if (!forwardPipelinesPresent) | ||
| return; | ||
| newRecords.forEach((record) -> { | ||
| Event event = record.getData(); | ||
| InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle(); | ||
| if (eventHandle != null) { | ||
| eventHandle.acquireReference(); | ||
| } | ||
| }); | ||
| records.addAll(newRecords); | ||
| } | ||
|
|
||
| public List<Record<Event>> getRecords() { | ||
| return records; | ||
| } | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.model.configuration; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import static org.hamcrest.CoreMatchers.equalTo; | ||
| import static org.hamcrest.CoreMatchers.nullValue; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.hamcrest.MatcherAssert.assertThat; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| public class SinkForwardConfigTest { | ||
|
|
||
| @Test | ||
| void testDefaults() { | ||
| final SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(); | ||
| assertThat(sinkForwardConfig.getPipelineNames(), nullValue()); | ||
| assertThat(sinkForwardConfig.getWithData(), nullValue()); | ||
| assertThat(sinkForwardConfig.getWithMetadata(), nullValue()); | ||
| } | ||
|
|
||
| @Test | ||
| void testCustomValues() { | ||
| List<String> pipelines = mock(List.class); | ||
| Map<String, Object> withData = mock(Map.class); | ||
| Map<String, Object> withMetadata = mock(Map.class); | ||
| SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata); | ||
| assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines)); | ||
| assertThat(sinkForwardConfig.getWithData(), equalTo(withData)); | ||
| assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata)); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand the use case for this?
It's just like an add_entries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really. These will be specific to sink and can only be determined by the sink - for example, documentId after a document is successfully stored in opensearch. The values of the map may need to support reserved words like
documentIdwhich needs to be implemented