Add support for forwarding successful records from sinks using SinkContext #5994
| this.forwardToPipelines = new HashMap<>(); | ||
| if (forwardPipelineNames != null) { | ||
| for (final String forwardPipelineName: forwardPipelineNames) { | ||
| this.forwardToPipelines.put(forwardPipelineName, null); |
Initially, there are no pipelines attached to a given pipeline name. It's possible that the pipeline name is incorrect. The code below will match the names with the pipelines.
| if (pipeline != null) { | ||
| forwardToPipelines.put(key, pipeline); | ||
| } else { | ||
| throw new RuntimeException(String.format("forwarding pipeline %s doesn't exist", key)); |
Can we only catch this at runtime? We could validate on startup if a pipeline configured in forward_to doesn't exist, right?
Sure. We can add additional validation. But the order of events may make it difficult. The sink gets instantiated first and then the pipelines are instantiated, so at the time of sink construction we do not know whether a pipeline is present.
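One way to reconcile startup validation with that instantiation order is a two-phase approach: record only the names when the sink is constructed, then bind them once every pipeline exists and fail before any record flows. The sketch below is a minimal illustration under assumptions; `ForwardTargetResolver`, its nested `Pipeline` stand-in, and the `resolve` and `isResolved` methods are hypothetical names, not classes from this PR. It formats its message with `%s`, since `String.format` does not substitute `{}` placeholders.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ForwardTargetResolver {
    // Hypothetical stand-in for a pipeline; only its name matters in this sketch.
    static final class Pipeline {
        final String name;

        Pipeline(final String name) {
            this.name = name;
        }
    }

    private final Map<String, Pipeline> forwardToPipelines = new HashMap<>();

    // Phase 1 (sink construction): pipelines do not exist yet, so only names are recorded.
    ForwardTargetResolver(final List<String> forwardPipelineNames) {
        if (forwardPipelineNames != null) {
            for (final String name : forwardPipelineNames) {
                forwardToPipelines.put(name, null);
            }
        }
    }

    // Phase 2 (after all pipelines are built, before any record flows):
    // bind each name to its pipeline, or fail fast on an unknown name.
    void resolve(final Map<String, Pipeline> allPipelines) {
        for (final Map.Entry<String, Pipeline> entry : forwardToPipelines.entrySet()) {
            final Pipeline pipeline = allPipelines.get(entry.getKey());
            if (pipeline == null) {
                throw new IllegalStateException(
                        String.format("forwarding pipeline %s doesn't exist", entry.getKey()));
            }
            entry.setValue(pipeline);
        }
    }

    boolean isResolved(final String name) {
        return forwardToPipelines.get(name) != null;
    }

    public static void main(final String[] args) {
        final ForwardTargetResolver resolver =
                new ForwardTargetResolver(List.of("archive-pipeline"));
        resolver.resolve(Map.of("archive-pipeline", new Pipeline("archive-pipeline")));
        System.out.println(resolver.isResolved("archive-pipeline"));
    }
}
```

Whether the real startup sequence offers a hook between pipeline construction and the first record is an open question here; the sketch only shows what the check would look like if such a hook exists.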
| @JsonProperty("with_metadata") | ||
| Map<String, Object> withMetadata; | ||
| @JsonProperty("with_data") | ||
| Map<String, Object> withData; |
Trying to understand the use case for this?
It's just like an add_entries?
Not really. These will be specific to the sink and can only be determined by the sink; for example, documentId after a document is successfully stored in OpenSearch. The values of the map may need to support reserved words like documentId, which still needs to be implemented.
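To make the reserved-word idea concrete, here is a rough sketch of how a configured `with_data` map might be resolved against values that only the sink knows. Everything in it is an assumption for illustration: `WithDataResolver`, the `sinkValues` map, and the rule that a string value equal to a reserved word gets substituted are all hypothetical, and the PR itself leaves this unimplemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WithDataResolver {
    // Replaces any configured value that names a sink-supplied reserved word
    // with the value the sink produced; all other values pass through unchanged.
    static Map<String, Object> resolve(final Map<String, Object> withData,
                                       final Map<String, Object> sinkValues) {
        final Map<String, Object> resolved = new LinkedHashMap<>();
        for (final Map.Entry<String, Object> entry : withData.entrySet()) {
            final Object value = entry.getValue();
            if (value instanceof String && sinkValues.containsKey(value)) {
                resolved.put(entry.getKey(), sinkValues.get(value));
            } else {
                resolved.put(entry.getKey(), value);
            }
        }
        return resolved;
    }

    public static void main(final String[] args) {
        final Map<String, Object> withData = new LinkedHashMap<>();
        withData.put("stored_id", "documentId"); // reserved word, filled in by the sink
        withData.put("source", "orders");        // literal value, kept as-is
        final Map<String, Object> sinkValues = Map.of("documentId", "doc-123");
        System.out.println(resolve(withData, sinkValues));
    }
}
```

A real design would likely need a way to tell a reserved word from a literal string that happens to have the same spelling; this sketch does not attempt that.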
| } | ||
| } | ||
| if (doForward) { |
I think it would be nice to call the SinkContext to get a "forwardTo" batch. Then you populate it and forward it. This can move these conditions in there so we don't repeat a lot of possibly fragile code.
ForwardBatch forwardBatch = sinkContext.getForwardBatch();
...
checkTypeAndWriteObject(record.getData(), writer);
forwardBatch(record);
...
forwardBatch.forward();
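A batch object along the lines suggested above could look roughly like the following. Only the name `ForwardBatch` and the `forward()` call come from the comment; the constructor, the `add` method, and the convention that a `null` target means forwarding is disabled are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ForwardBatchSketch {
    // Hypothetical batch handed out by the sink context. When forwarding is not
    // configured, add() and forward() are no-ops, so sinks need no doForward check.
    static final class ForwardBatch<T> {
        private final Consumer<List<T>> target; // null means forwarding is disabled
        private final List<T> pending = new ArrayList<>();

        ForwardBatch(final Consumer<List<T>> target) {
            this.target = target;
        }

        void add(final T record) {
            if (target != null) {
                pending.add(record);
            }
        }

        void forward() {
            if (target == null || pending.isEmpty()) {
                return;
            }
            // Hand over a copy so clearing the buffer cannot affect the receiver.
            target.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }

    public static void main(final String[] args) {
        final List<String> received = new ArrayList<>();
        final ForwardBatch<String> batch = new ForwardBatch<>(received::addAll);
        batch.add("record-1");
        batch.add("record-2");
        batch.forward();
        System.out.println(received);
    }
}
```

The design choice is that the enabled/disabled condition lives in one place, which is the point the comment makes about not repeating fragile code in every sink.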
| final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_FORWARD); | ||
| for (int i = 0; i < sinkRecords.size(); i++) { |
Assert the size of sinkRecords first.
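The reason for checking the size first is that a loop bounded by `sinkRecords.size()` runs zero times when nothing arrived, so the test would pass without verifying anything. The sketch below demonstrates this in plain Java so that it runs without a test framework; `SizeFirstCheck` and its two methods are hypothetical names, and in the actual test an assertion-library call would take the place of the explicit `AssertionError`.

```java
import java.util.List;

final class SizeFirstCheck {
    // Element-by-element check only; passes vacuously when 'actual' is empty.
    static void checkElementsOnly(final List<String> actual, final List<String> expected) {
        for (int i = 0; i < actual.size(); i++) {
            if (!actual.get(i).equals(expected.get(i))) {
                throw new AssertionError("mismatch at index " + i);
            }
        }
    }

    // Size is verified before the loop, so an empty or short result fails loudly.
    static void checkSizeThenElements(final List<String> actual, final List<String> expected) {
        if (actual.size() != expected.size()) {
            throw new AssertionError(
                    "expected " + expected.size() + " records but got " + actual.size());
        }
        checkElementsOnly(actual, expected);
    }

    public static void main(final String[] args) {
        final List<String> expected = List.of("a", "b");
        final List<String> nothingArrived = List.of();

        checkElementsOnly(nothingArrived, expected); // passes silently
        try {
            checkSizeThenElements(nothingArrived, expected);
        } catch (final AssertionError e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```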
| final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ); | ||
| for (int i = 0; i < sinkRecords.size(); i++) { |
Assert the size of sinkRecords first.
| } | ||
| @Test | ||
| void pipeline_forward_test() { |
While both sink forwarding and DLQ use similar code underneath, they are different end functions. I think we should have different IT test suites for each to reflect that.
I have one above for DLQ and this one is for forwarding. Is that what you meant?
A test suite is a collection of tests. So I mean to have different files. PipelineDlqIT and ForwardPipelinesIT.
Oh. I can split them into two files.
| package org.opensearch.dataprepper.model.sink; | ||
| public class SinkForwardRecordsContext { | ||
| public SinkForwardRecordsContext() {} |
Perhaps adding records in here would make sense. What if a sink only writes some records to the destination?
This context would collect all of the successful ones.
| return; | ||
| for (final Record<Object> record : records) { | ||
| if (record.getData() instanceof Event) { |
This logic can go into SinkForwardRecordsContext.
public void addRecord(Record<?> record) {
    if (!(record.getData() instanceof Event)) {
        return;
    }
    ...
That means we can't forward Record<Object> records. I was testing with File sink forwarding to another file sink to verify the forwarding logic.
| for (final Record<Object> record : records) { | ||
| if (record.getData() instanceof Event) { | ||
| Event event = (Event)record.getData(); | ||
| sinkForwardRecordsContext.addRecord(new Record<>(event)); |
Why do you create a new Record?
@dlvenable because FileSink uses Record<Object> and not Record<Event>. It looks like I can't simply cast it. I don't think we should change forwardRecords to accept Record<Object>.
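The cast fails because Java generics are invariant: `Record<Object>` and `Record<Event>` are unrelated types even though `Event` is an `Object`, so the compiler rejects a direct cast between them. The sketch below shows the rewrapping approach with simplified stand-ins; the nested `Event` and `Record` classes and the `onlyEvents` helper are hypothetical and far smaller than the real Data Prepper types.

```java
import java.util.ArrayList;
import java.util.List;

final class RewrapSketch {
    // Simplified stand-in for an event.
    static final class Event {
        final String message;

        Event(final String message) {
            this.message = message;
        }
    }

    // Simplified stand-in for a generic record wrapper.
    static final class Record<T> {
        private final T data;

        Record(final T data) {
            this.data = data;
        }

        T getData() {
            return data;
        }
    }

    // Keeps only records whose payload is an Event, rewrapped with the narrower type.
    static List<Record<Event>> onlyEvents(final List<Record<Object>> records) {
        final List<Record<Event>> events = new ArrayList<>();
        for (final Record<Object> record : records) {
            final Object data = record.getData();
            if (data instanceof Event) {
                // (Record<Event>) record would not compile: the two parameterizations
                // are inconvertible, so the payload is cast and wrapped again instead.
                events.add(new Record<>((Event) data));
            }
        }
        return events;
    }

    public static void main(final String[] args) {
        final List<Record<Object>> records = new ArrayList<>();
        records.add(new Record<>(new Event("stored")));
        records.add(new Record<>("plain text line"));
        System.out.println(onlyEvents(records).size());
    }
}
```

An unchecked cast through a raw type would also compile, but it trades a small allocation for a warning and no type safety, which is presumably why rewrapping was chosen.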
@dlvenable I think this can be merged as is, and if you want the last comment to be addressed, I can do that in a different PR.
8f194b7 to 70d543c
…ntext Signed-off-by: Kondaka <krishkdk@amazon.com>
70d543c to ef7aaa3
Description
Add support for forwarding successful records from sinks using SinkContext.
A new config option is added to all sinks.
Issues Resolved
Resolves #5985
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.