diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java
new file mode 100644
index 0000000000..9b6a3ba01a
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java
@@ -0,0 +1,506 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.sink.opensearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.micrometer.core.instrument.Measurement;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.DisabledIf;
+import org.mockito.Mock;
+import org.opensearch.client.Request;
+import org.opensearch.client.Response;
+import org.opensearch.client.RestClient;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.MetricNames;
+import org.opensearch.dataprepper.metrics.MetricsTestUtil;
+import org.opensearch.dataprepper.model.configuration.PipelineDescription;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.metric.DefaultQuantile;
+import org.opensearch.dataprepper.model.metric.JacksonGauge;
+import org.opensearch.dataprepper.model.metric.JacksonHistogram;
+import org.opensearch.dataprepper.model.metric.JacksonSum;
+import org.opensearch.dataprepper.model.metric.JacksonSummary;
+import org.opensearch.dataprepper.model.metric.Quantile;
+import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.sink.SinkContext;
+import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
+import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
+import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
+import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
+
+import javax.ws.rs.HttpMethod;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
+import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
+import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts;
+import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.waitForClusterStateUpdatesToFinish;
+import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates;
+
+/**
+ * Integration tests for the OpenSearch sink with {@code index_type: tsdb}.
+ *
+ *
These tests require a running OpenSearch cluster. Configure via system properties:
+ *
+ * - {@code tests.opensearch.host}
+ * - {@code tests.opensearch.user} (optional)
+ * - {@code tests.opensearch.password} (optional)
+ *
+ *
+ * Note: TSDB-specific settings ({@code tsdb_engine.enabled}, {@code tsdb_store}) require the TSDB plugin
+ * installed on the OpenSearch cluster. These tests validate the sink's document conversion and indexing
+ * behavior using the standard TSDB index template. If the TSDB plugin is not installed, index creation
+ * will use the template mappings but TSDB-specific settings will be ignored by OpenSearch.
+ */
+class OpenSearchSinkTsdbIT {
+
+ private static final String AUTHENTICATION = "authentication";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String PLUGIN_NAME = "opensearch";
+ private static final String PIPELINE_NAME = "tsdbIntegTestPipeline";
+ private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false";
+ private static final String TEST_TIME = "2024-02-02T10:30:00Z";
+
+ private RestClient client;
+ private final List sinksToShutdown = new ArrayList<>();
+ private ObjectMapper objectMapper;
+
+ @Mock
+ private AwsCredentialsSupplier awsCredentialsSupplier;
+ @Mock
+ private ExpressionEvaluator expressionEvaluator;
+ @Mock
+ private PipelineDescription pipelineDescription;
+ @Mock
+ private PluginSetting pluginSetting;
+ @Mock
+ private PluginConfigObservable pluginConfigObservable;
+
+ @BeforeEach
+ void setup() {
+ pluginConfigObservable = mock(PluginConfigObservable.class);
+ expressionEvaluator = mock(ExpressionEvaluator.class);
+ pipelineDescription = mock(PipelineDescription.class);
+ pluginSetting = mock(PluginSetting.class);
+ awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
+ when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false);
+ objectMapper = new ObjectMapper();
+ }
+
+ @BeforeEach
+ void metricsInit() throws IOException {
+ MetricsTestUtil.initMetrics();
+ client = createOpenSearchClient();
+ }
+
+ @AfterEach
+ void cleanOpenSearch() throws Exception {
+ wipeAllOpenSearchIndices();
+ wipeAllTemplates();
+ waitForClusterStateUpdatesToFinish();
+ }
+
+ @AfterEach
+ void shutdownSinks() {
+ for (final OpenSearchSink sink : sinksToShutdown) {
+ sink.shutdown();
+ }
+ sinksToShutdown.clear();
+ }
+
+ @AfterEach
+ void closeClient() throws IOException {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+
+ @Test
+ @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
+ @Timeout(value = 50, unit = TimeUnit.SECONDS)
+ void testInstantiateSinkTsdbDefault() throws IOException {
+ final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null);
+ final OpenSearchSink sink = createObjectUnderTest(config, true);
+
+ final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB);
+ assertThat(indexAlias, equalTo("metrics-tsdb-v1"));
+
+ final Request request = new Request(HttpMethod.HEAD, indexAlias);
+ final Response response = client.performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
+
+ final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(
+ OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : "";
+ final Request mappingRequest = new Request(HttpMethod.GET, indexAlias + "/_mappings" + extraURI);
+ final Response mappingResponse = client.performRequest(mappingRequest);
+ final String mappingBody = EntityUtils.toString(mappingResponse.getEntity());
+ @SuppressWarnings("unchecked")
+ final Map mappingResult = createContentParser(XContentType.JSON.xContent(), mappingBody).map();
+ final String actualIndex = mappingResult.keySet().iterator().next();
+ @SuppressWarnings("unchecked")
+ final Map mappings = (Map) ((Map) mappingResult.get(actualIndex)).get("mappings");
+ assertThat(mappings, notNullValue());
+
+ @SuppressWarnings("unchecked")
+ final Map properties = (Map) mappings.get("properties");
+ assertThat(properties, notNullValue());
+ assertThat(properties.containsKey("labels"), equalTo(true));
+ assertThat(properties.containsKey("timestamp"), equalTo(true));
+ assertThat(properties.containsKey("value"), equalTo(true));
+
+ }
+
+
+ @Test
+ @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
+ @Timeout(value = 50, unit = TimeUnit.SECONDS)
+ void testOutputGauge() throws IOException, InterruptedException {
+ final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null);
+ final OpenSearchSink sink = createObjectUnderTest(config, true);
+
+ final JacksonGauge gauge = JacksonGauge.builder()
+ .withName("cpu_temp")
+ .withValue(72.5)
+ .withTime(TEST_TIME)
+ .withAttributes(Map.of("host", "server-01"))
+ .withEventKind("GAUGE")
+ .build();
+
+ sink.output(List.of(new Record<>(gauge)));
+
+ final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB);
+ final List