Skip to content

Commit 81f2386

Browse files
author
Jonah Calvo
committed
some more small formatting changes for data streams support
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
1 parent e9fbc38 commit 81f2386

6 files changed

Lines changed: 171 additions & 51 deletions

File tree

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,15 @@
4747
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
4848
import org.opensearch.dataprepper.model.record.Record;
4949
import org.opensearch.dataprepper.model.sink.SinkContext;
50+
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy;
5051
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
52+
import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
53+
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
5154
import org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager;
5255
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
5356
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
5457
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
58+
import org.opensearch.dataprepper.plugins.sink.opensearch.RetryConfiguration;
5559

5660
import javax.ws.rs.HttpMethod;
5761
import java.io.BufferedReader;
@@ -1637,6 +1641,131 @@ public void testOpenSearchIndexWithInvalidChars() throws IOException, Interrupte
16371641
Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize());
16381642
}
16391643

1644+
@Test
1645+
@DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+")
1646+
public void testDataStreamDetection() throws IOException, InterruptedException {
1647+
final String dataStreamName = "test-data-stream-" + UUID.randomUUID();
1648+
final String templateName = dataStreamName + "-template";
1649+
final File tempDirectory = Files.createTempDirectory("").toFile();
1650+
final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt";
1651+
1652+
try {
1653+
// Create an index template for the data stream first
1654+
final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName);
1655+
final String templateBody = "{" +
1656+
"\"index_patterns\": [\"" + dataStreamName + "\"]," +
1657+
"\"data_stream\": {}," +
1658+
"\"template\": {" +
1659+
"\"mappings\": {" +
1660+
"\"properties\": {" +
1661+
"\"@timestamp\": {\"type\": \"date\"}" +
1662+
"}" +
1663+
"}" +
1664+
"}" +
1665+
"}";
1666+
createTemplateRequest.setJsonEntity(templateBody);
1667+
client.performRequest(createTemplateRequest);
1668+
1669+
// Create a data stream
1670+
final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName);
1671+
client.performRequest(createDataStreamRequest);
1672+
1673+
// Initialize sink AFTER creating the data stream so detection works
1674+
Map<String, Object> metadata = initializeConfigurationMetadata(null, dataStreamName, null);
1675+
metadata.put(RetryConfiguration.DLQ_FILE, dlqFile);
1676+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
1677+
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
1678+
1679+
// Clear the index cache to force re-detection of data stream
1680+
sink.indexCache.clearAll();
1681+
System.out.println("DEBUG: Index cache cleared after initialization");
1682+
1683+
// Test that the data stream is detected
1684+
final String testIdField = "someId";
1685+
final String testId = "foo";
1686+
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
1687+
1688+
System.out.println("DEBUG: Outputting records to sink...");
1689+
System.out.println("DEBUG: Expected index name: " + dataStreamName);
1690+
1691+
// Verify data stream is detected before writing
1692+
Request verifyRequest = new Request(HttpMethod.GET, "/_data_stream/" + dataStreamName);
1693+
Response verifyResponse = client.performRequest(verifyRequest);
1694+
System.out.println("DEBUG: Data stream exists before write: " + (verifyResponse.getStatusLine().getStatusCode() == 200));
1695+
1696+
sink.output(testRecords);
1697+
System.out.println("DEBUG: Records output complete, shutting down sink...");
1698+
sink.shutdown();
1699+
System.out.println("DEBUG: Sink shutdown complete");
1700+
1701+
// Wait for indexing to complete
1702+
Thread.sleep(2000);
1703+
System.out.println("DEBUG: Wait complete, checking for documents...");
1704+
1705+
// Verify the document was written to the data stream
1706+
System.out.println("DEBUG: Data stream name: " + dataStreamName);
1707+
System.out.println("DEBUG: Test record: " + testRecords.get(0).getData());
1708+
1709+
// Check if data stream exists
1710+
Request checkRequest = new Request(HttpMethod.GET, "/_data_stream/" + dataStreamName);
1711+
Response checkResponse = client.performRequest(checkRequest);
1712+
System.out.println("DEBUG: Data stream exists: " + checkResponse.getStatusLine().getStatusCode());
1713+
System.out.println("DEBUG: Data stream info: " + EntityUtils.toString(checkResponse.getEntity()));
1714+
1715+
// Check metrics for errors
1716+
final List<Measurement> documentErrors = MetricsTestUtil.getMeasurementList(
1717+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
1718+
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
1719+
System.out.println("DEBUG: Document errors: " + (documentErrors.isEmpty() ? "none" : documentErrors.get(0).getValue()));
1720+
1721+
final List<Measurement> documentsSuccess = MetricsTestUtil.getMeasurementList(
1722+
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
1723+
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
1724+
System.out.println("DEBUG: Documents success: " + (documentsSuccess.isEmpty() ? "none" : documentsSuccess.get(0).getValue()));
1725+
1726+
// Check DLQ file for error details
1727+
if (new File(dlqFile).exists()) {
1728+
System.out.println("DEBUG: DLQ file contents:");
1729+
Files.lines(Paths.get(dlqFile)).forEach(line -> System.out.println("DLQ: " + line));
1730+
} else {
1731+
System.out.println("DEBUG: No DLQ file created");
1732+
}
1733+
1734+
// The issue is that DataStreamDetector.isDataStream() is returning false
1735+
// This happens because the sink is initialized BEFORE the data stream is created
1736+
// So the cache has a negative result
1737+
1738+
final List<Map<String, Object>> retSources = getSearchResponseDocSources(dataStreamName);
1739+
System.out.println("DEBUG: Number of documents found: " + retSources.size());
1740+
System.out.println("DEBUG: Documents: " + retSources);
1741+
assertThat("Expected 1 document in data stream " + dataStreamName + " but found " + retSources.size(),
1742+
retSources.size(), equalTo(1));
1743+
} catch (Exception e) {
1744+
System.err.println("ERROR: Test failed with exception: " + e.getMessage());
1745+
e.printStackTrace();
1746+
throw e;
1747+
} finally {
1748+
// Clean up the data stream
1749+
final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName);
1750+
try {
1751+
client.performRequest(deleteDataStreamRequest);
1752+
} catch (IOException e) {
1753+
// Ignore cleanup errors
1754+
}
1755+
1756+
// Clean up the index template
1757+
final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName);
1758+
try {
1759+
client.performRequest(deleteTemplateRequest);
1760+
} catch (IOException e) {
1761+
// Ignore cleanup errors
1762+
}
1763+
1764+
// Clean up DLQ
1765+
FileUtils.deleteQuietly(tempDirectory);
1766+
}
1767+
}
1768+
16401769
@Test
16411770
@Timeout(value = 1, unit = TimeUnit.MINUTES)
16421771
@DisabledIf(value = "isES6",
@@ -1962,6 +2091,11 @@ private static boolean isES6() {
19622091
return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0;
19632092
}
19642093

2094+
private static boolean isDataStreamNotSupported() {
2095+
// Data streams require OpenSearch 1.3.0+
2096+
return OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.parse("opensearch:1.3.0")) < 0;
2097+
}
2098+
19652099
private static Stream<Object> getAttributeTestSpecialAndExtremeValues() {
19662100
return Stream.of(
19672101
null,

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
155155
private FailedBulkOperationConverter failedBulkOperationConverter;
156156
private DataStreamDetector dataStreamDetector;
157157
private DataStreamIndex dataStreamIndex;
158-
private IndexCache indexCache;
158+
IndexCache indexCache;
159159

160160
private DlqProvider dlqProvider;
161161
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
@@ -314,7 +314,7 @@ private void doInitializeInternal() throws IOException {
314314

315315
this.indexCache = new IndexCache();
316316
this.dataStreamDetector = new DataStreamDetector(openSearchClient, indexCache);
317-
this.dataStreamIndex = new DataStreamIndex(dataStreamDetector, documentIdField, documentId, routingField, routing);
317+
this.dataStreamIndex = new DataStreamIndex(dataStreamDetector, openSearchSinkConfig.getIndexConfiguration());
318318

319319
this.initialized = true;
320320
LOG.info("Initialized OpenSearch sink");
@@ -446,7 +446,6 @@ public void doOutput(final Collection<Record<Event>> records) {
446446

447447
for (final Record<Event> record : records) {
448448
final Event event = record.getData();
449-
final SerializedJson document = getDocument(event);
450449
String indexName = configuredIndexAlias;
451450
try {
452451
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
@@ -456,6 +455,9 @@ public void doOutput(final Collection<Record<Event>> records) {
456455
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
457456
continue;
458457
}
458+
459+
dataStreamIndex.ensureTimestamp(event, indexName);
460+
final SerializedJson document = getDocument(event);
459461

460462
Long version = null;
461463
String versionExpressionEvaluationResult = null;
@@ -512,8 +514,6 @@ public void doOutput(final Collection<Record<Event>> records) {
512514
BulkOperation bulkOperation;
513515

514516
try {
515-
dataStreamIndex.ensureTimestamp(event, indexName);
516-
517517
bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode());
518518
} catch (final Exception e) {
519519
LOG.error("An exception occurred while constructing the bulk operation for a document: ", e);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetector.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
@@ -35,12 +40,16 @@ public DataStreamDetector(final OpenSearchClient openSearchClient, final IndexCa
3540
*/
3641
public boolean isDataStream(final String indexName) {
3742
final Boolean cached = indexCache.getDataStreamResult(indexName);
43+
LOG.info("isDataStream called for '{}', cached result: {}", indexName, cached);
3844
if (cached != null) {
45+
LOG.info("Returning cached result for '{}': {}", indexName, cached);
3946
return cached;
4047
}
4148

49+
LOG.info("No cached result for '{}', checking OpenSearch...", indexName);
4250
final boolean result = checkDataStream(indexName);
4351
indexCache.putDataStreamResult(indexName, result);
52+
LOG.info("Cached result for '{}': {}", indexName, result);
4453
return result;
4554
}
4655

@@ -50,7 +59,9 @@ private boolean checkDataStream(final String indexName) {
5059
final GetDataStreamResponse response = openSearchClient.indices().getDataStream(request);
5160

5261
// If we get a response without exception, it's a data stream
53-
return response.dataStreams() != null && !response.dataStreams().isEmpty();
62+
final boolean isDataStream = response.dataStreams() != null && !response.dataStreams().isEmpty();
63+
LOG.info("Data stream check for '{}': {}", indexName, isDataStream);
64+
return isDataStream;
5465

5566
} catch (final IOException e) {
5667
// If we get a 404 or similar, it's not a data stream

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndex.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,11 @@ public class DataStreamIndex {
1616
private static final String TIMESTAMP_FIELD = "@timestamp";
1717

1818
private final DataStreamDetector dataStreamDetector;
19-
private final String documentIdField;
20-
private final String documentId;
21-
private final String routingField;
22-
private final String routing;
19+
private final IndexConfiguration indexConfiguration;
2320

24-
public DataStreamIndex(final DataStreamDetector dataStreamDetector, final String documentIdField, final String documentId, final String routingField, final String routing) {
21+
public DataStreamIndex(final DataStreamDetector dataStreamDetector, final IndexConfiguration indexConfiguration) {
2522
this.dataStreamDetector = dataStreamDetector;
26-
this.documentIdField = documentIdField;
27-
this.documentId = documentId;
28-
this.routingField = routingField;
29-
this.routing = routing;
23+
this.indexConfiguration = indexConfiguration;
3024
}
3125

3226

@@ -48,28 +42,25 @@ public String determineAction(final String configuredAction, final String indexN
4842

4943

5044
public void ensureTimestamp(final Event event, final String indexName) {
51-
if (dataStreamDetector.isDataStream(indexName) && !event.containsKey(TIMESTAMP_FIELD)) {
52-
event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime());
45+
LOG.info("ensureTimestamp called for index: {}", indexName);
46+
final boolean isDataStream = dataStreamDetector.isDataStream(indexName);
47+
final boolean hasTimestamp = event.containsKey(TIMESTAMP_FIELD);
48+
LOG.info("Index '{}' - isDataStream: {}, hasTimestamp: {}", indexName, isDataStream, hasTimestamp);
49+
50+
if (isDataStream && !hasTimestamp) {
51+
event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime().toEpochMilli());
52+
LOG.info("Added @timestamp to event for data stream '{}'", indexName);
5353
}
5454
}
5555

5656
private void validateConfigurationForDataStream(final String indexName) {
57-
if (documentIdField != null || documentId != null) {
57+
if (indexConfiguration.getDocumentIdField() != null || indexConfiguration.getDocumentId() != null) {
5858
LOG.warn("Data Stream '{}' with document ID configuration uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName);
5959
}
60-
if (routingField != null || routing != null) {
60+
if (indexConfiguration.getRoutingField() != null || indexConfiguration.getRouting() != null) {
6161
LOG.warn("Data Stream '{}' does not support routing. Routing configuration will be ignored.", indexName);
6262
}
6363
}
6464

65-
public void validateDataStreamCompatibility(final String indexName, final String documentId, final String routing) {
66-
if (dataStreamDetector.isDataStream(indexName)) {
67-
if (documentId != null) {
68-
LOG.warn("Data Stream '{}' with document ID '{}' uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName, documentId);
69-
}
70-
if (routing != null) {
71-
LOG.warn("Data Stream '{}' does not support routing. Routing '{}' will be ignored.", indexName, routing);
72-
}
73-
}
74-
}
65+
7566
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public Boolean getDataStreamResult(final String indexName) {
2323
return dataStreamCache.get(indexName);
2424
}
2525

26-
void clearAll() {
26+
public void clearAll() {
2727
dataStreamCache.clear();
2828
}
2929
}

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndexTest.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ class DataStreamIndexTest {
2727
@Mock
2828
private DataStreamDetector dataStreamDetector;
2929

30+
@Mock
31+
private IndexConfiguration indexConfiguration;
32+
3033
@Mock
3134
private Event event;
3235

@@ -37,7 +40,7 @@ class DataStreamIndexTest {
3740

3841
@BeforeEach
3942
void setUp() {
40-
dataStreamIndex = new DataStreamIndex(dataStreamDetector, null, null, null, null);
43+
dataStreamIndex = new DataStreamIndex(dataStreamDetector, indexConfiguration);
4144
}
4245

4346
@Test
@@ -100,24 +103,5 @@ void ensureTimestamp_doesNotSetTimestamp_whenNotDataStream() {
100103
verify(event, never()).put("@timestamp", eventHandle.getInternalOriginationTime());
101104
}
102105

103-
@Test
104-
void validateDataStreamCompatibility_logsWarning_whenDataStreamHasDocumentId() {
105-
when(dataStreamDetector.isDataStream("my-data-stream")).thenReturn(true);
106-
107-
dataStreamIndex.validateDataStreamCompatibility("my-data-stream", "doc-123", null);
108-
}
109-
110-
@Test
111-
void validateDataStreamCompatibility_logsWarning_whenDataStreamHasRouting() {
112-
when(dataStreamDetector.isDataStream("my-data-stream")).thenReturn(true);
113-
114-
dataStreamIndex.validateDataStreamCompatibility("my-data-stream", null, "routing-key");
115-
}
116-
117-
@Test
118-
void validateDataStreamCompatibility_doesNotLogWarning_whenNotDataStream() {
119-
when(dataStreamDetector.isDataStream("regular-index")).thenReturn(false);
120-
121-
dataStreamIndex.validateDataStreamCompatibility("regular-index", "doc-123", "routing-key");
122-
}
106+
123107
}

0 commit comments

Comments
 (0)