Skip to content

Commit a23d5ad

Browse files
LaszloKovacs9001simonelbaz
authored andcommitted
feat(otlp-sink): Add support for metrics and logs with configurable SigV4 and headers (#6768)
* feat(otlp-sink): Add support for metrics and logs with configurable SigV4 and headers Extend the OTLP sink plugin to support all three signal types (traces, metrics, logs) with full OTLP protocol encoding, configurable SigV4 signing, and custom HTTP headers. Based on the work from @viquer in #6488, with additional enhancements. Multi-signal support: - Extended OTelProtoCodec with convertToResourceMetrics() and convertToResourceLogs() for encoding all signal types - Added generic OtlpSignalHandler<T> interface with type-safe implementations for traces, metrics, and logs - Per-signal-type buffer architecture for optimal batching - Signal type determined automatically at runtime from event type - Generalized sink from Record<Span> to Record<Event> Configurable SigV4 signing: - Added service_name field to OtlpSinkConfig (default: xray) - SigV4Signer uses configurable service name Additional headers: - Added additional_headers map config to OtlpSinkConfig - Protected header blocklist prevents overriding signed headers - Headers injected after SigV4 signing (not included in signature) Per-signal metrics: - Added per-signal counters (rejectedTracesCount, failedMetricsCount, etc.) - Aggregate counters (rejectedRecordsCount, failedRecordsCount) retained Region from AWS config: - getAwsRegion() checks aws.region first, falls back to endpoint parsing Breaking changes (plugin is @experimental): - OtlpSink changed from AbstractSink<Record<Span>> to AbstractSink<Record<Event>> - Metric counters renamed: rejectedSpansCount to rejectedRecordsCount, failedSpansCount to failedRecordsCount Signed-off-by: Roberto Ramirez Vique <viquer@amazon.com> Signed-off-by: Laszlo Kovacs <laszlokv@amazon.com> * fix(otlp-sink): Address code review findings Fix EventHandle leak when encodeEvent() returns null or throws in OtlpSinkBuffer.runTyped() — release handle in both paths. Fix SeverityNumber.forNumber() null safety in OTelProtoStandardCodec to prevent NPE on unrecognized severity numbers. Cache per-signal metric counters at construction time in OtlpSinkMetrics to avoid string concatenation and counter lookup on every increment call in the hot path. Remove stale OTLP_PATH constant and fallback URI from SigV4Signer since endpoint is a required config field. Replace fully-qualified class names with imports in OTelProtoCodec OTelProtoEncoder interface (ResourceMetrics, ResourceLogs, Log). Update README: replace stale flush-on-signal-change batching description with accurate per-signal buffer architecture, add service_name to configuration options table. Replace Thread.sleep(300) with Awaitility in OtlpHttpSenderTest for deterministic async assertions. Replace OutOfMemoryError with Error in OtlpSinkBufferMultiSignalTest to avoid triggering JVM OOM handlers in CI. Signed-off-by: Laszlo Kovacs <laszlokv@amazon.com> * fix(otlp-sink): Keep rejectedSpansCount/failedSpansCount metric names Rename TRACE metrics label from 'Traces' to 'Spans' to preserve backwards compatibility with existing metric allowlists and dashboards. Per-signal counters now emit rejectedSpansCount, rejectedMetricsCount, rejectedLogsCount (and corresponding failed* counters). Remove stale migration note from README since the original metric names are preserved. Signed-off-by: Laszlo Kovacs <laszlokv@amazon.com> --------- Signed-off-by: Roberto Ramirez Vique <viquer@amazon.com> Signed-off-by: Laszlo Kovacs <laszlokv@amazon.com>
1 parent 6ce36ff commit a23d5ad

29 files changed

Lines changed: 3209 additions & 1094 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
2727
*.iml
2828
.kiro
2929
.vscode
30+
.project
31+
.classpath
32+
.settings/
3033

3134
# Manual testing files (should not be committed)
3235
MANUAL_TEST.md

data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,25 @@
55

66
package org.opensearch.dataprepper.plugins.otel.codec;
77

8-
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
98
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
9+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1010
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
11-
import io.opentelemetry.proto.trace.v1.ResourceSpans;
12-
import io.opentelemetry.proto.resource.v1.Resource;
11+
import io.opentelemetry.proto.logs.v1.ResourceLogs;
1312
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
1413
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
14+
import io.opentelemetry.proto.resource.v1.Resource;
15+
import io.opentelemetry.proto.trace.v1.ResourceSpans;
1516
import org.apache.commons.codec.DecoderException;
16-
17+
import org.opensearch.dataprepper.model.log.Log;
18+
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
1719
import org.opensearch.dataprepper.model.metric.Metric;
1820
import org.opensearch.dataprepper.model.record.Record;
19-
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
2021
import org.opensearch.dataprepper.model.trace.Span;
2122

2223
import java.io.UnsupportedEncodingException;
2324
import java.time.Instant;
24-
import java.util.Collection;
2525
import java.util.ArrayList;
26+
import java.util.Collection;
2627
import java.util.HashMap;
2728
import java.util.List;
2829
import java.util.Map;
@@ -150,6 +151,10 @@ Collection<Record<? extends Metric>> parseExportMetricsServiceRequest(
150151

151152
public interface OTelProtoEncoder {
152153
ResourceSpans convertToResourceSpans(final Span span) throws UnsupportedEncodingException, DecoderException;
154+
155+
ResourceMetrics convertToResourceMetrics(final Metric metric) throws UnsupportedEncodingException, DecoderException;
156+
157+
ResourceLogs convertToResourceLogs(final Log log) throws UnsupportedEncodingException, DecoderException;
153158
}
154159

155160
}

data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoOpensearchCodec.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import com.fasterxml.jackson.core.JsonProcessingException;
99
import com.fasterxml.jackson.databind.ObjectMapper;
1010
import com.google.protobuf.ByteString;
11-
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1211
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
12+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1313
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
1414
import io.opentelemetry.proto.common.v1.AnyValue;
1515
import io.opentelemetry.proto.common.v1.InstrumentationScope;
@@ -18,15 +18,15 @@
1818
import io.opentelemetry.proto.logs.v1.ResourceLogs;
1919
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
2020
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
21-
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
2221
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
2322
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
23+
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
2424
import io.opentelemetry.proto.resource.v1.Resource;
2525
import io.opentelemetry.proto.trace.v1.ResourceSpans;
2626
import io.opentelemetry.proto.trace.v1.ScopeSpans;
2727
import io.opentelemetry.proto.trace.v1.Status;
28-
import org.apache.commons.codec.binary.Hex;
2928
import org.apache.commons.codec.DecoderException;
29+
import org.apache.commons.codec.binary.Hex;
3030
import org.opensearch.dataprepper.model.log.JacksonOtelLog;
3131
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
3232
import org.opensearch.dataprepper.model.metric.Bucket;
@@ -53,9 +53,6 @@
5353
import org.slf4j.Logger;
5454
import org.slf4j.LoggerFactory;
5555

56-
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertUnixNanosToISO8601;
57-
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertISO8601ToNanos;
58-
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertByteStringToString;
5956
import java.io.UnsupportedEncodingException;
6057
import java.time.Instant;
6158
import java.util.ArrayList;
@@ -73,6 +70,10 @@
7370
import java.util.stream.Collectors;
7471
import java.util.stream.Stream;
7572

73+
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertByteStringToString;
74+
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertISO8601ToNanos;
75+
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCommonUtils.convertUnixNanosToISO8601;
76+
7677
/**
7778
* OTelProtoOpensearchCodec is for encoding/decoding between {@link org.opensearch.dataprepper.model.trace} and {@link io.opentelemetry.proto} in Opensearch friendly way.
7879
*/
@@ -941,6 +942,20 @@ protected AnyValue objectToAnyValue(final Object obj) throws UnsupportedEncoding
941942

942943
return anyValueBuilder.build();
943944
}
945+
946+
@Override
947+
public io.opentelemetry.proto.metrics.v1.ResourceMetrics convertToResourceMetrics(final org.opensearch.dataprepper.model.metric.Metric metric)
948+
throws UnsupportedEncodingException, DecoderException {
949+
throw new UnsupportedOperationException(
950+
"OTelProtoOpensearchCodec does not support metric encoding. Use OTelProtoStandardCodec instead.");
951+
}
952+
953+
@Override
954+
public io.opentelemetry.proto.logs.v1.ResourceLogs convertToResourceLogs(final org.opensearch.dataprepper.model.log.Log log)
955+
throws UnsupportedEncodingException, DecoderException {
956+
throw new UnsupportedOperationException(
957+
"OTelProtoOpensearchCodec does not support log encoding. Use OTelProtoStandardCodec instead.");
958+
}
944959
}
945960

946961
/**

0 commit comments

Comments
 (0)