Introduce support for OpenSearch data streams #6249
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
dlvenable
left a comment
Thanks @JonahCalvo for this change!
Do we also need to verify that the documentId is not set? What about routing? Is this allowed with data streams?
    BulkOperation bulkOperation;

    try {
        if (dataStreamDetector.isDataStream(indexName) && !event.containsKey("@timestamp")) {
Will this always be @timestamp for data streams? If not, we should probably make this configurable, right?
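To make the question concrete, here is a minimal sketch of defaulting a timestamp field whose name comes from configuration rather than being hard-coded. `TimestampDefaulter` and its constructor argument are hypothetical names used only for illustration; they are not part of this PR or of Data Prepper, and the document is modeled as a plain `Map` rather than an `Event`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Data Prepper code base. */
final class TimestampDefaulter {
    // Name of the timestamp field; assumed here to be supplied by configuration.
    private final String timestampField;

    TimestampDefaulter(final String timestampField) {
        this.timestampField = timestampField;
    }

    /**
     * Returns a copy of the document in which the timestamp field is set to
     * the given instant (ISO-8601 string) if it was absent or null.
     * An existing value is left untouched.
     */
    Map<String, Object> withTimestamp(final Map<String, Object> document, final Instant now) {
        final Map<String, Object> copy = new HashMap<>(document);
        copy.putIfAbsent(timestampField, now.toString());
        return copy;
    }
}
```

Passing the clock value in as a parameter keeps the helper deterministic, which makes it straightforward to unit test.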
dlvenable
left a comment
Thanks @JonahCalvo for making the improvements. I have only a few comments left.
    @@ -0,0 +1,66 @@
    /*
We have a new license header for new files.
https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers
Please use this for all new files.
        }
    }

    public void validateDataStreamCompatibility(final String indexName, final String documentId, final String routing) {
I don't think you are using this.
Also, it would be better to pass an instance of IndexConfiguration. This keeps the caller from having to know which configurations the data streams care about.
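A rough sketch of what that suggestion could look like follows. `SketchIndexConfiguration` is a hypothetical stand-in with only two settings (the real IndexConfiguration is much larger), and `DataStreamCompatibility.findConcerns` is an invented name. Whether a document ID or routing value is actually incompatible with data streams is still an open question in this review, so the two checks below are placeholders, not confirmed rules.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the sink's index configuration. */
final class SketchIndexConfiguration {
    private final String documentId;
    private final String routing;

    SketchIndexConfiguration(final String documentId, final String routing) {
        this.documentId = documentId;
        this.routing = routing;
    }

    String getDocumentId() {
        return documentId;
    }

    String getRouting() {
        return routing;
    }
}

/** Hypothetical validator; the checks are placeholders for whatever data streams really require. */
final class DataStreamCompatibility {
    private DataStreamCompatibility() {
    }

    /**
     * Collects a message for each setting that may conflict with data streams.
     * The caller passes the whole configuration and does not need to know
     * which individual settings are inspected.
     */
    static List<String> findConcerns(final String indexName, final SketchIndexConfiguration config) {
        final List<String> concerns = new ArrayList<>();
        if (config.getDocumentId() != null) {
            concerns.add("document_id is set for data stream '" + indexName + "'");
        }
        if (config.getRouting() != null) {
            concerns.add("routing is set for data stream '" + indexName + "'");
        }
        return concerns;
    }
}
```

With this shape, adding a further check later only touches the validator; the call site stays `findConcerns(indexName, config)`.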
    import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter;
    import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData;
    import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser;
    import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector;
We should probably have an integration test in OpenSearchSinkIT.
It would also be beneficial to document the required permissions for the Data Prepper user in OpenSearch.
    if (configuredAction != null &&
            !configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) &&
            !configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) {
        LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.",
This should probably be marked as NOISY. This can be a fast-follow.
Add OpenSearch Data Stream support with automatic action selection
Description
This PR adds native support for OpenSearch Data Streams to the OpenSearch sink, enabling automatic detection and proper handling of data stream indices with zero configuration changes required.
- Upon initialization, uses the GetDataStreamRequest API to identify when an index name refers to a data stream, caching the result by index name.
- Overrides configured actions to use create for data streams (required by OpenSearch).
- Automatically adds the @timestamp field for data streams when it is missing.
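The detection and action-selection behavior described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `CachingDataStreamDetector` and `ActionResolver` are hypothetical names, the `Predicate` stands in for the real GetDataStreamRequest call, and the action names are plain strings rather than the `OpenSearchBulkActions` enum. The warning condition mirrors the one visible in the diff excerpt (warn only when the configured action is neither create nor index).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical detector that caches one lookup result per index name. */
final class CachingDataStreamDetector {
    // Stand-in for the remote lookup; the PR uses the GetDataStreamRequest API here.
    private final Predicate<String> remoteLookup;
    private final Map<String, Boolean> cache = new ConcurrentHashMap<>();

    CachingDataStreamDetector(final Predicate<String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    /** Performs the lookup on first use for a name and answers from the cache afterwards. */
    boolean isDataStream(final String indexName) {
        return cache.computeIfAbsent(indexName, remoteLookup::test);
    }
}

/** Hypothetical action selection for bulk operations. */
final class ActionResolver {
    private ActionResolver() {
    }

    /** Data streams always get "create"; other targets keep the configured action. */
    static String resolve(final boolean isDataStream, final String configuredAction) {
        return isDataStream ? "create" : configuredAction;
    }

    /** True when an override on a data stream is worth a warning, as in the diff excerpt. */
    static boolean shouldWarn(final boolean isDataStream, final String configuredAction) {
        return isDataStream
                && configuredAction != null
                && !configuredAction.equals("create")
                && !configuredAction.equals("index");
    }
}
```

Caching by index name means repeated bulk requests to the same target trigger only one lookup. One trade-off of this simple form is that a cached answer never expires.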
Issues Resolved
Resolves #2037 #2038
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.