diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOption.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOption.java new file mode 100644 index 0000000000..e4fa397ea8 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOption.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum OTelTracesFormatOption { + JSON("json"), + PROTOBUF("protobuf"); + + private static final Map NAMES_MAP = Arrays.stream(OTelTracesFormatOption.values()) + .collect(Collectors.toMap( + value -> value.optionName, + value -> value + )); + + private final String optionName; + + OTelTracesFormatOption(final String optionName) { + this.optionName = optionName; + } + + @JsonValue + public String getFormatName() { + return optionName; + } + + @JsonCreator + public static OTelTracesFormatOption fromFormatName(final String optionName) { + return NAMES_MAP.get(optionName); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodec.java new file mode 100644 index 0000000000..e914ef340d --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodec.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.function.Consumer; + +@DataPrepperPlugin(name = "otel_traces", pluginType = InputCodec.class, pluginConfigurationType = OTelTracesInputCodecConfig.class) +public class OTelTracesInputCodec implements InputCodec { + + private final ByteDecoder decoder; + + @DataPrepperPluginConstructor + public OTelTracesInputCodec(final OTelTracesInputCodecConfig config) { + Objects.requireNonNull(config); + OTelTracesFormatOption format = config.getFormat(); + OTelOutputFormat otelFormat = config.getOTelOutputFormat(); + + if (format == OTelTracesFormatOption.JSON) { + decoder = new OTelTracesJsonDecoder(otelFormat); + } else if (format == OTelTracesFormatOption.PROTOBUF) { + decoder = new OTelTracesProtoBufDecoder(otelFormat, config.getLengthPrefixedEncoding()); + } else { + throw new RuntimeException("The format " + config.getFormat() + " is not supported."); + } + } + + @Override + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + decoder.parse(inputStream, null, eventConsumer); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfig.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfig.java new file mode 100644 index 0000000000..41163e05a3 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotNull; + +@JsonPropertyOrder +@JsonClassDescription("The `otel_traces` codec parses trace files that follow the OpenTelemetry Protocol Specification. " + + "It creates a Data Prepper Span event for each span record along with the resource attributes in the file.") +public class OTelTracesInputCodecConfig { + + static final OTelTracesFormatOption DEFAULT_FORMAT = OTelTracesFormatOption.JSON; + static final OTelOutputFormat DEFAULT_OTEL_FORMAT = OTelOutputFormat.OPENSEARCH; + + @JsonProperty(value = "format", defaultValue = "json") + @JsonPropertyDescription("Specifies the format of the OTel traces. Valid options are 'json' and 'protobuf'.") + @NotNull + private OTelTracesFormatOption format = DEFAULT_FORMAT; + + @JsonProperty(value = "otel_format", defaultValue = "opensearch") + @JsonPropertyDescription("Specifies the output format of the decoded spans.") + @NotNull + private OTelOutputFormat otelFormat = DEFAULT_OTEL_FORMAT; + + @JsonProperty(value = "length_prefixed_encoding", defaultValue = "false") + @JsonPropertyDescription("Specifies if the length precedes the data in protobuf format.") + private boolean lengthPrefixedEncoding; + + public OTelTracesFormatOption getFormat() { + return format; + } + + public OTelOutputFormat getOTelOutputFormat() { + return otelFormat; + } + + @AssertTrue(message = "Not a valid format.") + boolean isValidFormat() { + return format != null; + } + + public boolean getLengthPrefixedEncoding() { + return lengthPrefixedEncoding; + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoder.java new file mode 100644 index 0000000000..614ced784c --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoder.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import com.google.protobuf.util.JsonFormat; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.time.Instant; +import java.util.List; +import java.util.function.Consumer; + +public class OTelTracesJsonDecoder implements ByteDecoder { + + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + + public OTelTracesJsonDecoder(OTelOutputFormat otelOutputFormat) { + otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH + ? new OTelProtoOpensearchCodec.OTelProtoDecoder() + : new OTelProtoStandardCodec.OTelProtoDecoder(); + } + + @Override + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { + Reader reader = new InputStreamReader(inputStream); + ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder(); + JsonFormat.parser().merge(reader, builder); + ExportTraceServiceRequest request = builder.build(); + + List spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs); + for (Span span : spans) { + eventConsumer.accept(new Record<>(span)); + } + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoder.java new file mode 100644 index 0000000000..006bcffacd --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoder.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.List; +import java.util.function.Consumer; + +public class OTelTracesProtoBufDecoder implements ByteDecoder { + + private static final Logger LOG = LoggerFactory.getLogger(OTelTracesProtoBufDecoder.class); + private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024); + + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + private final boolean lengthPrefixedEncoding; + + public OTelTracesProtoBufDecoder(OTelOutputFormat otelOutputFormat, boolean lengthPrefixedEncoding) { + otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH + ? new OTelProtoOpensearchCodec.OTelProtoDecoder() + : new OTelProtoStandardCodec.OTelProtoDecoder(); + this.lengthPrefixedEncoding = lengthPrefixedEncoding; + } + + private void parseRequest(final ExportTraceServiceRequest request, final Instant timeReceivedMs, + Consumer> eventConsumer) { + List spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs); + for (Span span : spans) { + eventConsumer.accept(new Record<>(span)); + } + } + + @Override + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { + if (!lengthPrefixedEncoding) { + int available = inputStream.available(); + if (available > MAX_REQUEST_LEN) { + throw new IllegalArgumentException( + String.format("Buffer length %d exceeds max allowed buffer length of %d", available, MAX_REQUEST_LEN)); + } + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream); + parseRequest(request, timeReceivedMs, eventConsumer); + return; + } + + byte[] lenBytes = new byte[4]; + while (inputStream.read(lenBytes, 0, 4) == 4) { + ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes); + int len = lengthBuffer.getInt(); + if (len > MAX_REQUEST_LEN) { + throw new IllegalArgumentException( + String.format("Buffer length %d exceeds max allowed buffer length of %d", len, MAX_REQUEST_LEN)); + } + byte[] buffer = new byte[len]; + if (inputStream.read(buffer, 0, len) != len) { + LOG.warn("Failed to read {} bytes", len); + continue; + } + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(buffer); + parseRequest(request, timeReceivedMs, eventConsumer); + } + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOptionTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOptionTest.java new file mode 100644 index 0000000000..f47ad5e7f0 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesFormatOptionTest.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class OTelTracesFormatOptionTest { + + @ParameterizedTest + @EnumSource(OTelTracesFormatOption.class) + void fromFormatName_returns_correct_value(OTelTracesFormatOption option) { + assertThat(OTelTracesFormatOption.fromFormatName(option.getFormatName()), equalTo(option)); + } + + @Test + void fromFormatName_returns_null_for_unknown() { + assertThat(OTelTracesFormatOption.fromFormatName("unknown"), nullValue()); + } + + @Test + void json_has_correct_name() { + assertThat(OTelTracesFormatOption.JSON.getFormatName(), equalTo("json")); + } + + @Test + void protobuf_has_correct_name() { + assertThat(OTelTracesFormatOption.PROTOBUF.getFormatName(), equalTo("protobuf")); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfigTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfigTest.java new file mode 100644 index 0000000000..74a3af593b --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecConfigTest.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class OTelTracesInputCodecConfigTest { + + @Test + void default_format_is_json() { + OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig(); + assertThat(config.getFormat(), equalTo(OTelTracesFormatOption.JSON)); + } + + @Test + void default_otel_format_is_opensearch() { + OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig(); + assertThat(config.getOTelOutputFormat(), equalTo(OTelOutputFormat.OPENSEARCH)); + } + + @Test + void default_length_prefixed_encoding_is_false() { + OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig(); + assertThat(config.getLengthPrefixedEncoding(), equalTo(false)); + } + + @Test + void isValidFormat_returns_true_with_default() { + OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig(); + assertThat(config.isValidFormat(), equalTo(true)); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecTest.java new file mode 100644 index 0000000000..0a3262d923 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesInputCodecTest.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.trace.Span; +import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest; +import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile; +import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite; + +import java.io.InputStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@DataPrepperPluginTest(pluginName = "otel_traces", pluginType = InputCodec.class) +class OTelTracesInputCodecTest extends BaseDataPrepperPluginStandardTestSuite { + + private static final String TEST_REQUEST_TRACES_FILE = "test-request-multiple-traces.json"; + + @Test + void parse_produces_valid_spans( + @PluginConfigurationFile("otel_traces_codec_json_format.yaml") final InputCodec codec) throws Exception { + InputStream inputStream = OTelTracesInputCodecTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_TRACES_FILE); + codec.parse(inputStream, record -> { + Span span = (Span) record.getData(); + assertThat(span.getServiceName(), is("analytics-service1")); + assertThat(span.getTraceId().isEmpty(), is(false)); + assertThat(span.getSpanId().isEmpty(), is(false)); + assertThat(span.getName().isEmpty(), is(false)); + assertThat(span.getStartTime().isEmpty(), is(false)); + assertThat(span.getEndTime().isEmpty(), is(false)); + }); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoderTest.java new file mode 100644 index 0000000000..a6e758dc51 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesJsonDecoderTest.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; + +import java.io.InputStream; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OTelTracesJsonDecoderTest { + + private static final String TEST_REQUEST_TRACES_FILE = "test-request-multiple-traces.json"; + + public OTelTracesJsonDecoder createObjectUnderTest(OTelOutputFormat outputFormat) { + return new OTelTracesJsonDecoder(outputFormat); + } + + private void validateSpan(Span span) { + assertThat(span.getServiceName(), is("analytics-service1")); + assertThat(span.getTraceId(), notNullValue()); + assertThat(span.getSpanId(), notNullValue()); + assertThat(span.getName(), notNullValue()); + assertThat(span.getStartTime(), notNullValue()); + assertThat(span.getEndTime(), notNullValue()); + assertThat(span.getDurationInNanos(), notNullValue()); + } + + @Test + public void testParse() throws Exception { + InputStream inputStream = OTelTracesJsonDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_TRACES_FILE); + List> parsedRecords = new ArrayList<>(); + + createObjectUnderTest(OTelOutputFormat.OPENSEARCH).parse(inputStream, Instant.now(), parsedRecords::add); + + assertThat(parsedRecords.size(), equalTo(4)); + for (Record record : parsedRecords) { + validateSpan((Span) record.getData()); + } + } + + @Test + public void testParseWithInvalidJson_ThrowsException() { + InputStream invalidStream = new java.io.ByteArrayInputStream("{ invalid json }}}".getBytes()); + + assertThrows(Exception.class, () -> + createObjectUnderTest(OTelOutputFormat.OPENSEARCH).parse(invalidStream, Instant.now(), record -> {})); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoderTest.java new file mode 100644 index 0000000000..e2c4c2fa5f --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTracesProtoBufDecoderTest.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import com.google.protobuf.util.JsonFormat; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OTelTracesProtoBufDecoderTest { + + private static final String TEST_REQUEST_JSON_TRACES_FILE = "test-request-multiple-traces.json"; + + public OTelTracesProtoBufDecoder createObjectUnderTest(OTelOutputFormat outputFormat, boolean lengthPrefixedEncoding) { + return new OTelTracesProtoBufDecoder(outputFormat, lengthPrefixedEncoding); + } + + private void validateSpan(Span span) { + assertThat(span.getServiceName(), is("analytics-service1")); + assertThat(span.getTraceId(), notNullValue()); + assertThat(span.getSpanId(), notNullValue()); + assertThat(span.getName(), notNullValue()); + assertThat(span.getStartTime(), notNullValue()); + assertThat(span.getEndTime(), notNullValue()); + assertThat(span.getDurationInNanos(), notNullValue()); + } + + @Test + public void testParse() throws Exception { + final ExportTraceServiceRequest request = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_JSON_TRACES_FILE); + InputStream inputStream = new ByteArrayInputStream(request.toByteArray()); + List> parsedRecords = new ArrayList<>(); + + createObjectUnderTest(OTelOutputFormat.OPENSEARCH, false).parse(inputStream, Instant.now(), parsedRecords::add); + + assertThat(parsedRecords.size(), equalTo(4)); + for (Record record : parsedRecords) { + validateSpan((Span) record.getData()); + } + } + + @Test + public void testParseWithLengthPrefixedEncoding() throws Exception { + final ExportTraceServiceRequest request = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_JSON_TRACES_FILE); + byte[] requestBytes = request.toByteArray(); + + // Generate length-prefixed protobuf on-the-fly (mimics OTel file exporter proto format) + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (int i = 0; i < 3; i++) { + ByteBuffer lengthBuffer = ByteBuffer.allocate(4); + lengthBuffer.putInt(requestBytes.length); + outputStream.write(lengthBuffer.array()); + outputStream.write(requestBytes); + } + + List> parsedRecords = new ArrayList<>(); + createObjectUnderTest(OTelOutputFormat.OPENSEARCH, true) + .parse(new ByteArrayInputStream(outputStream.toByteArray()), Instant.now(), parsedRecords::add); + + // 3 requests × 4 spans each = 12 spans + assertThat(parsedRecords.size(), equalTo(12)); + validateSpan((Span) parsedRecords.get(0).getData()); + } + + @Test + public void testParseWithLargeDynamicRequest_ThrowsException() throws Exception { + List spans = new ArrayList<>(); + for (int i = 0; i < 4 * 1024 * 1024; i++) { + spans.add(io.opentelemetry.proto.trace.v1.Span.newBuilder().build()); + } + ExportTraceServiceRequest request = ExportTraceServiceRequest.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addAllSpans(spans) + .build())) + .build(); + + InputStream inputStream = new ByteArrayInputStream(request.toByteArray()); + + assertThrows(IllegalArgumentException.class, () -> + createObjectUnderTest(OTelOutputFormat.OPENSEARCH, false) + .parse(inputStream, Instant.now(), record -> {})); + } + + private ExportTraceServiceRequest buildExportTraceServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + OTelTracesProtoBufDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/org/opensearch/dataprepper/plugins/otel/codec/otel_traces_codec_json_format.yaml b/data-prepper-plugins/otel-proto-common/src/test/resources/org/opensearch/dataprepper/plugins/otel/codec/otel_traces_codec_json_format.yaml new file mode 100644 index 0000000000..e0d108cd06 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/org/opensearch/dataprepper/plugins/otel/codec/otel_traces_codec_json_format.yaml @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +otel-pipeline: + source: + unused: + processor: + - otel_traces: + format: json + otel_format: opensearch + sink: + - unused: