Skip to content

Commit 566162e

Browse files
committed
Support for otel_traces codec to create Span Events from OTEL Traces
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 506a04f commit 566162e

12 files changed

Lines changed: 582 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonValue;
10+
11+
import java.util.Arrays;
12+
import java.util.Map;
13+
import java.util.stream.Collectors;
14+
15+
public enum OTelTraceFormatOption {
16+
JSON("json"),
17+
PROTOBUF("protobuf");
18+
19+
private static final Map<String, OTelTraceFormatOption> NAMES_MAP = Arrays.stream(OTelTraceFormatOption.values())
20+
.collect(Collectors.toMap(
21+
value -> value.optionName,
22+
value -> value
23+
));
24+
25+
private final String optionName;
26+
27+
OTelTraceFormatOption(final String optionName) {
28+
this.optionName = optionName;
29+
}
30+
31+
@JsonValue
32+
public String getFormatName() {
33+
return optionName;
34+
}
35+
36+
@JsonCreator
37+
public static OTelTraceFormatOption fromFormatName(final String optionName) {
38+
return NAMES_MAP.get(optionName);
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
9+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
10+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
11+
import org.opensearch.dataprepper.model.codec.InputCodec;
12+
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.record.Record;
14+
15+
import java.io.IOException;
16+
import java.io.InputStream;
17+
import java.util.Objects;
18+
import java.util.function.Consumer;
19+
20+
@DataPrepperPlugin(name = "otel_traces", pluginType = InputCodec.class, pluginConfigurationType = OTelTraceInputCodecConfig.class)
21+
public class OTelTraceInputCodec implements InputCodec {
22+
23+
private final ByteDecoder decoder;
24+
25+
@DataPrepperPluginConstructor
26+
public OTelTraceInputCodec(final OTelTraceInputCodecConfig config) {
27+
Objects.requireNonNull(config);
28+
OTelTraceFormatOption format = config.getFormat();
29+
OTelOutputFormat otelFormat = config.getOTelOutputFormat();
30+
31+
if (format == OTelTraceFormatOption.JSON) {
32+
decoder = new OTelTraceJsonDecoder(otelFormat);
33+
} else if (format == OTelTraceFormatOption.PROTOBUF) {
34+
decoder = new OTelTraceProtoBufDecoder(otelFormat, config.getLengthPrefixedEncoding());
35+
} else {
36+
throw new RuntimeException("The format " + config.getFormat() + " is not supported.");
37+
}
38+
}
39+
40+
@Override
41+
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
42+
decoder.parse(inputStream, null, eventConsumer);
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import com.fasterxml.jackson.annotation.JsonClassDescription;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
11+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
12+
import jakarta.validation.constraints.AssertTrue;
13+
import jakarta.validation.constraints.NotNull;
14+
15+
@JsonPropertyOrder
16+
@JsonClassDescription("The `otel_traces` codec parses trace files that follow the OpenTelemetry Protocol Specification. "
17+
+ "It creates a Data Prepper Span event for each span record along with the resource attributes in the file.")
18+
public class OTelTraceInputCodecConfig {
19+
20+
static final OTelTraceFormatOption DEFAULT_FORMAT = OTelTraceFormatOption.JSON;
21+
static final OTelOutputFormat DEFAULT_OTEL_FORMAT = OTelOutputFormat.OPENSEARCH;
22+
23+
@JsonProperty(value = "format", defaultValue = "json")
24+
@JsonPropertyDescription("Specifies the format of the OTel traces. Valid options are 'json' and 'protobuf'.")
25+
@NotNull
26+
private OTelTraceFormatOption format = DEFAULT_FORMAT;
27+
28+
@JsonProperty(value = "otel_format", defaultValue = "opensearch")
29+
@JsonPropertyDescription("Specifies the output format of the decoded spans.")
30+
@NotNull
31+
private OTelOutputFormat otelFormat = DEFAULT_OTEL_FORMAT;
32+
33+
@JsonProperty(value = "length_prefixed_encoding", defaultValue = "false")
34+
@JsonPropertyDescription("Specifies if the length precedes the data in protobuf format.")
35+
private boolean lengthPrefixedEncoding;
36+
37+
public OTelTraceFormatOption getFormat() {
38+
return format;
39+
}
40+
41+
public OTelOutputFormat getOTelOutputFormat() {
42+
return otelFormat;
43+
}
44+
45+
@AssertTrue(message = "Not a valid format.")
46+
boolean isValidFormat() {
47+
return format != null;
48+
}
49+
50+
public boolean getLengthPrefixedEncoding() {
51+
return lengthPrefixedEncoding;
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import com.google.protobuf.util.JsonFormat;
9+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
10+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
11+
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.record.Record;
13+
import org.opensearch.dataprepper.model.trace.Span;
14+
15+
import java.io.IOException;
16+
import java.io.InputStream;
17+
import java.io.InputStreamReader;
18+
import java.io.Reader;
19+
import java.time.Instant;
20+
import java.util.List;
21+
import java.util.function.Consumer;
22+
23+
public class OTelTraceJsonDecoder implements ByteDecoder {
24+
25+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
26+
27+
public OTelTraceJsonDecoder(OTelOutputFormat otelOutputFormat) {
28+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
29+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
30+
: new OTelProtoStandardCodec.OTelProtoDecoder();
31+
}
32+
33+
@Override
34+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
35+
Reader reader = new InputStreamReader(inputStream);
36+
ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
37+
JsonFormat.parser().merge(reader, builder);
38+
ExportTraceServiceRequest request = builder.build();
39+
40+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
41+
for (Span span : spans) {
42+
eventConsumer.accept(new Record<>(span));
43+
}
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
9+
import org.opensearch.dataprepper.model.codec.ByteDecoder;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.record.Record;
12+
import org.opensearch.dataprepper.model.trace.Span;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.nio.ByteBuffer;
19+
import java.time.Instant;
20+
import java.util.List;
21+
import java.util.function.Consumer;
22+
23+
public class OTelTraceProtoBufDecoder implements ByteDecoder {
24+
25+
private static final Logger LOG = LoggerFactory.getLogger(OTelTraceProtoBufDecoder.class);
26+
private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024);
27+
28+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
29+
private final boolean lengthPrefixedEncoding;
30+
31+
public OTelTraceProtoBufDecoder(OTelOutputFormat otelOutputFormat, boolean lengthPrefixedEncoding) {
32+
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
33+
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
34+
: new OTelProtoStandardCodec.OTelProtoDecoder();
35+
this.lengthPrefixedEncoding = lengthPrefixedEncoding;
36+
}
37+
38+
private void parseRequest(final ExportTraceServiceRequest request, final Instant timeReceivedMs,
39+
Consumer<Record<Event>> eventConsumer) {
40+
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
41+
for (Span span : spans) {
42+
eventConsumer.accept(new Record<>(span));
43+
}
44+
}
45+
46+
@Override
47+
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
48+
if (!lengthPrefixedEncoding) {
49+
int available = inputStream.available();
50+
if (available > MAX_REQUEST_LEN) {
51+
throw new IllegalArgumentException(
52+
String.format("Buffer length %d exceeds max allowed buffer length of %d", available, MAX_REQUEST_LEN));
53+
}
54+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream);
55+
parseRequest(request, timeReceivedMs, eventConsumer);
56+
return;
57+
}
58+
59+
byte[] lenBytes = new byte[4];
60+
while (inputStream.read(lenBytes, 0, 4) == 4) {
61+
ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes);
62+
int len = lengthBuffer.getInt();
63+
if (len > MAX_REQUEST_LEN) {
64+
throw new IllegalArgumentException(
65+
String.format("Buffer length %d exceeds max allowed buffer length of %d", len, MAX_REQUEST_LEN));
66+
}
67+
byte[] buffer = new byte[len];
68+
if (inputStream.read(buffer, 0, len) != len) {
69+
LOG.warn("Failed to read {} bytes", len);
70+
continue;
71+
}
72+
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(buffer);
73+
parseRequest(request, timeReceivedMs, eventConsumer);
74+
}
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.params.ParameterizedTest;
10+
import org.junit.jupiter.params.provider.EnumSource;
11+
12+
import static org.hamcrest.CoreMatchers.equalTo;
13+
import static org.hamcrest.CoreMatchers.notNullValue;
14+
import static org.hamcrest.CoreMatchers.nullValue;
15+
import static org.hamcrest.MatcherAssert.assertThat;
16+
17+
class OTelTraceFormatOptionTest {
18+
19+
@ParameterizedTest
20+
@EnumSource(OTelTraceFormatOption.class)
21+
void fromFormatName_returns_correct_value(OTelTraceFormatOption option) {
22+
assertThat(OTelTraceFormatOption.fromFormatName(option.getFormatName()), equalTo(option));
23+
}
24+
25+
@Test
26+
void fromFormatName_returns_null_for_unknown() {
27+
assertThat(OTelTraceFormatOption.fromFormatName("unknown"), nullValue());
28+
}
29+
30+
@Test
31+
void json_has_correct_name() {
32+
assertThat(OTelTraceFormatOption.JSON.getFormatName(), equalTo("json"));
33+
}
34+
35+
@Test
36+
void protobuf_has_correct_name() {
37+
assertThat(OTelTraceFormatOption.PROTOBUF.getFormatName(), equalTo("protobuf"));
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.MatcherAssert.assertThat;
12+
13+
class OTelTraceInputCodecConfigTest {
14+
15+
@Test
16+
void default_format_is_json() {
17+
OTelTraceInputCodecConfig config = new OTelTraceInputCodecConfig();
18+
assertThat(config.getFormat(), equalTo(OTelTraceFormatOption.JSON));
19+
}
20+
21+
@Test
22+
void default_otel_format_is_opensearch() {
23+
OTelTraceInputCodecConfig config = new OTelTraceInputCodecConfig();
24+
assertThat(config.getOTelOutputFormat(), equalTo(OTelOutputFormat.OPENSEARCH));
25+
}
26+
27+
@Test
28+
void default_length_prefixed_encoding_is_false() {
29+
OTelTraceInputCodecConfig config = new OTelTraceInputCodecConfig();
30+
assertThat(config.getLengthPrefixedEncoding(), equalTo(false));
31+
}
32+
33+
@Test
34+
void isValidFormat_returns_true_with_default() {
35+
OTelTraceInputCodecConfig config = new OTelTraceInputCodecConfig();
36+
assertThat(config.isValidFormat(), equalTo(true));
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.otel.codec;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.extension.ExtendWith;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
12+
import static org.hamcrest.CoreMatchers.instanceOf;
13+
import static org.hamcrest.MatcherAssert.assertThat;
14+
import static org.junit.jupiter.api.Assertions.assertNotNull;
15+
import static org.junit.jupiter.api.Assertions.assertThrows;
16+
import static org.mockito.Mockito.mock;
17+
import static org.mockito.Mockito.when;
18+
19+
@ExtendWith(MockitoExtension.class)
20+
class OTelTraceInputCodecTest {
21+
22+
@Test
23+
void constructor_with_json_format_creates_codec() {
24+
OTelTraceInputCodecConfig config = mock(OTelTraceInputCodecConfig.class);
25+
when(config.getFormat()).thenReturn(OTelTraceFormatOption.JSON);
26+
when(config.getOTelOutputFormat()).thenReturn(OTelOutputFormat.OPENSEARCH);
27+
28+
OTelTraceInputCodec codec = new OTelTraceInputCodec(config);
29+
assertNotNull(codec);
30+
}
31+
32+
@Test
33+
void constructor_with_protobuf_format_creates_codec() {
34+
OTelTraceInputCodecConfig config = mock(OTelTraceInputCodecConfig.class);
35+
when(config.getFormat()).thenReturn(OTelTraceFormatOption.PROTOBUF);
36+
when(config.getOTelOutputFormat()).thenReturn(OTelOutputFormat.OPENSEARCH);
37+
when(config.getLengthPrefixedEncoding()).thenReturn(false);
38+
39+
OTelTraceInputCodec codec = new OTelTraceInputCodec(config);
40+
assertNotNull(codec);
41+
}
42+
43+
@Test
44+
void constructor_with_null_config_throws() {
45+
assertThrows(NullPointerException.class, () -> new OTelTraceInputCodec(null));
46+
}
47+
}

0 commit comments

Comments
 (0)