Skip to content

Introduce support for Opensearch data streams#6249

Merged
dlvenable merged 9 commits into
opensearch-project:mainfrom
JonahCalvo:data-stream-support
Nov 18, 2025
Merged

Introduce support for Opensearch data streams#6249
dlvenable merged 9 commits into
opensearch-project:mainfrom
JonahCalvo:data-stream-support

Conversation

@JonahCalvo

@JonahCalvo JonahCalvo commented Nov 6, 2025

Copy link
Copy Markdown
Contributor

Description

This PR adds native support for OpenSearch Data Streams to the OpenSearch sink, enabling automatic detection and proper handling of data stream indices with zero configuration changes required.

  • Upon initialization, uses GetDataStreamRequest API to identify when an index name refers to a data stream, caching the result with index name

  • Overrides configured actions to use create for data streams (required by OpenSearch)

  • Automatically adds @timestamp field for data streams when missing

Issues Resolved

Resolves #2037 #2038

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Jonah Calvo and others added 3 commits November 5, 2025 04:14

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JonahCalvo for this change!

Do we also need to verify that the documentId is not set? What about routing? Is this allowed with data streams?

BulkOperation bulkOperation;

try {
if (dataStreamDetector.isDataStream(indexName) && !event.containsKey("@timestamp")) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this always be @timestamp for data streams? If not should probably make this configurable right?

Signed-off-by: Jonah Calvo <caljonah@amazon.com>

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JonahCalvo for making the improvements. I have only a few comments left.

@@ -0,0 +1,66 @@
/*

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a new license header for new files.

https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Please use this for all new files.

}
}

public void validateDataStreamCompatibility(final String indexName, final String documentId, final String routing) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you are using this.

Also, it would be better to pass an instance of IndexConfiguration. This keeps to caller from having to know which configurations the data streams care about.

import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have an integration test in OpenSearchSinkIT.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be beneficial to document the required permissions for the Data Prepper user in OpenSearch.

@JonahCalvo JonahCalvo force-pushed the data-stream-support branch 12 times, most recently from 58e013e to 71573c4 Compare November 12, 2025 20:01
@JonahCalvo JonahCalvo force-pushed the data-stream-support branch 6 times, most recently from 4a45fb1 to 61998e2 Compare November 17, 2025 17:26
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Jonah Calvo added 3 commits November 17, 2025 11:57
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
if (configuredAction != null &&
!configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) &&
!configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) {
LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be marked as NOISY. Can be a fast-follow

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JonahCalvo !

@dlvenable dlvenable merged commit 23eb0d9 into opensearch-project:main Nov 18, 2025
69 of 70 checks passed
eatulban pushed a commit to eatulban/data-prepper that referenced this pull request Dec 11, 2025
Add OpenSearch Data Stream support with automatic action selection

Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Data Prepper can not write to Opensearch Datastream

4 participants