Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,80 @@ public void testOpenSearchIndexWithInvalidChars() throws IOException, Interrupte
Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize());
}

@Test
@DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+")
public void testDataStreamDetection() throws IOException, InterruptedException {
final String dataStreamName = "test-data-stream-" + UUID.randomUUID();
final String templateName = dataStreamName + "-template";
final File tempDirectory = Files.createTempDirectory("").toFile();
final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt";

try {
// Create an index template for the data stream first
final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName);
final String templateBody = "{" +
"\"index_patterns\": [\"" + dataStreamName + "\"]," +
"\"data_stream\": {}," +
"\"template\": {" +
"\"mappings\": {" +
"\"properties\": {" +
"\"@timestamp\": {\"type\": \"date\"}" +
"}" +
"}" +
"}" +
"}";
createTemplateRequest.setJsonEntity(templateBody);
client.performRequest(createTemplateRequest);

// Create a data stream
final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName);
client.performRequest(createDataStreamRequest);

// Initialize sink AFTER creating the data stream so detection works
Map<String, Object> metadata = initializeConfigurationMetadata(null, dataStreamName, null);
metadata.put(RetryConfiguration.DLQ_FILE, dlqFile);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);

// Test that the data stream is detected
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));

sink.output(testRecords);
sink.shutdown();

// Wait for indexing to complete
Thread.sleep(2000);

// Verify the document was written to the data stream
final List<Map<String, Object>> retSources = getSearchResponseDocSources(dataStreamName);
assertThat("Expected 1 document in data stream " + dataStreamName + " but found " + retSources.size(),
retSources.size(), equalTo(1));
} catch (Exception e) {
throw e;
} finally {
// Clean up the data stream
final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName);
try {
client.performRequest(deleteDataStreamRequest);
} catch (IOException e) {
// Ignore cleanup errors
}

// Clean up the index template
final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName);
try {
client.performRequest(deleteTemplateRequest);
} catch (IOException e) {
// Ignore cleanup errors
}

// Clean up DLQ
FileUtils.deleteQuietly(tempDirectory);
}
}

@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
@DisabledIf(value = "isES6",
Expand Down Expand Up @@ -1962,6 +2036,96 @@ private static boolean isES6() {
return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0;
}

private static boolean isDataStreamNotSupported() {
// Data streams require OpenSearch 1.3.0+
return OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.parse("opensearch:1.3.0")) < 0;
}

@Test
@DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+")
public void testDataStreamFirstWriteWinsBehavior() throws IOException, InterruptedException {
final String dataStreamName = "test-first-write-wins-" + UUID.randomUUID();
final String templateName = dataStreamName + "-template";
final File tempDirectory = Files.createTempDirectory("").toFile();
final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt";

try {
// Create an index template for the data stream
final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName);
final String templateBody = "{" +
"\"index_patterns\": [\"" + dataStreamName + "\"]," +
"\"data_stream\": {}," +
"\"template\": {" +
"\"mappings\": {" +
"\"properties\": {" +
"\"@timestamp\": {\"type\": \"date\"}," +
"\"value\": {\"type\": \"keyword\"}" +
"}" +
"}" +
"}" +
"}";
createTemplateRequest.setJsonEntity(templateBody);
client.performRequest(createTemplateRequest);

// Create the data stream
final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName);
client.performRequest(createDataStreamRequest);

// Initialize sink with document_id configuration
final String testIdField = "someId";
final String testId = "duplicate-id";
Map<String, Object> metadata = initializeConfigurationMetadata(null, dataStreamName, null);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
metadata.put(RetryConfiguration.DLQ_FILE, dlqFile);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);

// Write first document with value "first"
final String firstDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"first\"}";
final List<Record<Event>> firstRecords = Collections.singletonList(jsonStringToRecord(firstDoc));
sink.output(firstRecords);

// Wait for indexing
Thread.sleep(1000);

// Write second document with same ID but value "second"
final String secondDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"second\"}";
final List<Record<Event>> secondRecords = Collections.singletonList(jsonStringToRecord(secondDoc));
sink.output(secondRecords);

sink.shutdown();

// Wait for indexing to complete
Thread.sleep(2000);

// Verify only one document exists
final List<Map<String, Object>> retSources = getSearchResponseDocSources(dataStreamName);
assertThat("Expected exactly 1 document due to first-write-wins", retSources.size(), equalTo(1));

// Verify the document has the FIRST value (first-write-wins)
final Map<String, Object> document = retSources.get(0);
assertThat("Expected first write to win", document.get("value"), equalTo("first"));

} finally {
// Clean up
final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName);
try {
client.performRequest(deleteDataStreamRequest);
} catch (IOException e) {
// Ignore cleanup errors
}

final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName);
try {
client.performRequest(deleteTemplateRequest);
} catch (IOException e) {
// Ignore cleanup errors
}

FileUtils.deleteQuietly(tempDirectory);
}
}

private static Stream<Object> getAttributeTestSpecialAndExtremeValues() {
return Stream.of(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have an integration test in OpenSearchSinkIT.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be beneficial to document the required permissions for the Data Prepper user in OpenSearch.

import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamIndex;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexCache;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager;
Expand Down Expand Up @@ -150,6 +153,9 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private final ExpressionEvaluator expressionEvaluator;

private FailedBulkOperationConverter failedBulkOperationConverter;
private DataStreamDetector dataStreamDetector;
private DataStreamIndex dataStreamIndex;
IndexCache indexCache;

private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
Expand Down Expand Up @@ -306,6 +312,10 @@ private void doInitializeInternal() throws IOException {
queryExecutorService.submit(existingDocumentQueryManager);
}

this.indexCache = new IndexCache();
this.dataStreamDetector = new DataStreamDetector(openSearchClient, indexCache);
this.dataStreamIndex = new DataStreamIndex(dataStreamDetector, openSearchSinkConfig.getIndexConfiguration());

this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -436,7 +446,6 @@ public void doOutput(final Collection<Record<Event>> records) {

for (final Record<Event> record : records) {
final Event event = record.getData();
final SerializedJson document = getDocument(event);
String indexName = configuredIndexAlias;
try {
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
Expand All @@ -446,6 +455,9 @@ public void doOutput(final Collection<Record<Event>> records) {
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
continue;
}

dataStreamIndex.ensureTimestamp(event, indexName);
final SerializedJson document = getDocument(event);

Long version = null;
String versionExpressionEvaluationResult = null;
Expand Down Expand Up @@ -483,6 +495,10 @@ public void doOutput(final Collection<Record<Event>> records) {
if (eventAction.contains("${")) {
eventAction = event.formatString(eventAction, expressionEvaluator);
}

if (dataStreamDetector.isDataStream(indexName)) {
eventAction = dataStreamIndex.determineAction(eventAction, indexName);
}
if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) {
LOG.error("Unknown action {}, skipping the event", eventAction);
invalidActionErrorsCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a new license header for new files.

https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Please use this for all new files.

* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.GetDataStreamRequest;
import org.opensearch.client.opensearch.indices.GetDataStreamResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Utility class to detect if an index name refers to a Data Stream
*/
public class DataStreamDetector {
private static final Logger LOG = LoggerFactory.getLogger(DataStreamDetector.class);

private final OpenSearchClient openSearchClient;
private final IndexCache indexCache;

public DataStreamDetector(final OpenSearchClient openSearchClient, final IndexCache indexCache) {
this.openSearchClient = openSearchClient;
this.indexCache = indexCache;
}

/**
* Determines if the given index name refers to a Data Stream
* @param indexName the index name to check
* @return true if it's a Data Stream, false otherwise
*/
public boolean isDataStream(final String indexName) {
final Boolean cached = indexCache.getDataStreamResult(indexName);
if (cached != null) {
return cached;
}

final boolean result = checkDataStream(indexName);
indexCache.putDataStreamResult(indexName, result);
return result;
}

private boolean checkDataStream(final String indexName) {
try {
final GetDataStreamRequest request = GetDataStreamRequest.of(r -> r.name(indexName));
final GetDataStreamResponse response = openSearchClient.indices().getDataStream(request);

// If we get a response without exception, it's a data stream
return response.dataStreams() != null && !response.dataStreams().isEmpty();

} catch (final IOException e) {
// If we get a 404 or similar, it's not a data stream
LOG.debug("Index '{}' is not a Data Stream: {}", indexName, e.getMessage());
return false;
} catch (final Exception e) {
LOG.debug("Data Stream detection not supported or failed for index '{}': {}", indexName, e.getMessage());
return false;
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class DataStreamIndex {
private static final Logger LOG = LoggerFactory.getLogger(DataStreamIndex.class);
private static final String TIMESTAMP_FIELD = "@timestamp";

private final DataStreamDetector dataStreamDetector;
private final IndexConfiguration indexConfiguration;

public DataStreamIndex(final DataStreamDetector dataStreamDetector, final IndexConfiguration indexConfiguration) {
this.dataStreamDetector = dataStreamDetector;
this.indexConfiguration = indexConfiguration;
}


public String determineAction(final String configuredAction, final String indexName) {
if (dataStreamDetector.isDataStream(indexName)) {
validateConfigurationForDataStream(indexName);

// Only warn if user explicitly configured a non-create action (excluding the default "index" action)
if (configuredAction != null &&
!configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) &&
!configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) {
LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be marked as NOISY. Can be a fast-follow

indexName, configuredAction);
}
return OpenSearchBulkActions.CREATE.toString();
}
return configuredAction != null ? configuredAction : OpenSearchBulkActions.INDEX.toString();
}


public void ensureTimestamp(final Event event, final String indexName) {
if (dataStreamDetector.isDataStream(indexName) && !event.containsKey(TIMESTAMP_FIELD)) {
event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime().toEpochMilli());
}
}

private void validateConfigurationForDataStream(final String indexName) {
if (indexConfiguration.getDocumentIdField() != null || indexConfiguration.getDocumentId() != null) {
LOG.warn("Data Stream '{}' with document ID configuration uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName);
}
if (indexConfiguration.getRoutingField() != null || indexConfiguration.getRouting() != null) {
LOG.warn("Data Stream '{}' does not support routing. Routing configuration will be ignored.", indexName);
}
}


}
Loading
Loading