Skip to content

Commit 87be60a

Browse files
authored
Support for otel_traces codec to create Span Events from OTEL Traces (#6843)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent d237e45 commit 87be60a

11 files changed

Lines changed: 625 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
public enum OTelTracesFormatOption {
21+
JSON("json"),
22+
PROTOBUF("protobuf");
23+
24+
private static final Map<String, OTelTracesFormatOption> NAMES_MAP = Arrays.stream(OTelTracesFormatOption.values())
25+
.collect(Collectors.toMap(
26+
value -> value.optionName,
27+
value -> value
28+
));
29+
30+
private final String optionName;
31+
32+
OTelTracesFormatOption(final String optionName) {
33+
this.optionName = optionName;
34+
}
35+
36+
@JsonValue
37+
public String getFormatName() {
38+
return optionName;
39+
}
40+
41+
@JsonCreator
42+
public static OTelTracesFormatOption fromFormatName(final String optionName) {
43+
return NAMES_MAP.get(optionName);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
14+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.codec.InputCodec;
17+
import org.opensearch.dataprepper.model.event.Event;
18+
import org.opensearch.dataprepper.model.record.Record;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.util.Objects;
23+
import java.util.function.Consumer;
24+
25+
@DataPrepperPlugin(name = "otel_traces", pluginType = InputCodec.class, pluginConfigurationType = OTelTracesInputCodecConfig.class)
26+
public class OTelTracesInputCodec implements InputCodec {
27+
28+
private final ByteDecoder decoder;
29+
30+
@DataPrepperPluginConstructor
31+
public OTelTracesInputCodec(final OTelTracesInputCodecConfig config) {
32+
Objects.requireNonNull(config);
33+
OTelTracesFormatOption format = config.getFormat();
34+
OTelOutputFormat otelFormat = config.getOTelOutputFormat();
35+
36+
if (format == OTelTracesFormatOption.JSON) {
37+
decoder = new OTelTracesJsonDecoder(otelFormat);
38+
} else if (format == OTelTracesFormatOption.PROTOBUF) {
39+
decoder = new OTelTracesProtoBufDecoder(otelFormat, config.getLengthPrefixedEncoding());
40+
} else {
41+
throw new RuntimeException("The format " + config.getFormat() + " is not supported.");
42+
}
43+
}
44+
45+
@Override
46+
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
47+
decoder.parse(inputStream, null, eventConsumer);
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.fasterxml.jackson.annotation.JsonClassDescription;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
16+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.AssertTrue;
18+
import jakarta.validation.constraints.NotNull;
19+
20+
@JsonPropertyOrder
21+
@JsonClassDescription("The `otel_traces` codec parses trace files that follow the OpenTelemetry Protocol Specification. "
22+
+ "It creates a Data Prepper Span event for each span record along with the resource attributes in the file.")
23+
public class OTelTracesInputCodecConfig {
24+
25+
static final OTelTracesFormatOption DEFAULT_FORMAT = OTelTracesFormatOption.JSON;
26+
static final OTelOutputFormat DEFAULT_OTEL_FORMAT = OTelOutputFormat.OPENSEARCH;
27+
28+
@JsonProperty(value = "format", defaultValue = "json")
29+
@JsonPropertyDescription("Specifies the format of the OTel traces. Valid options are 'json' and 'protobuf'.")
30+
@NotNull
31+
private OTelTracesFormatOption format = DEFAULT_FORMAT;
32+
33+
@JsonProperty(value = "otel_format", defaultValue = "opensearch")
34+
@JsonPropertyDescription("Specifies the output format of the decoded spans.")
35+
@NotNull
36+
private OTelOutputFormat otelFormat = DEFAULT_OTEL_FORMAT;
37+
38+
@JsonProperty(value = "length_prefixed_encoding", defaultValue = "false")
39+
@JsonPropertyDescription("Specifies if the length precedes the data in protobuf format.")
40+
private boolean lengthPrefixedEncoding;
41+
42+
public OTelTracesFormatOption getFormat() {
43+
return format;
44+
}
45+
46+
public OTelOutputFormat getOTelOutputFormat() {
47+
return otelFormat;
48+
}
49+
50+
@AssertTrue(message = "Not a valid format.")
51+
boolean isValidFormat() {
52+
return format != null;
53+
}
54+
55+
public boolean getLengthPrefixedEncoding() {
56+
return lengthPrefixedEncoding;
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import com.google.protobuf.util.JsonFormat;
14+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
15+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.record.Record;
18+
import org.opensearch.dataprepper.model.trace.Span;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.InputStreamReader;
23+
import java.io.Reader;
24+
import java.time.Instant;
25+
import java.util.List;
26+
import java.util.function.Consumer;
27+
28+
public class OTelTracesJsonDecoder implements ByteDecoder {
29+
30+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
31+
32+
public OTelTracesJsonDecoder(OTelOutputFormat otelOutputFormat) {
33+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
34+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
35+
: new OTelProtoStandardCodec.OTelProtoDecoder();
36+
}
37+
38+
@Override
39+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
40+
Reader reader = new InputStreamReader(inputStream);
41+
ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
42+
JsonFormat.parser().merge(reader, builder);
43+
ExportTraceServiceRequest request = builder.build();
44+
45+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
46+
for (Span span : spans) {
47+
eventConsumer.accept(new Record<>(span));
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
14+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
15+
import org.opensearch.dataprepper.model.event.Event;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.model.trace.Span;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.nio.ByteBuffer;
24+
import java.time.Instant;
25+
import java.util.List;
26+
import java.util.function.Consumer;
27+
28+
public class OTelTracesProtoBufDecoder implements ByteDecoder {
29+
30+
private static final Logger LOG = LoggerFactory.getLogger(OTelTracesProtoBufDecoder.class);
31+
private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024);
32+
33+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
34+
private final boolean lengthPrefixedEncoding;
35+
36+
public OTelTracesProtoBufDecoder(OTelOutputFormat otelOutputFormat, boolean lengthPrefixedEncoding) {
37+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
38+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
39+
: new OTelProtoStandardCodec.OTelProtoDecoder();
40+
this.lengthPrefixedEncoding = lengthPrefixedEncoding;
41+
}
42+
43+
private void parseRequest(final ExportTraceServiceRequest request, final Instant timeReceivedMs,
44+
Consumer<Record<Event>> eventConsumer) {
45+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
46+
for (Span span : spans) {
47+
eventConsumer.accept(new Record<>(span));
48+
}
49+
}
50+
51+
@Override
52+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
53+
if (!lengthPrefixedEncoding) {
54+
int available = inputStream.available();
55+
if (available > MAX_REQUEST_LEN) {
56+
throw new IllegalArgumentException(
57+
String.format("Buffer length %d exceeds max allowed buffer length of %d", available, MAX_REQUEST_LEN));
58+
}
59+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream);
60+
parseRequest(request, timeReceivedMs, eventConsumer);
61+
return;
62+
}
63+
64+
byte[] lenBytes = new byte[4];
65+
while (inputStream.read(lenBytes, 0, 4) == 4) {
66+
ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes);
67+
int len = lengthBuffer.getInt();
68+
if (len > MAX_REQUEST_LEN) {
69+
throw new IllegalArgumentException(
70+
String.format("Buffer length %d exceeds max allowed buffer length of %d", len, MAX_REQUEST_LEN));
71+
}
72+
byte[] buffer = new byte[len];
73+
if (inputStream.read(buffer, 0, len) != len) {
74+
LOG.warn("Failed to read {} bytes", len);
75+
continue;
76+
}
77+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(buffer);
78+
parseRequest(request, timeReceivedMs, eventConsumer);
79+
}
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.EnumSource;
16+
17+
import static org.hamcrest.CoreMatchers.equalTo;
18+
import static org.hamcrest.CoreMatchers.nullValue;
19+
import static org.hamcrest.MatcherAssert.assertThat;
20+
21+
class OTelTracesFormatOptionTest {
22+
23+
@ParameterizedTest
24+
@EnumSource(OTelTracesFormatOption.class)
25+
void fromFormatName_returns_correct_value(OTelTracesFormatOption option) {
26+
assertThat(OTelTracesFormatOption.fromFormatName(option.getFormatName()), equalTo(option));
27+
}
28+
29+
@Test
30+
void fromFormatName_returns_null_for_unknown() {
31+
assertThat(OTelTracesFormatOption.fromFormatName("unknown"), nullValue());
32+
}
33+
34+
@Test
35+
void json_has_correct_name() {
36+
assertThat(OTelTracesFormatOption.JSON.getFormatName(), equalTo("json"));
37+
}
38+
39+
@Test
40+
void protobuf_has_correct_name() {
41+
assertThat(OTelTracesFormatOption.PROTOBUF.getFormatName(), equalTo("protobuf"));
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.codec;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import static org.hamcrest.CoreMatchers.equalTo;
16+
import static org.hamcrest.MatcherAssert.assertThat;
17+
18+
class OTelTracesInputCodecConfigTest {
19+
20+
@Test
21+
void default_format_is_json() {
22+
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
23+
assertThat(config.getFormat(), equalTo(OTelTracesFormatOption.JSON));
24+
}
25+
26+
@Test
27+
void default_otel_format_is_opensearch() {
28+
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
29+
assertThat(config.getOTelOutputFormat(), equalTo(OTelOutputFormat.OPENSEARCH));
30+
}
31+
32+
@Test
33+
void default_length_prefixed_encoding_is_false() {
34+
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
35+
assertThat(config.getLengthPrefixedEncoding(), equalTo(false));
36+
}
37+
38+
@Test
39+
void isValidFormat_returns_true_with_default() {
40+
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
41+
assertThat(config.isValidFormat(), equalTo(true));
42+
}
43+
}

0 commit comments

Comments
 (0)