Skip to content

Commit 9df5edf

Browse files
Add index_type tsdb support to OpenSearch sink for Prometheus metrics
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 73b809a commit 9df5edf

12 files changed

Lines changed: 1332 additions & 21 deletions

File tree

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java

Lines changed: 528 additions & 0 deletions
Large diffs are not rendered by default.

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
3838
import org.opensearch.dataprepper.model.configuration.PluginSetting;
3939
import org.opensearch.dataprepper.model.event.Event;
40+
import org.opensearch.dataprepper.model.event.EventHandle;
4041
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
4142
import org.opensearch.dataprepper.model.failures.DlqObject;
4243
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
@@ -77,6 +78,7 @@
7778
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
7879
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
7980
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
81+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TSDBDocumentBuilder;
8082
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
8183
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions;
8284
import org.slf4j.Logger;
@@ -171,6 +173,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
171173

172174
private ExistingDocumentQueryManager existingDocumentQueryManager;
173175

176+
private final TSDBDocumentBuilder tsdbDocumentBuilder;
177+
174178
private final ExecutorService queryExecutorService;
175179

176180
private final int processWorkerThreads;
@@ -219,6 +223,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
219223
this.lastFlushTimeMap = new ConcurrentHashMap<>();
220224
this.pluginConfigObservable = pluginConfigObservable;
221225
this.objectMapper = new ObjectMapper();
226+
this.tsdbDocumentBuilder = (this.indexType == IndexType.TSDB) ? new TSDBDocumentBuilder() : null;
222227
this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ?
223228
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null;
224229

@@ -470,6 +475,32 @@ public void doOutput(final Collection<Record<Event>> records) {
470475
}
471476

472477
dataStreamIndex.ensureTimestamp(event, indexName);
478+
479+
if (indexType == IndexType.TSDB) {
480+
try {
481+
final List<String> tsdbDocs = tsdbDocumentBuilder.build(event);
482+
final String tsdbAction = resolveEventAction(event);
483+
final List<BulkOperationWrapper> wrappers = new ArrayList<>(tsdbDocs.size());
484+
for (int i = 0; i < tsdbDocs.size(); i++) {
485+
final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null);
486+
final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null);
487+
final BulkOperationWrapper wrapper = (i == 0)
488+
? new BulkOperationWrapper(op, event.getEventHandle(), null, null)
489+
: new BulkOperationWrapper(op, (EventHandle) null, null, null);
490+
wrappers.add(wrapper);
491+
}
492+
for (final BulkOperationWrapper wrapper : wrappers) {
493+
bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime);
494+
bulkRequest.addOperation(wrapper);
495+
}
496+
} catch (final Exception e) {
497+
LOG.error("Failed to build TSDB documents for event: {}", e.getMessage(), e);
498+
dynamicIndexDroppedEvents.increment();
499+
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
500+
}
501+
continue;
502+
}
503+
473504
final SerializedJson document = getDocument(event);
474505

475506
Long version = null;
@@ -502,20 +533,7 @@ public void doOutput(final Collection<Record<Event>> records) {
502533
}
503534
}
504535

505-
String eventAction = action;
506-
if (actions != null) {
507-
for (final ActionConfiguration actionEntry: actions) {
508-
final String condition = actionEntry.getWhen();
509-
eventAction = actionEntry.getType();
510-
if (condition != null &&
511-
expressionEvaluator.evaluateConditional(condition, event)) {
512-
break;
513-
}
514-
}
515-
}
516-
if (eventAction.contains("${")) {
517-
eventAction = event.formatString(eventAction, expressionEvaluator);
518-
}
536+
String eventAction = resolveEventAction(event);
519537

520538
if (dataStreamDetector.isDataStream(indexName)) {
521539
eventAction = dataStreamIndex.determineAction(eventAction, indexName);
@@ -623,7 +641,7 @@ void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOper
623641
if (bulkOperation.getEvent() != null) {
624642
bulkOperation.getEvent().getEventHandle().release(true);
625643
} else {
626-
bulkOperation.getEventHandle().release(true);
644+
bulkOperation.releaseEventHandle(true);
627645
}
628646
}
629647
return;
@@ -776,6 +794,24 @@ private DlqObject createDlqObjectFromEvent(final Event event,
776794
return builder.build();
777795
}
778796

797+
private String resolveEventAction(final Event event) {
798+
String resolvedAction = action;
799+
if (actions != null) {
800+
for (final ActionConfiguration actionEntry : actions) {
801+
final String condition = actionEntry.getWhen();
802+
resolvedAction = actionEntry.getType();
803+
if (condition != null &&
804+
expressionEvaluator.evaluateConditional(condition, event)) {
805+
break;
806+
}
807+
}
808+
}
809+
if (resolvedAction.contains("${")) {
810+
resolvedAction = event.formatString(resolvedAction, expressionEvaluator);
811+
}
812+
return resolvedAction;
813+
}
814+
779815
/**
780816
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
781817
* based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
444444
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
445445
} else if (indexType.equals(IndexType.METRIC_ANALYTICS_PLAIN)) {
446446
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE);
447+
} else if (indexType.equals(IndexType.TSDB)) {
448+
templateURL = loadExistingTemplate(templateType, IndexConstants.TSDB_DEFAULT_TEMPLATE_FILE);
447449
} else if (templateFile != null) {
448450
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
449451
FileReader s3FileReader = new S3FileReader(s3Client);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class IndexConstants {
4242
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_NO_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-no-ism-template.json";
4343
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_WITH_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-with-ism-template.json";
4444

45+
public static final String TSDB_DEFAULT_TEMPLATE_FILE = "tsdb-index-template.json";
46+
4547
static {
4648
// TODO: extract out version number into version enum
4749
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
@@ -52,5 +54,6 @@ public class IndexConstants {
5254
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_PLAIN, "logs-otel-v1");
5355
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
5456
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_PLAIN, "metrics-otel-v1");
57+
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TSDB, "metrics-tsdb-v1");
5558
}
5659
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public final IndexManager getIndexManager(final IndexType indexType,
5656
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
5757
break;
5858
case TRACE_ANALYTICS_SERVICE_MAP:
59+
case TSDB:
5960
indexManager = new TraceAnalyticsServiceMapIndexManager(
6061
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
6162
break;

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public enum IndexType {
2020
LOG_ANALYTICS_PLAIN("log-analytics-plain"),
2121
METRIC_ANALYTICS("metric-analytics"),
2222
METRIC_ANALYTICS_PLAIN("metric-analytics-plain"),
23+
TSDB("tsdb"),
2324
CUSTOM("custom"),
2425
MANAGEMENT_DISABLED("management_disabled");
2526

0 commit comments

Comments
 (0)