Skip to content

Commit 15c537d

Browse files
Address code review feedback for TSDB support
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent ac1a537 commit 15c537d

7 files changed

Lines changed: 90 additions & 132 deletions

File tree

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ void closeClient() throws IOException {
153153
}
154154
}
155155

156-
// --- Index Initialization ---
157156

158157
@Test
159158
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -169,7 +168,6 @@ void testInstantiateSinkTsdbDefault() throws IOException {
169168
final Response response = client.performRequest(request);
170169
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
171170

172-
// Query mappings via alias (TSDB uses NoIsmPolicyManagement so index name may not follow -000001 pattern)
173171
final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(
174172
OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : "";
175173
final Request mappingRequest = new Request(HttpMethod.GET, indexAlias + "/_mappings" + extraURI);
@@ -182,19 +180,15 @@ void testInstantiateSinkTsdbDefault() throws IOException {
182180
final Map<String, Object> mappings = (Map<String, Object>) ((Map<String, Object>) mappingResult.get(actualIndex)).get("mappings");
183181
assertThat(mappings, notNullValue());
184182

185-
// Verify TSDB-specific mapping fields
186183
@SuppressWarnings("unchecked")
187184
final Map<String, Object> properties = (Map<String, Object>) mappings.get("properties");
188185
assertThat(properties, notNullValue());
189186
assertThat(properties.containsKey("labels"), equalTo(true));
190187
assertThat(properties.containsKey("timestamp"), equalTo(true));
191188
assertThat(properties.containsKey("value"), equalTo(true));
192189

193-
// TSDB uses NoIsmPolicyManagement — no ISM policy should be attached
194-
// (unlike metric-analytics which has ISM)
195190
}
196191

197-
// --- Gauge Output ---
198192

199193
@Test
200194
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -222,15 +216,13 @@ void testOutputGauge() throws IOException, InterruptedException {
222216
assertThat(((Number) doc.get("value")).doubleValue(), closeTo(72.5, 0.001));
223217
assertThat(doc.get("timestamp"), notNullValue());
224218

225-
// Verify metrics
226219
final List<Measurement> bulkRequestErrors = MetricsTestUtil.getMeasurementList(
227220
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
228221
.add(OpenSearchSink.BULKREQUEST_ERRORS).toString());
229222
assertThat(bulkRequestErrors.size(), equalTo(1));
230223
Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0);
231224
}
232225

233-
// --- Sum (Counter) Output ---
234226

235227
@Test
236228
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -255,19 +247,16 @@ void testOutputMonotonicSum() throws IOException, InterruptedException {
255247
assertThat(sources, hasSize(1));
256248

257249
final Map<String, Object> doc = sources.get(0);
258-
// Monotonic sum should have _total suffix
259250
assertThat(doc.get("labels"), equalTo("__name__ http_requests_total method GET"));
260251
assertThat(((Number) doc.get("value")).doubleValue(), closeTo(100.0, 0.001));
261252

262-
// Verify metrics
263253
final List<Measurement> bulkRequestErrors = MetricsTestUtil.getMeasurementList(
264254
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
265255
.add(OpenSearchSink.BULKREQUEST_ERRORS).toString());
266256
assertThat(bulkRequestErrors.size(), equalTo(1));
267257
Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0);
268258
}
269259

270-
// --- Histogram Expansion ---
271260

272261
@Test
273262
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -291,20 +280,16 @@ void testOutputHistogramExpansion() throws IOException, InterruptedException {
291280

292281
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB);
293282
final List<Map<String, Object>> sources = getSearchResponseDocSources(indexAlias);
294-
// 4 bucket docs + 1 _count + 1 _sum = 6
295283
assertThat(sources, hasSize(6));
296284

297-
// Collect labels for verification
298285
final List<String> labels = sources.stream()
299286
.map(s -> (String) s.get("labels"))
300287
.collect(Collectors.toList());
301288

302-
// Verify bucket docs exist with cumulative counts
303289
assertThat(labels.stream().filter(l -> l.contains("request_duration_bucket")).count(), equalTo(4L));
304290
assertThat(labels.stream().filter(l -> l.contains("request_duration_count")).count(), equalTo(1L));
305291
assertThat(labels.stream().filter(l -> l.contains("request_duration_sum")).count(), equalTo(1L));
306292

307-
// Verify cumulative bucket values
308293
final Map<String, Double> labelToValue = sources.stream()
309294
.collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue()));
310295

@@ -315,22 +300,19 @@ void testOutputHistogramExpansion() throws IOException, InterruptedException {
315300
assertThat(labelToValue.get("__name__ request_duration_count method GET"), closeTo(20.0, 0.001));
316301
assertThat(labelToValue.get("__name__ request_duration_sum method GET"), closeTo(5.5, 0.001));
317302

318-
// Verify document success count matches expanded document count
319303
final List<Measurement> documentsSuccess = MetricsTestUtil.getMeasurementList(
320304
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
321305
.add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString());
322306
assertThat(documentsSuccess.size(), equalTo(1));
323307
assertThat(documentsSuccess.get(0).getValue(), closeTo(6.0, 0));
324308

325-
// Verify no errors
326309
final List<Measurement> bulkRequestErrors = MetricsTestUtil.getMeasurementList(
327310
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
328311
.add(OpenSearchSink.BULKREQUEST_ERRORS).toString());
329312
assertThat(bulkRequestErrors.size(), equalTo(1));
330313
Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0);
331314
}
332315

333-
// --- Summary Expansion ---
334316

335317
@Test
336318
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -358,7 +340,6 @@ void testOutputSummaryExpansion() throws IOException, InterruptedException {
358340

359341
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB);
360342
final List<Map<String, Object>> sources = getSearchResponseDocSources(indexAlias);
361-
// 2 quantile docs + 1 _count + 1 _sum = 4
362343
assertThat(sources, hasSize(4));
363344

364345
final Map<String, Double> labelToValue = sources.stream()
@@ -369,15 +350,13 @@ void testOutputSummaryExpansion() throws IOException, InterruptedException {
369350
assertThat(labelToValue.get("__name__ rpc_latency_count service api"), closeTo(1000.0, 0.001));
370351
assertThat(labelToValue.get("__name__ rpc_latency_sum service api"), closeTo(300.5, 0.001));
371352

372-
// Verify metrics
373353
final List<Measurement> bulkRequestErrors = MetricsTestUtil.getMeasurementList(
374354
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
375355
.add(OpenSearchSink.BULKREQUEST_ERRORS).toString());
376356
assertThat(bulkRequestErrors.size(), equalTo(1));
377357
Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0);
378358
}
379359

380-
// --- Multiple Metrics in Single Batch ---
381360

382361
@Test
383362
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -408,18 +387,15 @@ void testOutputMixedMetricTypes() throws IOException, InterruptedException {
408387

409388
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB);
410389
final List<Map<String, Object>> sources = getSearchResponseDocSources(indexAlias);
411-
// 1 gauge + 1 sum = 2 documents
412390
assertThat(sources, hasSize(2));
413391

414-
// Verify metrics
415392
final List<Measurement> bulkRequestErrors = MetricsTestUtil.getMeasurementList(
416393
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
417394
.add(OpenSearchSink.BULKREQUEST_ERRORS).toString());
418395
assertThat(bulkRequestErrors.size(), equalTo(1));
419396
Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0);
420397
}
421398

422-
// --- Re-instantiation (no duplicate index) ---
423399

424400
@Test
425401
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6")
@@ -433,15 +409,13 @@ void testReinstantiateSinkDoesNotCreateDuplicateIndex() throws IOException {
433409
Response response = client.performRequest(request);
434410
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
435411

436-
// Reinstantiate sink — should not fail
437412
createObjectUnderTest(config, true);
438413

439414
request = new Request(HttpMethod.HEAD, indexAlias);
440415
response = client.performRequest(request);
441416
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
442417
}
443418

444-
// --- Helper methods ---
445419

446420
private OpenSearchSink createObjectUnderTest(final OpenSearchSinkConfig openSearchSinkConfig, final boolean doInitialize) {
447421
final SinkContext sinkContext = mock(SinkContext.class);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.dataprepper.model.configuration.PluginSetting;
3434
import org.opensearch.dataprepper.model.event.Event;
3535
import org.opensearch.dataprepper.model.event.EventHandle;
36+
import org.opensearch.dataprepper.model.event.InternalEventHandle;
3637
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
3738
import org.opensearch.dataprepper.model.failures.DlqObject;
3839
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
@@ -73,7 +74,8 @@
7374
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
7475
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
7576
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
76-
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TSDBDocumentBuilder;
77+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilder;
78+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilderFactory;
7779
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
7880
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions;
7981
import org.slf4j.Logger;
@@ -170,7 +172,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
170172

171173
private ExistingDocumentQueryManager existingDocumentQueryManager;
172174

173-
private final TSDBDocumentBuilder tsdbDocumentBuilder;
175+
private final CustomDocumentBuilder customDocumentBuilder;
174176

175177
private final ExecutorService queryExecutorService;
176178

@@ -222,7 +224,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
222224
this.pluginConfigObservable = pluginConfigObservable;
223225
this.objectMapper = new ObjectMapper();
224226
this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters());
225-
this.tsdbDocumentBuilder = (this.indexType == IndexType.TSDB) ? new TSDBDocumentBuilder() : null;
227+
this.customDocumentBuilder = new CustomDocumentBuilderFactory().create(this.indexType);
226228
this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ?
227229
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null;
228230

@@ -390,20 +392,20 @@ public void doOutput(final Collection<Record<Event>> records) {
390392

391393
dataStreamIndex.ensureTimestamp(event, indexName);
392394

393-
if (indexType == IndexType.TSDB) {
395+
if (customDocumentBuilder != null) {
394396
try {
395-
final List<String> tsdbDocs = tsdbDocumentBuilder.build(event);
397+
final List<String> tsdbDocs = customDocumentBuilder.buildDocuments(event);
396398
final String tsdbAction = resolveEventAction(event);
397-
final List<BulkOperationWrapper> wrappers = new ArrayList<>(tsdbDocs.size());
398-
for (int i = 0; i < tsdbDocs.size(); i++) {
399-
final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null);
400-
final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null);
401-
final BulkOperationWrapper wrapper = (i == tsdbDocs.size() - 1)
402-
? new BulkOperationWrapper(op, event.getEventHandle(), null, null)
403-
: new BulkOperationWrapper(op, (EventHandle) null, null, null);
404-
wrappers.add(wrapper);
399+
final EventHandle eventHandle = event.getEventHandle();
400+
if (tsdbDocs.size() > 1 && eventHandle instanceof InternalEventHandle) {
401+
for (int i = 0; i < tsdbDocs.size() - 1; i++) {
402+
((InternalEventHandle) eventHandle).acquireReference();
403+
}
405404
}
406-
for (final BulkOperationWrapper wrapper : wrappers) {
405+
for (final String tsdbDoc : tsdbDocs) {
406+
final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDoc, null, null, null);
407+
final BulkOperation op = bulkOperationFactory.create(tsdbAction, doc, null, indexName, null);
408+
final BulkOperationWrapper wrapper = new BulkOperationWrapper(op, eventHandle, null, null);
407409
bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime);
408410
bulkRequest.addOperation(wrapper);
409411
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
11+
12+
import org.opensearch.dataprepper.model.event.Event;
13+
14+
import java.util.List;
15+
16+
/**
17+
* Converts a Data Prepper {@link Event} into one or more JSON document strings
18+
* suitable for indexing into OpenSearch. Implementations handle index-type-specific
19+
* document transformations such as TSDB metric expansion.
20+
*/
21+
public interface CustomDocumentBuilder {
22+
List<String> buildDocuments(Event event);
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
11+
12+
public final class CustomDocumentBuilderFactory {
13+
14+
public CustomDocumentBuilder create(final IndexType indexType) {
15+
if (indexType == IndexType.TSDB) {
16+
return new TSDBDocumentBuilder();
17+
}
18+
return null;
19+
}
20+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public final IndexManager getIndexManager(final IndexType indexType,
5656
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
5757
break;
5858
case TRACE_ANALYTICS_SERVICE_MAP:
59-
case TSDB:
6059
indexManager = new TraceAnalyticsServiceMapIndexManager(
6160
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
6261
break;

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java

Lines changed: 15 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
1111

12+
import com.fasterxml.jackson.core.JsonProcessingException;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import com.fasterxml.jackson.databind.node.ObjectNode;
1215
import org.opensearch.dataprepper.model.event.Event;
1316
import org.opensearch.dataprepper.model.metric.Gauge;
1417
import org.opensearch.dataprepper.model.metric.Histogram;
@@ -27,13 +30,14 @@
2730
import java.util.List;
2831
import java.util.Map;
2932

30-
public final class TSDBDocumentBuilder {
33+
public final class TSDBDocumentBuilder implements CustomDocumentBuilder {
3134

3235
private static final Logger LOG = LoggerFactory.getLogger(TSDBDocumentBuilder.class);
3336
private static final String NAME_LABEL = "__name__";
34-
private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
37+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
3538

36-
public List<String> build(final Event event) {
39+
@Override
40+
public List<String> buildDocuments(final Event event) {
3741
if (!(event instanceof Metric)) {
3842
throw new IllegalArgumentException(
3943
"TSDB index_type requires Metric events. Received: " + event.getClass().getName());
@@ -171,45 +175,14 @@ private static String buildLabels(final String metricName, final String[][] sort
171175
}
172176

173177
private static String buildJsonDoc(final String labels, final long timestamp, final double value) {
174-
final StringBuilder sb = new StringBuilder(labels.length() + 64);
175-
sb.append("{\"labels\":\"");
176-
appendJsonEscaped(sb, labels);
177-
sb.append("\",\"timestamp\":");
178-
sb.append(timestamp);
179-
sb.append(",\"value\":");
180-
sb.append(value);
181-
sb.append('}');
182-
return sb.toString();
183-
}
184-
185-
private static void appendJsonEscaped(final StringBuilder sb, final String s) {
186-
for (int i = 0; i < s.length(); i++) {
187-
final char c = s.charAt(i);
188-
switch (c) {
189-
case '"':
190-
sb.append("\\\"");
191-
break;
192-
case '\\':
193-
sb.append("\\\\");
194-
break;
195-
case '\n':
196-
sb.append("\\n");
197-
break;
198-
case '\r':
199-
sb.append("\\r");
200-
break;
201-
case '\t':
202-
sb.append("\\t");
203-
break;
204-
default:
205-
if (c < 0x20) {
206-
sb.append("\\u00");
207-
sb.append(HEX_CHARS[(c >> 4) & 0xF]);
208-
sb.append(HEX_CHARS[c & 0xF]);
209-
} else {
210-
sb.append(c);
211-
}
212-
}
178+
final ObjectNode node = OBJECT_MAPPER.createObjectNode();
179+
node.put("labels", labels);
180+
node.put("timestamp", timestamp);
181+
node.put("value", value);
182+
try {
183+
return OBJECT_MAPPER.writeValueAsString(node);
184+
} catch (final JsonProcessingException e) {
185+
throw new RuntimeException("Failed to serialize TSDB document", e);
213186
}
214187
}
215188

0 commit comments

Comments
 (0)