-
Notifications
You must be signed in to change notification settings - Fork 331
Introduce support for Opensearch data streams #6249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4b4a13a
11292a7
1af5913
e9fbc38
81f2386
994c615
7d2c302
d4aae9e
b392748
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a new license header for new files. https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers Please use this for all new files. |
||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.sink.opensearch.index; | ||
|
|
||
| import org.opensearch.client.opensearch.OpenSearchClient; | ||
| import org.opensearch.client.opensearch.indices.GetDataStreamRequest; | ||
| import org.opensearch.client.opensearch.indices.GetDataStreamResponse; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * Utility class to detect if an index name refers to a Data Stream | ||
| */ | ||
| public class DataStreamDetector { | ||
| private static final Logger LOG = LoggerFactory.getLogger(DataStreamDetector.class); | ||
|
|
||
| private final OpenSearchClient openSearchClient; | ||
| private final IndexCache indexCache; | ||
|
|
||
| public DataStreamDetector(final OpenSearchClient openSearchClient, final IndexCache indexCache) { | ||
| this.openSearchClient = openSearchClient; | ||
| this.indexCache = indexCache; | ||
| } | ||
|
|
||
| /** | ||
| * Determines if the given index name refers to a Data Stream | ||
| * @param indexName the index name to check | ||
| * @return true if it's a Data Stream, false otherwise | ||
| */ | ||
| public boolean isDataStream(final String indexName) { | ||
| final Boolean cached = indexCache.getDataStreamResult(indexName); | ||
| if (cached != null) { | ||
| return cached; | ||
| } | ||
|
|
||
| final boolean result = checkDataStream(indexName); | ||
| indexCache.putDataStreamResult(indexName, result); | ||
| return result; | ||
| } | ||
|
|
||
| private boolean checkDataStream(final String indexName) { | ||
| try { | ||
| final GetDataStreamRequest request = GetDataStreamRequest.of(r -> r.name(indexName)); | ||
| final GetDataStreamResponse response = openSearchClient.indices().getDataStream(request); | ||
|
|
||
| // If we get a response without exception, it's a data stream | ||
| return response.dataStreams() != null && !response.dataStreams().isEmpty(); | ||
|
|
||
| } catch (final IOException e) { | ||
| // If we get a 404 or similar, it's not a data stream | ||
| LOG.debug("Index '{}' is not a Data Stream: {}", indexName, e.getMessage()); | ||
| return false; | ||
| } catch (final Exception e) { | ||
| LOG.debug("Data Stream detection not supported or failed for index '{}': {}", indexName, e.getMessage()); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.sink.opensearch.index; | ||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| public class DataStreamIndex { | ||
| private static final Logger LOG = LoggerFactory.getLogger(DataStreamIndex.class); | ||
| private static final String TIMESTAMP_FIELD = "@timestamp"; | ||
|
|
||
| private final DataStreamDetector dataStreamDetector; | ||
| private final IndexConfiguration indexConfiguration; | ||
|
|
||
| public DataStreamIndex(final DataStreamDetector dataStreamDetector, final IndexConfiguration indexConfiguration) { | ||
| this.dataStreamDetector = dataStreamDetector; | ||
| this.indexConfiguration = indexConfiguration; | ||
| } | ||
|
|
||
|
|
||
| public String determineAction(final String configuredAction, final String indexName) { | ||
| if (dataStreamDetector.isDataStream(indexName)) { | ||
| validateConfigurationForDataStream(indexName); | ||
|
|
||
| // Only warn if user explicitly configured a non-create action (excluding the default "index" action) | ||
| if (configuredAction != null && | ||
| !configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) && | ||
| !configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) { | ||
| LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should probably be marked as NOISY. Can be a fast-follow |
||
| indexName, configuredAction); | ||
| } | ||
| return OpenSearchBulkActions.CREATE.toString(); | ||
| } | ||
| return configuredAction != null ? configuredAction : OpenSearchBulkActions.INDEX.toString(); | ||
| } | ||
|
|
||
|
|
||
| public void ensureTimestamp(final Event event, final String indexName) { | ||
| if (dataStreamDetector.isDataStream(indexName) && !event.containsKey(TIMESTAMP_FIELD)) { | ||
| event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime().toEpochMilli()); | ||
| } | ||
| } | ||
|
|
||
| private void validateConfigurationForDataStream(final String indexName) { | ||
| if (indexConfiguration.getDocumentIdField() != null || indexConfiguration.getDocumentId() != null) { | ||
| LOG.warn("Data Stream '{}' with document ID configuration uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName); | ||
| } | ||
| if (indexConfiguration.getRoutingField() != null || indexConfiguration.getRouting() != null) { | ||
| LOG.warn("Data Stream '{}' does not support routing. Routing configuration will be ignored.", indexName); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably have an integration test in
OpenSearchSinkIT.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would also be beneficial to document the required permissions for the Data Prepper user in OpenSearch.