Skip to content

Commit b7fc0d4

Browse files
committed
Support for otel_traces codec to create Span Events from OTEL Traces
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 506a04f commit b7fc0d4

14 files changed

Lines changed: 645 additions & 4 deletions

data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
21
/*
3-
* * Copyright OpenSearch Contributors
4-
* * SPDX-License-Identifier: Apache-2.0
5-
* */
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
610

711
package org.opensearch.dataprepper.plugins.otel.codec;
812

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
public enum OTelTraceFormatOption {
21+
JSON("json"),
22+
PROTOBUF("protobuf");
23+
24+
private static final Map<String, OTelTraceFormatOption> NAMES_MAP = Arrays.stream(OTelTraceFormatOption.values())
25+
.collect(Collectors.toMap(
26+
value -> value.optionName,
27+
value -> value
28+
));
29+
30+
private final String optionName;
31+
32+
OTelTraceFormatOption(final String optionName) {
33+
this.optionName = optionName;
34+
}
35+
36+
@JsonValue
37+
public String getFormatName() {
38+
return optionName;
39+
}
40+
41+
@JsonCreator
42+
public static OTelTraceFormatOption fromFormatName(final String optionName) {
43+
return NAMES_MAP.get(optionName);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
14+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.codec.InputCodec;
17+
import org.opensearch.dataprepper.model.event.Event;
18+
import org.opensearch.dataprepper.model.record.Record;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.util.Objects;
23+
import java.util.function.Consumer;
24+
25+
@DataPrepperPlugin(name = "otel_traces", pluginType = InputCodec.class, pluginConfigurationType = OTelTraceInputCodecConfig.class)
26+
public class OTelTraceInputCodec implements InputCodec {
27+
28+
private final ByteDecoder decoder;
29+
30+
@DataPrepperPluginConstructor
31+
public OTelTraceInputCodec(final OTelTraceInputCodecConfig config) {
32+
Objects.requireNonNull(config);
33+
OTelTraceFormatOption format = config.getFormat();
34+
OTelOutputFormat otelFormat = config.getOTelOutputFormat();
35+
36+
if (format == OTelTraceFormatOption.JSON) {
37+
decoder = new OTelTraceJsonDecoder(otelFormat);
38+
} else if (format == OTelTraceFormatOption.PROTOBUF) {
39+
decoder = new OTelTraceProtoBufDecoder(otelFormat, config.getLengthPrefixedEncoding());
40+
} else {
41+
throw new RuntimeException("The format " + config.getFormat() + " is not supported.");
42+
}
43+
}
44+
45+
@Override
46+
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
47+
decoder.parse(inputStream, null, eventConsumer);
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.fasterxml.jackson.annotation.JsonClassDescription;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
16+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.AssertTrue;
18+
import jakarta.validation.constraints.NotNull;
19+
20+
@JsonPropertyOrder
21+
@JsonClassDescription("The `otel_traces` codec parses trace files that follow the OpenTelemetry Protocol Specification. "
22+
+ "It creates a Data Prepper Span event for each span record along with the resource attributes in the file.")
23+
public class OTelTraceInputCodecConfig {
24+
25+
static final OTelTraceFormatOption DEFAULT_FORMAT = OTelTraceFormatOption.JSON;
26+
static final OTelOutputFormat DEFAULT_OTEL_FORMAT = OTelOutputFormat.OPENSEARCH;
27+
28+
@JsonProperty(value = "format", defaultValue = "json")
29+
@JsonPropertyDescription("Specifies the format of the OTel traces. Valid options are 'json' and 'protobuf'.")
30+
@NotNull
31+
private OTelTraceFormatOption format = DEFAULT_FORMAT;
32+
33+
@JsonProperty(value = "otel_format", defaultValue = "opensearch")
34+
@JsonPropertyDescription("Specifies the output format of the decoded spans.")
35+
@NotNull
36+
private OTelOutputFormat otelFormat = DEFAULT_OTEL_FORMAT;
37+
38+
@JsonProperty(value = "length_prefixed_encoding", defaultValue = "false")
39+
@JsonPropertyDescription("Specifies if the length precedes the data in protobuf format.")
40+
private boolean lengthPrefixedEncoding;
41+
42+
public OTelTraceFormatOption getFormat() {
43+
return format;
44+
}
45+
46+
public OTelOutputFormat getOTelOutputFormat() {
47+
return otelFormat;
48+
}
49+
50+
@AssertTrue(message = "Not a valid format.")
51+
boolean isValidFormat() {
52+
return format != null;
53+
}
54+
55+
public boolean getLengthPrefixedEncoding() {
56+
return lengthPrefixedEncoding;
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.google.protobuf.util.JsonFormat;
14+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.record.Record;
18+
import org.opensearch.dataprepper.model.trace.Span;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.InputStreamReader;
23+
import java.io.Reader;
24+
import java.time.Instant;
25+
import java.util.List;
26+
import java.util.function.Consumer;
27+
28+
public class OTelTraceJsonDecoder implements ByteDecoder {
29+
30+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
31+
32+
public OTelTraceJsonDecoder(OTelOutputFormat otelOutputFormat) {
33+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
34+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
35+
: new OTelProtoStandardCodec.OTelProtoDecoder();
36+
}
37+
38+
@Override
39+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
40+
Reader reader = new InputStreamReader(inputStream);
41+
ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
42+
JsonFormat.parser().merge(reader, builder);
43+
ExportTraceServiceRequest request = builder.build();
44+
45+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
46+
for (Span span : spans) {
47+
eventConsumer.accept(new Record<>(span));
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
14+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
15+
import org.opensearch.dataprepper.model.event.Event;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.model.trace.Span;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.nio.ByteBuffer;
24+
import java.time.Instant;
25+
import java.util.List;
26+
import java.util.function.Consumer;
27+
28+
public class OTelTraceProtoBufDecoder implements ByteDecoder {
29+
30+
private static final Logger LOG = LoggerFactory.getLogger(OTelTraceProtoBufDecoder.class);
31+
private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024);
32+
33+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
34+
private final boolean lengthPrefixedEncoding;
35+
36+
public OTelTraceProtoBufDecoder(OTelOutputFormat otelOutputFormat, boolean lengthPrefixedEncoding) {
37+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
38+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
39+
: new OTelProtoStandardCodec.OTelProtoDecoder();
40+
this.lengthPrefixedEncoding = lengthPrefixedEncoding;
41+
}
42+
43+
private void parseRequest(final ExportTraceServiceRequest request, final Instant timeReceivedMs,
44+
Consumer<Record<Event>> eventConsumer) {
45+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
46+
for (Span span : spans) {
47+
eventConsumer.accept(new Record<>(span));
48+
}
49+
}
50+
51+
@Override
52+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
53+
if (!lengthPrefixedEncoding) {
54+
int available = inputStream.available();
55+
if (available > MAX_REQUEST_LEN) {
56+
throw new IllegalArgumentException(
57+
String.format("Buffer length %d exceeds max allowed buffer length of %d", available, MAX_REQUEST_LEN));
58+
}
59+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream);
60+
parseRequest(request, timeReceivedMs, eventConsumer);
61+
return;
62+
}
63+
64+
byte[] lenBytes = new byte[4];
65+
while (inputStream.read(lenBytes, 0, 4) == 4) {
66+
ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes);
67+
int len = lengthBuffer.getInt();
68+
if (len > MAX_REQUEST_LEN) {
69+
throw new IllegalArgumentException(
70+
String.format("Buffer length %d exceeds max allowed buffer length of %d", len, MAX_REQUEST_LEN));
71+
}
72+
byte[] buffer = new byte[len];
73+
if (inputStream.read(buffer, 0, len) != len) {
74+
LOG.warn("Failed to read {} bytes", len);
75+
continue;
76+
}
77+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(buffer);
78+
parseRequest(request, timeReceivedMs, eventConsumer);
79+
}
80+
}
81+
}

data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.otel.codec;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.EnumSource;
16+
17+
import static org.hamcrest.CoreMatchers.equalTo;
18+
import static org.hamcrest.CoreMatchers.notNullValue;
19+
import static org.hamcrest.CoreMatchers.nullValue;
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
22+
class OTelTraceFormatOptionTest {
23+
24+
@ParameterizedTest
25+
@EnumSource(OTelTraceFormatOption.class)
26+
void fromFormatName_returns_correct_value(OTelTraceFormatOption option) {
27+
assertThat(OTelTraceFormatOption.fromFormatName(option.getFormatName()), equalTo(option));
28+
}
29+
30+
@Test
31+
void fromFormatName_returns_null_for_unknown() {
32+
assertThat(OTelTraceFormatOption.fromFormatName("unknown"), nullValue());
33+
}
34+
35+
@Test
36+
void json_has_correct_name() {
37+
assertThat(OTelTraceFormatOption.JSON.getFormatName(), equalTo("json"));
38+
}
39+
40+
@Test
41+
void protobuf_has_correct_name() {
42+
assertThat(OTelTraceFormatOption.PROTOBUF.getFormatName(), equalTo("protobuf"));
43+
}
44+
}

0 commit comments

Comments
 (0)