Skip to content

Add support for forwarding successful records from sinks using SinkContext#5994

Merged
kkondaka merged 8 commits into
opensearch-project:mainfrom
kkondaka:forward-pipelines
Oct 3, 2025
Merged

Add support for forwarding successful records from sinks using SinkContext#5994
kkondaka merged 8 commits into
opensearch-project:mainfrom
kkondaka:forward-pipelines

Conversation

@kkondaka

@kkondaka kkondaka commented Aug 14, 2025

Copy link
Copy Markdown
Collaborator

Description

Add support for forwarding successful records from sinks using SinkContext.
A new config option is added to all sinks

forward_to: 
  pipelines: [ <list of sub-pipepine names> ]
  with_data:
       key: value
  with_metadata:
       key: value

Issues Resolved

Resolves #5985

Check List

  • [X ] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

graytaylor0
graytaylor0 previously approved these changes Aug 19, 2025
this.forwardToPipelines = new HashMap<>();
if (forwardPipelineNames != null) {
for (final String forwardPipelineName: forwardPipelineNames) {
this.forwardToPipelines.put(forwardPipelineName, null);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is value null here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, there are no "pipeline"s attached to to a given pipeline name. It's possible that the pipeline name is incorrect. The code below will match the names with the pipelines.

if (pipeline != null) {
forwardToPipelines.put(key, pipeline);
} else {
throw new RuntimeException(String.format("forwarding pipeline {} doesn't exist", key));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only catch this at runtime? We could validate on startup if a pipeline configured in forward_to doesn't exist right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. We can add additional validation. But the order of events may make it difficult. The sink gets instantiated first and then the pipelines are instantiated so at the time of sink construction we do not know if a pipeline is present or not.

Comment on lines +18 to +22
@JsonProperty("with_metadata")
Map<String, Object> withMetadata;

@JsonProperty("with_data")
Map<String, Object> withData;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the use case for this?

It's just like an add_entries?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. These will be specific to sink and can only be determined by the sink - for example, documentId after a document is successfully stored in opensearch. The values of the map may need to support reserved words like documentId which needs to be implemented

}
}

if (doForward) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to call the SinkContext to get a "forwardTo" batch. Then you populate it and forward it. This can move these conditions in there so we don't repeat a lot of possibly fragile code.

ForwardBatch forwardBatch = sinkContext.getForwardBatch();
...

checkTypeAndWriteObject(record.getData(), writer);
forwardBatch(record);
...

forwardBatch.forward();


final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_FORWARD);

for (int i = 0; i < sinkRecords.size(); i++) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert the size of sinkRecords first.


final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ);

for (int i = 0; i < sinkRecords.size(); i++) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert the size of sinkRecords first.

}

@Test
void pipeline_forward_test() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While both sink forwarding and DLQ use similar code underneath, they are different end functions. I think we should have different IT test suites for each to reflect that.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one above for DLQ and this one is for forwarding. Is that what you meant?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test suite is a collection of tests. So I mean to have different files. PipelineDlqIT and ForwardPipelinesIT.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. I can split them into two

package org.opensearch.dataprepper.model.sink;

public class SinkForwardRecordsContext {
public SinkForwardRecordsContext() {}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps adding in here would make sense. What if a sink only writes some records to the destination?

This context would collect all of the successful ones.

return;

for (final Record<Object> record : records) {
if (record.getData() instanceof Event) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic can go into SinkForwardRecordsContext.

public void addRecord(Record<?> record) {
  if(! (record.getData() instanceof Event)) {
    return;
  }
  ...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means we can't forward Record<Object> records. I was testing with File sink forwarding to another file sink to verify the forwarding logic.

for (final Record<Object> record : records) {
if (record.getData() instanceof Event) {
Event event = (Event)record.getData();
sinkForwardRecordsContext.addRecord(new Record<>(event));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you create a new Record?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable because FileSink uses Record<Object> and not Record<Event>. It looks like I can't simply cast it. I don't think we should change forwardRecords to accept Record<Object>

@kkondaka

Copy link
Copy Markdown
Collaborator Author

@dlvenable I think this can be merged as is and if you want the last comment to be addressed, I can do in a different PR

graytaylor0
graytaylor0 previously approved these changes Oct 1, 2025
@graytaylor0 graytaylor0 self-requested a review October 1, 2025 22:37
dlvenable
dlvenable previously approved these changes Oct 1, 2025
@kkondaka kkondaka dismissed stale reviews from dlvenable and graytaylor0 via 70d543c October 2, 2025 00:05
…ntext

Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
@kkondaka kkondaka merged commit 3e71902 into opensearch-project:main Oct 3, 2025
44 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Provide sinks access to all headless pipelines

3 participants