Add forward_to support to opensearch sink#6349
Conversation
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
| @JsonProperty("with_data") final Map<String, Object> withData, | ||
| @JsonProperty("with_metadata") final Map<String, Object> withMetadata) { | ||
| if (pipelineNames.size() != 1) { | ||
| throw new RuntimeException("Supports only one forwarding pipeline"); |
There was a problem hiding this comment.
This should throw InvalidPipelineConfigurationException.
There was a problem hiding this comment.
@dlvenable I do not think it can throw InvalidPipelineConfigurationException because it is in a different package that depends on data-prepper-api package. It will introduce circular dependency between the packages.
There was a problem hiding this comment.
Changed it to IllegalArgumentException
There was a problem hiding this comment.
We can use InvalidPluginConfigurationException instead.
| } | ||
|
|
||
| @Test | ||
| void testInvalidValues() { |
There was a problem hiding this comment.
I'm unclear on this. Is this testing all invalid values? When testing invalid values, you should use all valid values except one specific invalid value. Otherwise, you don't know what you tested against or if it works.
Additionally, you should test invalid values on both ends: empty list and list with two pipelines.
Also, make the names clearer.
e.g.
empty_pipelines_list_throws_exception()
pipelines_list_with_two_or_more_pipelines_throws_exception()
There was a problem hiding this comment.
Fair point. Added two negative tests to test 0 and two pipelines. Added one positive test to test with one pipeline.
| assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true)); | ||
| verify(forwardPipeline1, times(1)).sendEvents(eq(records)); | ||
| verify(forwardPipeline2, times(1)).sendEvents(eq(records)); | ||
| verify(forwardPipeline1, times(1)).sendEvents(any()); |
There was a problem hiding this comment.
Why are we loosening these verifies? This doesn't seem right.
There was a problem hiding this comment.
No. We are not loosening the verifies. Since the forwardRecords() clears the records It can't be verified like that anymore. The original records would now be empty and the verification would fail
| sinkForwardRecordsContext.addRecords(List.of(record2, record3)); | ||
| List<Record<Event>> records = sinkForwardRecordsContext.getRecords(); | ||
| assertThat(records.size(), equalTo(3)); | ||
| sinkForwardRecordsContext.clearRecords(); |
There was a problem hiding this comment.
This should be it's own test.
sinkForwardRecordsContext_clearRecords_removes_all_records()
| } | ||
|
|
||
| @Test | ||
| public void testForwardingRecords() throws IOException, InterruptedException { |
There was a problem hiding this comment.
This this a better name and scope down what it is doing.
output_forwards_created_documents_to...()
Also, as this is an IT, I think it would be better to actually forward these to another OpenSearch index.
There was a problem hiding this comment.
@dlvenable Yes, it is IT but to test forwarding to another Opensearch sink requires complete pipeline functionality. We are not forwarding to another sink but forwarding to a pipeline. Goal is to verify that it is forwarding to pipelines. I do not think we need to mock entire pipeline. We could probably send to a sink directly from
sinkContext.forwardRecords() But sending another opensearch sink really doesn't test any more functionality than what is being tested already.
There was a problem hiding this comment.
Yea, I think you are right here. This is an integration test of OpenSearch functionality, not core Data Prepper.
| final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get(); | ||
| final ImmutableList.Builder<FailedBulkOperation> nonRetryableFailures = ImmutableList.builder(); | ||
| int index = 0; | ||
| List<BulkOperationWrapper> successfulOperations = new ArrayList<>(); |
There was a problem hiding this comment.
Initialize this array with the total number of operations, assuming the best case.
There was a problem hiding this comment.
@dlvenable You mean this?
successfulOperations = new ArrayList<>(request.getOperations.size());
| }); | ||
| } | ||
|
|
||
| private void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOperations) { |
There was a problem hiding this comment.
This needs unit tests. There are a lot of conditions to cover as well.
| return new BulkRetryStrategy( | ||
| requestFunction, | ||
| logFailure, | ||
| (operations) -> { |
There was a problem hiding this comment.
This moves some unnecessary logic into the tests. I think you can just make a mock(Consumer.class) and then call verify(successOperationsHandler).accept(successOperations).
We probably need tests to verify that failed operations are not included.
There was a problem hiding this comment.
@dlvenable Unfortunately, that wont work. The tests count on eventhandle getting released. So, it is important to release the event handle in this Consumer.
| }); | ||
| } | ||
|
|
||
| private void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOperations) { |
There was a problem hiding this comment.
Did you consider making this handle List and moving the logic to a higher level to make it re-usable between the sinks? Use bulk operation wrapper makes this very coupled to opensearch sink
There was a problem hiding this comment.
@graytaylor0 by "handle list", you mean "event handle list"? We can't forward event handles. It needs to be events. I don't think this part of the code would be that much re-usable in other Sinks. We are moving towards using only pipeline DLQ, which means all sinks need to keep the events, and just event handles. Due to legacy code we have multiple conditions in opensearch.
There was a problem hiding this comment.
I meant to say List<Event> sorry
|
The OpenSearchSinkIT are all failing with this error |
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
* Add forward_to support to opensearch sink Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added integration test Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> Signed-off-by: Nathan Wand <wandna@amazon.com>
* Add forward_to support to opensearch sink Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added integration test Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
* Add forward_to support to opensearch sink Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added integration test Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
* Add forward_to support to opensearch sink Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added integration test Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Description
forward_tooption to all sinks is added viaSinkContextbut the sink level implementation for opensearch sink is not done. This PR adds the support for forwarding events after successful delivery to opensearch sink.Also added check to not allow more than one forwarding pipeline because that would require acknowledgement support. Forwarding to multiple pipelines an be done after initially forwarding to one pipeline, if it is really needed.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.