Skip to content

Commit d47e3ff

Browse files
kkondakaEC2 Default User
authored andcommitted
Support trace groups in standard otel proto codec (opensearch-project#5539)
* Support trace groups in standard otel proto codec Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments. Verified the functionality for standard logs/traces/metrics Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added string mappings for resource and scope Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 088476a commit d47e3ff

27 files changed

Lines changed: 1803 additions & 213 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class JacksonSpan extends JacksonEvent implements Span {
4141
private static final String TRACE_STATE_KEY = "traceState";
4242
private static final String PARENT_SPAN_ID_KEY = "parentSpanId";
4343
private static final String NAME_KEY = "name";
44+
private static final String FLAGS_KEY = "flags";
4445
private static final String KIND_KEY = "kind";
4546
private static final String START_TIME_KEY = "startTime";
4647
private static final String END_TIME_KEY = "endTime";
@@ -50,10 +51,10 @@ public class JacksonSpan extends JacksonEvent implements Span {
5051
private static final String DROPPED_EVENTS_COUNT_KEY = "droppedEventsCount";
5152
private static final String LINKS_KEY = "links";
5253
private static final String DROPPED_LINKS_COUNT_KEY = "droppedLinksCount";
53-
private static final String SERVICE_NAME_KEY = "serviceName";
54-
private static final String TRACE_GROUP_KEY = "traceGroup";
54+
public static final String SERVICE_NAME_KEY = "serviceName";
55+
public static final String TRACE_GROUP_KEY = "traceGroup";
5556
private static final String DURATION_IN_NANOS_KEY = "durationInNanos";
56-
private static final String TRACE_GROUP_FIELDS_KEY = "traceGroupFields";
57+
public static final String TRACE_GROUP_FIELDS_KEY = "traceGroupFields";
5758

5859
private static final List<String> REQUIRED_KEYS = Arrays.asList(TRACE_GROUP_KEY);
5960
protected static final List<String>
@@ -120,6 +121,11 @@ public String getKind() {
120121
return this.get(KIND_KEY, String.class);
121122
}
122123

124+
@Override
125+
public Integer getFlags() {
126+
return this.get(FLAGS_KEY, Integer.class);
127+
}
128+
123129
@Override
124130
public Map<String, Object> getScope() {
125131
return this.get(SCOPE_KEY, Map.class);
@@ -177,24 +183,42 @@ public Integer getDroppedLinksCount() {
177183

178184
@Override
179185
public String getTraceGroup() {
186+
EventMetadata metadata = getMetadata();
187+
Object traceGroup = metadata.getAttribute(TRACE_GROUP_KEY);
188+
if (traceGroup != null)
189+
return (String)traceGroup;
180190
return this.get(TRACE_GROUP_KEY, String.class);
181191
}
182192

193+
183194
@Override
184195
public Long getDurationInNanos() {
185196
return this.get(DURATION_IN_NANOS_KEY, Long.class);
186197
}
187198

188199
@Override
189200
public TraceGroupFields getTraceGroupFields() {
201+
EventMetadata metadata = getMetadata();
202+
Object traceGroupFields = metadata.getAttribute(TRACE_GROUP_FIELDS_KEY);
203+
if (traceGroupFields != null)
204+
return (TraceGroupFields)traceGroupFields;
190205
return this.get(TRACE_GROUP_FIELDS_KEY, DefaultTraceGroupFields.class);
191206
}
192207

193208
@Override
194209
public String getServiceName() {
210+
EventMetadata metadata = getMetadata();
211+
Object serviceName = metadata.getAttribute(SERVICE_NAME_KEY);
212+
if (serviceName != null)
213+
return (String)serviceName;
195214
return this.get(SERVICE_NAME_KEY, String.class);
196215
}
197216

217+
@Override
218+
public void setServiceName(final String serviceName) {
219+
this.put(SERVICE_NAME_KEY, serviceName);
220+
}
221+
198222
@Override
199223
public void setTraceGroup(final String traceGroup) {
200224
this.put(TRACE_GROUP_KEY, traceGroup);
@@ -369,6 +393,18 @@ public Builder withKind(final String kind) {
369393
return this;
370394
}
371395

396+
/**
397+
* Sets the flags of span
398+
*
399+
* @param flags flags
400+
* @return returns the builder
401+
* @since 2.11
402+
*/
403+
public Builder withFlags(final Integer flags) {
404+
data.put(FLAGS_KEY, flags);
405+
return this;
406+
}
407+
372408
/**
373409
* Sets the status of the log event
374410
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/Span.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public interface Span extends Event {
5757
*/
5858
String getKind();
5959

60+
/**
61+
* Gets the span's flags
62+
* @return the flags
63+
*/
64+
Integer getFlags();
65+
6066
/**
6167
* Gets ISO8601 representation of the start time.
6268
* @return the start time
@@ -154,6 +160,13 @@ public interface Span extends Event {
154160
*/
155161
void setTraceGroupFields(TraceGroupFields traceGroupFields);
156162

163+
/**
164+
* Sets the service name for this span.
165+
* @param serviceName service name
166+
* @since 2.11
167+
*/
168+
void setServiceName(final String serviceName);
169+
157170
/**
158171
* Gets the scope of this log event.
159172
*

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class JacksonSpanTest {
4747
protected static final String TEST_PARENT_SPAN_ID = UUID.randomUUID().toString();
4848
protected static final String TEST_NAME = UUID.randomUUID().toString();
4949
protected static final String TEST_KIND = UUID.randomUUID().toString();
50+
protected static final int TEST_FLAGS = 10;
5051
protected static final String TEST_START_TIME = UUID.randomUUID().toString();
5152
protected static final String TEST_END_TIME = UUID.randomUUID().toString();
5253
private static final Map<String, Object> TEST_ATTRIBUTES = ImmutableMap.of("key1", new Date().getTime(), "key2", UUID.randomUUID().toString());
@@ -100,6 +101,7 @@ public void setup() {
100101
.withName(TEST_NAME)
101102
.withServiceName(TEST_SERVICE_NAME)
102103
.withKind(TEST_KIND)
104+
.withFlags(TEST_FLAGS)
103105
.withScope(TEST_SCOPE)
104106
.withResource(TEST_RESOURCE)
105107
.withStatus(TEST_STATUS)
@@ -160,6 +162,12 @@ public void testGetKind() {
160162
assertThat(kind, is(equalTo(TEST_KIND)));
161163
}
162164

165+
@Test
166+
public void testGetFlags() {
167+
final Integer flags = jacksonSpan.getFlags();
168+
assertThat(flags, is(equalTo(TEST_FLAGS)));
169+
}
170+
163171
@Test
164172
public void testGetStartTime() {
165173
final String GetStartTime = jacksonSpan.getStartTime();
@@ -250,6 +258,16 @@ public void testGetTraceGroupFields() {
250258
assertThat(traceGroupFields, is(equalTo(traceGroupFields)));
251259
}
252260

261+
@Test
262+
public void testSetAndGetServiceName() {
263+
String serviceName = jacksonSpan.getServiceName();
264+
assertThat(serviceName, is(equalTo(TEST_SERVICE_NAME)));
265+
final String testServiceName = "testServiceName";
266+
jacksonSpan.setServiceName(testServiceName);
267+
serviceName = jacksonSpan.getServiceName();
268+
assertThat(serviceName, is(equalTo(testServiceName)));
269+
}
270+
253271
@Test
254272
public void testSetAndGetTraceGroup() {
255273
final String testTraceGroup = "testTraceGroup";

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonStandardSpanTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.model.trace;
77

8+
import org.opensearch.dataprepper.model.event.EventMetadata;
89
import com.fasterxml.jackson.core.JsonProcessingException;
910
import io.micrometer.core.instrument.util.IOUtils;
1011
import com.fasterxml.jackson.core.type.TypeReference;
@@ -19,6 +20,7 @@
1920
import org.json.JSONException;
2021
import org.skyscreamer.jsonassert.JSONAssert;
2122

23+
import java.time.Instant;
2224
import java.util.Arrays;
2325
import java.util.Map;
2426
import java.util.List;
@@ -86,6 +88,60 @@ public JacksonSpan createObjectUnderTest(Map<String, Object> attributes) {
8688
.build();
8789
}
8890

91+
@Test
92+
@Override
93+
public void testGetTraceGroup() {
94+
jacksonSpan = createObjectUnderTest(TEST_ATTRIBUTES);
95+
EventMetadata metadata = jacksonSpan.getMetadata();
96+
String testTraceGroup = UUID.randomUUID().toString();
97+
metadata.setAttribute(JacksonSpan.TRACE_GROUP_KEY, testTraceGroup);
98+
final String traceGroup = jacksonSpan.getTraceGroup();
99+
assertThat(traceGroup, is(equalTo(testTraceGroup)));
100+
}
101+
102+
@Test
103+
@Override
104+
public void testGetServiceName() {
105+
jacksonSpan = createObjectUnderTest(TEST_ATTRIBUTES);
106+
EventMetadata metadata = jacksonSpan.getMetadata();
107+
String testServiceName = UUID.randomUUID().toString();
108+
metadata.setAttribute(JacksonSpan.SERVICE_NAME_KEY, testServiceName);
109+
final String serviceName = jacksonSpan.getServiceName();
110+
assertThat(serviceName, is(equalTo(testServiceName)));
111+
}
112+
113+
@Test
114+
@Override
115+
public void testSetAndGetServiceName() {
116+
jacksonSpan = createObjectUnderTest(TEST_ATTRIBUTES);
117+
String serviceName = jacksonSpan.getServiceName();
118+
assertThat(serviceName, is(equalTo("testService")));
119+
120+
EventMetadata metadata = jacksonSpan.getMetadata();
121+
String testServiceName = UUID.randomUUID().toString();
122+
metadata.setAttribute(JacksonSpan.SERVICE_NAME_KEY, testServiceName);
123+
jacksonSpan.setServiceName(jacksonSpan.getServiceName());
124+
serviceName = jacksonSpan.getServiceName();
125+
assertThat(serviceName, is(equalTo(testServiceName)));
126+
}
127+
128+
129+
@Test
130+
@Override
131+
public void testGetTraceGroupFields() {
132+
jacksonSpan = createObjectUnderTest(TEST_ATTRIBUTES);
133+
DefaultTraceGroupFields testTraceGroupFields =
134+
DefaultTraceGroupFields.builder()
135+
.withDurationInNanos(10000L)
136+
.withEndTime(Instant.now().toString())
137+
.withStatusCode(10)
138+
.build();
139+
EventMetadata metadata = jacksonSpan.getMetadata();
140+
metadata.setAttribute(JacksonSpan.TRACE_GROUP_FIELDS_KEY, testTraceGroupFields);
141+
final TraceGroupFields traceGroupFields = jacksonSpan.getTraceGroupFields();
142+
assertThat(traceGroupFields, is(equalTo(testTraceGroupFields)));
143+
}
144+
89145
@Test
90146
@Override
91147
public void testToJsonStringAllParameters() throws JsonProcessingException, JSONException {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private IndexConfiguration(final Builder builder) {
164164

165165
String documentIdField = builder.documentIdField;
166166
String documentId = builder.documentId;
167-
if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW)) {
167+
if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW) || indexType.equals(IndexType.TRACE_ANALYTICS_RAW_STANDARD)) {
168168
documentId = "${spanId}";
169169
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
170170
documentId = "${hashId}";
@@ -430,12 +430,18 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
430430
InputStream s3TemplateFile = null;
431431
if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW)) {
432432
templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_DEFAULT_TEMPLATE_FILE);
433+
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW_STANDARD)) {
434+
templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_STANDARD_TEMPLATE_FILE);
433435
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
434436
templateURL = loadExistingTemplate(templateType, IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE);
435437
} else if (indexType.equals(IndexType.LOG_ANALYTICS)) {
436438
templateURL = loadExistingTemplate(templateType, IndexConstants.LOGS_DEFAULT_TEMPLATE_FILE);
439+
} else if (indexType.equals(IndexType.LOG_ANALYTICS_STANDARD)) {
440+
templateURL = loadExistingTemplate(templateType, IndexConstants.LOGS_STANDARD_TEMPLATE_FILE);
437441
} else if (indexType.equals(IndexType.METRIC_ANALYTICS)) {
438442
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
443+
} else if (indexType.equals(IndexType.METRIC_ANALYTICS_STANDARD)) {
444+
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE);
439445
} else if (templateFile != null) {
440446
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
441447
FileReader s3FileReader = new S3FileReader(s3Client);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,19 @@ public class IndexConstants {
1515
public static final Map<IndexType, String> TYPE_TO_DEFAULT_ALIAS = new HashMap<>();
1616
// TODO: extract out version number into version enum
1717
public static final String RAW_DEFAULT_TEMPLATE_FILE = "otel-v1-apm-span-index-template.json";
18+
public static final String RAW_STANDARD_TEMPLATE_FILE = "otel-v1-apm-span-index-standard-template.json";
1819
public static final String RAW_ISM_POLICY = "raw-span-policy";
1920
public static final String RAW_ISM_FILE_NO_ISM_TEMPLATE = "raw-span-policy-no-ism-template.json";
2021
public static final String RAW_ISM_FILE_WITH_ISM_TEMPLATE = "raw-span-policy-with-ism-template.json";
2122

2223
public static final String LOGS_DEFAULT_TEMPLATE_FILE = "logs-otel-v1-index-template.json";
24+
public static final String LOGS_STANDARD_TEMPLATE_FILE = "logs-otel-v1-index-standard-template.json";
2325
public static final String LOGS_ISM_POLICY = "logs-policy";
2426
public static final String LOGS_ISM_FILE_NO_ISM_TEMPLATE = "logs-policy-no-ism-template.json";
2527
public static final String LOGS_ISM_FILE_WITH_ISM_TEMPLATE = "logs-policy-with-ism-template.json";
2628

2729
public static final String METRICS_DEFAULT_TEMPLATE_FILE = "metrics-otel-v1-index-template.json";
30+
public static final String METRICS_STANDARD_TEMPLATE_FILE = "metrics-otel-v1-index-standard-template.json";
2831
public static final String METRICS_ISM_POLICY = "metrics-policy";
2932
public static final String METRICS_ISM_FILE_NO_ISM_TEMPLATE = "metrics-policy-no-ism-template.json";
3033
public static final String METRICS_ISM_FILE_WITH_ISM_TEMPLATE = "metrics-policy-with-ism-template.json";
@@ -39,7 +42,10 @@ public class IndexConstants {
3942
// TODO: extract out version number into version enum
4043
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
4144
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
45+
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW_STANDARD, "otel-v1-apm-span");
4246
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS, "logs-otel-v1");
47+
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_STANDARD, "logs-otel-v1");
4348
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
49+
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_STANDARD, "metrics-otel-v1");
4450
}
4551
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public final IndexManager getIndexManager(final IndexType indexType,
5151
IndexManager indexManager;
5252
switch (indexType) {
5353
case TRACE_ANALYTICS_RAW:
54+
case TRACE_ANALYTICS_RAW_STANDARD:
5455
indexManager = new TraceAnalyticsRawIndexManager(
5556
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
5657
break;
@@ -59,10 +60,12 @@ public final IndexManager getIndexManager(final IndexType indexType,
5960
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
6061
break;
6162
case LOG_ANALYTICS:
63+
case LOG_ANALYTICS_STANDARD:
6264
indexManager = new LogAnalyticsIndexManager(
6365
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
6466
break;
6567
case METRIC_ANALYTICS:
68+
case METRIC_ANALYTICS_STANDARD:
6669
indexManager = new MetricAnalyticsIndexManager(
6770
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
6871
break;

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313

1414
public enum IndexType {
1515
TRACE_ANALYTICS_RAW("trace-analytics-raw"),
16+
TRACE_ANALYTICS_RAW_STANDARD("trace-analytics-standard-raw"),
1617
TRACE_ANALYTICS_SERVICE_MAP("trace-analytics-service-map"),
1718
LOG_ANALYTICS("log-analytics"),
19+
LOG_ANALYTICS_STANDARD("log-analytics-standard"),
1820
METRIC_ANALYTICS("metric-analytics"),
21+
METRIC_ANALYTICS_STANDARD("metric-analytics-standard"),
1922
CUSTOM("custom"),
2023
MANAGEMENT_DISABLED("management_disabled");
2124

0 commit comments

Comments
 (0)