Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum OTelTracesFormatOption {
JSON("json"),
PROTOBUF("protobuf");

private static final Map<String, OTelTracesFormatOption> NAMES_MAP = Arrays.stream(OTelTracesFormatOption.values())
.collect(Collectors.toMap(
value -> value.optionName,
value -> value
));

private final String optionName;

OTelTracesFormatOption(final String optionName) {
this.optionName = optionName;
}

@JsonValue
public String getFormatName() {
return optionName;
}

@JsonCreator
public static OTelTracesFormatOption fromFormatName(final String optionName) {
return NAMES_MAP.get(optionName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "otel_traces", pluginType = InputCodec.class, pluginConfigurationType = OTelTracesInputCodecConfig.class)
public class OTelTracesInputCodec implements InputCodec {

private final ByteDecoder decoder;

@DataPrepperPluginConstructor
public OTelTracesInputCodec(final OTelTracesInputCodecConfig config) {
Objects.requireNonNull(config);
OTelTracesFormatOption format = config.getFormat();
OTelOutputFormat otelFormat = config.getOTelOutputFormat();

if (format == OTelTracesFormatOption.JSON) {
decoder = new OTelTracesJsonDecoder(otelFormat);
} else if (format == OTelTracesFormatOption.PROTOBUF) {
decoder = new OTelTracesProtoBufDecoder(otelFormat, config.getLengthPrefixedEncoding());
} else {
throw new RuntimeException("The format " + config.getFormat() + " is not supported.");
}
}

@Override
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
decoder.parse(inputStream, null, eventConsumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;

@JsonPropertyOrder
@JsonClassDescription("The `otel_traces` codec parses trace files that follow the OpenTelemetry Protocol Specification. "
+ "It creates a Data Prepper Span event for each span record along with the resource attributes in the file.")
public class OTelTracesInputCodecConfig {

static final OTelTracesFormatOption DEFAULT_FORMAT = OTelTracesFormatOption.JSON;
static final OTelOutputFormat DEFAULT_OTEL_FORMAT = OTelOutputFormat.OPENSEARCH;

@JsonProperty(value = "format", defaultValue = "json")
@JsonPropertyDescription("Specifies the format of the OTel traces. Valid options are 'json' and 'protobuf'.")
@NotNull
private OTelTracesFormatOption format = DEFAULT_FORMAT;

@JsonProperty(value = "otel_format", defaultValue = "opensearch")
@JsonPropertyDescription("Specifies the output format of the decoded spans.")
@NotNull
private OTelOutputFormat otelFormat = DEFAULT_OTEL_FORMAT;

@JsonProperty(value = "length_prefixed_encoding", defaultValue = "false")
@JsonPropertyDescription("Specifies if the length precedes the data in protobuf format.")
private boolean lengthPrefixedEncoding;

public OTelTracesFormatOption getFormat() {
return format;
}

public OTelOutputFormat getOTelOutputFormat() {
return otelFormat;
}

@AssertTrue(message = "Not a valid format.")
boolean isValidFormat() {
return format != null;
}

public boolean getLengthPrefixedEncoding() {
return lengthPrefixedEncoding;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.Span;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

public class OTelTracesJsonDecoder implements ByteDecoder {

private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;

public OTelTracesJsonDecoder(OTelOutputFormat otelOutputFormat) {
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
: new OTelProtoStandardCodec.OTelProtoDecoder();
}

@Override
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
Reader reader = new InputStreamReader(inputStream);
ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
JsonFormat.parser().merge(reader, builder);
ExportTraceServiceRequest request = builder.build();

List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
for (Span span : spans) {
eventConsumer.accept(new Record<>(span));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

public class OTelTracesProtoBufDecoder implements ByteDecoder {

private static final Logger LOG = LoggerFactory.getLogger(OTelTracesProtoBufDecoder.class);
private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024);

private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
private final boolean lengthPrefixedEncoding;

public OTelTracesProtoBufDecoder(OTelOutputFormat otelOutputFormat, boolean lengthPrefixedEncoding) {
otelProtoDecoder = otelOutputFormat == OTelOutputFormat.OPENSEARCH
? new OTelProtoOpensearchCodec.OTelProtoDecoder()
: new OTelProtoStandardCodec.OTelProtoDecoder();
this.lengthPrefixedEncoding = lengthPrefixedEncoding;
}

private void parseRequest(final ExportTraceServiceRequest request, final Instant timeReceivedMs,
Consumer<Record<Event>> eventConsumer) {
List<Span> spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs);
for (Span span : spans) {
eventConsumer.accept(new Record<>(span));
}
}

@Override
public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
if (!lengthPrefixedEncoding) {
int available = inputStream.available();
if (available > MAX_REQUEST_LEN) {
throw new IllegalArgumentException(
String.format("Buffer length %d exceeds max allowed buffer length of %d", available, MAX_REQUEST_LEN));
}
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream);
parseRequest(request, timeReceivedMs, eventConsumer);
return;
}

byte[] lenBytes = new byte[4];
while (inputStream.read(lenBytes, 0, 4) == 4) {
ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes);
int len = lengthBuffer.getInt();
if (len > MAX_REQUEST_LEN) {
throw new IllegalArgumentException(
String.format("Buffer length %d exceeds max allowed buffer length of %d", len, MAX_REQUEST_LEN));
}
byte[] buffer = new byte[len];
if (inputStream.read(buffer, 0, len) != len) {
LOG.warn("Failed to read {} bytes", len);
continue;
}
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(buffer);
parseRequest(request, timeReceivedMs, eventConsumer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

class OTelTracesFormatOptionTest {

@ParameterizedTest
@EnumSource(OTelTracesFormatOption.class)
void fromFormatName_returns_correct_value(OTelTracesFormatOption option) {
assertThat(OTelTracesFormatOption.fromFormatName(option.getFormatName()), equalTo(option));
}

@Test
void fromFormatName_returns_null_for_unknown() {
assertThat(OTelTracesFormatOption.fromFormatName("unknown"), nullValue());
}

@Test
void json_has_correct_name() {
assertThat(OTelTracesFormatOption.JSON.getFormatName(), equalTo("json"));
}

@Test
void protobuf_has_correct_name() {
assertThat(OTelTracesFormatOption.PROTOBUF.getFormatName(), equalTo("protobuf"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

class OTelTracesInputCodecConfigTest {

@Test
void default_format_is_json() {
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
assertThat(config.getFormat(), equalTo(OTelTracesFormatOption.JSON));
}

@Test
void default_otel_format_is_opensearch() {
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
assertThat(config.getOTelOutputFormat(), equalTo(OTelOutputFormat.OPENSEARCH));
}

@Test
void default_length_prefixed_encoding_is_false() {
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
assertThat(config.getLengthPrefixedEncoding(), equalTo(false));
}

@Test
void isValidFormat_returns_true_with_default() {
OTelTracesInputCodecConfig config = new OTelTracesInputCodecConfig();
assertThat(config.isValidFormat(), equalTo(true));
}
}
Loading
Loading