Add forward_to support to opensearch sink #6349

Merged
kkondaka merged 4 commits into opensearch-project:main from kkondaka:os-sink-forwarding
Dec 16, 2025

@kkondaka

Collaborator

Description

The forward_to option was added to all sinks via SinkContext, but the sink-level implementation for the opensearch sink was not done. This PR adds support in the opensearch sink for forwarding events after successful delivery.

Also added a check that disallows more than one forwarding pipeline, because that would require acknowledgement support. Forwarding to multiple pipelines can be done after initially forwarding to one pipeline, if it is really needed.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [X] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@JsonProperty("with_data") final Map<String, Object> withData,
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
if (pipelineNames.size() != 1) {
throw new RuntimeException("Supports only one forwarding pipeline");

Member

This should throw InvalidPipelineConfigurationException.

Collaborator Author

@dlvenable I do not think it can throw InvalidPipelineConfigurationException, because that class is in a different package that depends on the data-prepper-api package. It would introduce a circular dependency between the packages.

Collaborator Author

Changed it to IllegalArgumentException.

Member

@kkondaka, we can use InvalidPluginConfigurationException instead.

}

@Test
void testInvalidValues() {

Member

I'm unclear on this. Is this testing all invalid values? When testing invalid values, you should use all valid values except one specific invalid value. Otherwise, you don't know what you tested against or if it works.

Additionally, you should test invalid values on both ends: empty list and list with two pipelines.

Also, make the names clearer.

e.g.

empty_pipelines_list_throws_exception()
pipelines_list_with_two_or_more_pipelines_throws_exception()

Collaborator Author

Fair point. Added two negative tests to test 0 and two pipelines. Added one positive test to test with one pipeline.
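
A minimal sketch of the single-pipeline check and the three boundary cases discussed in this thread (zero, one, and two pipelines). `ForwardConfigSketch` and its `rejects` helper are hypothetical stand-ins, not the PR's classes, and `IllegalArgumentException` is used here only because it is in the standard library; the thread suggests `InvalidPluginConfigurationException` for the real code.

```java
import java.util.List;

// Hypothetical stand-in for the configuration class under review.
// Only the size check mirrors the snippet quoted in the PR.
final class ForwardConfigSketch {
    private final List<String> pipelineNames;

    ForwardConfigSketch(final List<String> pipelineNames) {
        // Exactly one forwarding pipeline is accepted:
        // an empty list and a list of two or more are both rejected.
        if (pipelineNames.size() != 1) {
            throw new IllegalArgumentException("Supports only one forwarding pipeline");
        }
        this.pipelineNames = List.copyOf(pipelineNames);
    }

    List<String> getPipelineNames() {
        return pipelineNames;
    }

    // Helper for boundary checks: reports whether construction was rejected.
    static boolean rejects(final List<String> names) {
        try {
            new ForwardConfigSketch(names);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }
}
```

Each case varies only the pipeline list, so a failure points at exactly one invalid value: `rejects(List.of())` and `rejects(List.of("a", "b"))` are expected to be true, and `rejects(List.of("a"))` false.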

assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
verify(forwardPipeline1, times(1)).sendEvents(any());

Member

Why are we loosening these verifies? This doesn't seem right.

Collaborator Author

No, we are not loosening the verifies. Since forwardRecords() clears the records, it can't be verified like that anymore. The original records would now be empty and the verification would fail.
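
The effect described here can be illustrated with a small sketch. It assumes, hypothetically, that forwarding hands the pipeline a copy of the buffered records and then clears the buffer; the PR does not show whether the real forwardRecords() works this way. All class names below are stand-ins, not Data Prepper types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pipeline double that remembers every batch it was sent.
final class RecordingPipeline {
    final List<List<String>> received = new ArrayList<>();

    void sendEvents(final List<String> events) {
        received.add(events);
    }
}

// Hypothetical stand-in for the forwarding context.
final class ForwardContextSketch {
    private final List<String> records = new ArrayList<>();

    void addRecords(final List<String> newRecords) {
        records.addAll(newRecords);
    }

    // Returns the live internal list, so callers observe later mutations.
    List<String> getRecords() {
        return records;
    }

    // Assumed behavior: send a copy downstream, then clear the buffer.
    void forwardTo(final RecordingPipeline pipeline) {
        pipeline.sendEvents(new ArrayList<>(records));
        records.clear();
    }
}

final class ClearedRecordsDemo {
    // Returns {sent equals live reference, sent equals earlier snapshot}.
    static boolean[] run() {
        final ForwardContextSketch context = new ForwardContextSketch();
        context.addRecords(List.of("r1", "r2", "r3"));
        final List<String> live = context.getRecords();   // emptied by forwardTo
        final List<String> snapshot = List.copyOf(live);  // taken before forwarding
        final RecordingPipeline pipeline = new RecordingPipeline();
        context.forwardTo(pipeline);
        final List<String> sent = pipeline.received.get(0);
        return new boolean[] { sent.equals(live), sent.equals(snapshot) };
    }
}
```

Under that assumption, an equality check against the list obtained from getRecords() no longer matches once forwarding has run, while a snapshot taken before forwarding still does.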

sinkForwardRecordsContext.addRecords(List.of(record2, record3));
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
assertThat(records.size(), equalTo(3));
sinkForwardRecordsContext.clearRecords();

Member

This should be its own test.

sinkForwardRecordsContext_clearRecords_removes_all_records()

}

@Test
public void testForwardingRecords() throws IOException, InterruptedException {

Member

Give this a better name and scope down what it is doing.

output_forwards_created_documents_to...()

Also, as this is an IT, I think it would be better to actually forward these to another OpenSearch index.

Collaborator Author

@dlvenable Yes, it is an IT, but testing forwarding to another OpenSearch sink requires complete pipeline functionality. We are not forwarding to another sink but forwarding to a pipeline. The goal is to verify that it is forwarding to pipelines. I do not think we need to mock an entire pipeline. We could probably send to a sink directly from sinkContext.forwardRecords(), but sending to another opensearch sink really doesn't test any more functionality than what is being tested already.

Member

Yea, I think you are right here. This is an integration test of OpenSearch functionality, not core Data Prepper.

final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get();
final ImmutableList.Builder<FailedBulkOperation> nonRetryableFailures = ImmutableList.builder();
int index = 0;
List<BulkOperationWrapper> successfulOperations = new ArrayList<>();

Member

Initialize this array with the total number of operations, assuming the best case.

@kkondaka kkondaka Dec 16, 2025

Collaborator Author

@dlvenable You mean this?

successfulOperations = new ArrayList<>(request.getOperations().size());

Member

Yea.
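
As a sketch of the suggestion agreed on here: the `ArrayList(int)` constructor takes an initial capacity, so sizing it to the total operation count avoids regrowth in the best case where every operation succeeds, while the list still starts empty. The `collectSuccessful` method and its `failed` flags are hypothetical and stand in for the real response handling.

```java
import java.util.ArrayList;
import java.util.List;

final class PresizedListSketch {
    // Collects the operations whose index is not marked as failed.
    static List<String> collectSuccessful(final List<String> operations, final boolean[] failed) {
        // Capacity hint for the best case (all succeed); size() is still 0 here.
        final List<String> successful = new ArrayList<>(operations.size());
        for (int i = 0; i < operations.size(); i++) {
            if (!failed[i]) {
                successful.add(operations.get(i));
            }
        }
        return successful;
    }
}
```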

});
}

private void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOperations) {

Member

This needs unit tests. There are a lot of conditions to cover as well.

return new BulkRetryStrategy(
requestFunction,
logFailure,
(operations) -> {

Member

This moves some unnecessary logic into the tests. I think you can just make a mock(Consumer.class) and then call verify(successOperationsHandler).accept(successOperations).

We probably need tests to verify that failed operations are not included.

Collaborator Author

@dlvenable Unfortunately, that won't work. The tests count on the event handle getting released. So, it is important to release the event handle in this Consumer.
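
A rough sketch of the two points raised in this thread: the success handler should receive only the operations that did not fail, and a test double for it may still need to release each handle rather than being a bare mock. `OperationSketch`, `handleResponse`, and `runWithRecordingHandler` are hypothetical names, not the BulkRetryStrategy API, and calling the handler even when nothing succeeded is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a bulk operation that carries an event handle.
final class OperationSketch {
    final String id;
    final boolean failed;
    boolean released = false;

    OperationSketch(final String id, final boolean failed) {
        this.id = id;
        this.failed = failed;
    }

    void release() {
        released = true;
    }
}

final class SuccessCallbackSketch {
    // Passes only the non-failed operations to the success handler.
    static void handleResponse(final List<OperationSketch> operations,
                               final Consumer<List<OperationSketch>> successHandler) {
        final List<OperationSketch> successful = new ArrayList<>(operations.size());
        for (final OperationSketch operation : operations) {
            if (!operation.failed) {
                successful.add(operation);
            }
        }
        successHandler.accept(successful);
    }

    // Recording handler that also releases each handle it is given.
    static List<String> runWithRecordingHandler(final List<OperationSketch> operations) {
        final List<String> seen = new ArrayList<>();
        handleResponse(operations, successes -> {
            for (final OperationSketch operation : successes) {
                operation.release();
                seen.add(operation.id);
            }
        });
        return seen;
    }
}
```

With one failed operation among three, the recording handler sees only the two successful ids, and the failed operation's handle stays unreleased.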

});
}

private void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOperations) {

Member

Did you consider making this handle List and moving the logic to a higher level to make it re-usable between the sinks? Using BulkOperationWrapper makes this very coupled to the opensearch sink.

Collaborator Author

@graytaylor0 by "handle list", do you mean "event handle list"? We can't forward event handles. It needs to be events. I don't think this part of the code would be that re-usable in other sinks. We are moving towards using only the pipeline DLQ, which means all sinks need to keep the events, and not just event handles. Due to legacy code we have multiple conditions in opensearch.

Member

I meant to say List<Event>, sorry.

@graytaylor0

Member

The OpenSearchSinkIT tests are all failing with this error:

 OpenSearchSinkIT > testOpenSearchIndexWithDate(String) > [2] yyyy-MM-dd FAILED
    java.lang.NullPointerException at OpenSearchSinkIT.java:159

OpenSearchSinkIT > testOpenSearchIndexWithDate(String) > [3] dd-MM-yyyy FAILED
    java.lang.NullPointerException at OpenSearchSinkIT.java:159

OpenSearchSinkIT > testOpenSearchRouting(String) > [1]  FAILED
    java.lang.NullPointerException at OpenSearchSinkIT.java:159

OpenSearchSinkIT > testOpenSearchRouting(String) > [2] info/ids/rid FAILED
    java.lang.NullPointerException at OpenSearchSinkIT.java:159

OpenSearchSinkIT > testOpenSearchRouting(String) > [3] rid FAILED
    java.lang.NullPointerException at OpenSearchSinkIT.java:159

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

@dlvenable dlvenable left a comment

Member

Thanks!

@kkondaka kkondaka merged commit 37d65f8 into opensearch-project:main Dec 16, 2025
69 of 70 checks passed
wandna-amazon pushed a commit to wandna-amazon/data-prepper that referenced this pull request Jan 8, 2026
* Add forward_to support to opensearch sink

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Added integration test

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Nathan Wand <wandna@amazon.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
* Add forward_to support to opensearch sink

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Added integration test

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
3 participants