diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 6fa778e74d3..34b537fbb48 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -47,7 +47,6 @@ public final class ConfigDefaults { public static final boolean DEFAULT_STARTUP_LOGS_ENABLED = true; static final boolean DEFAULT_INJECT_DATADOG_ATTRIBUTE = true; - static final boolean DEFAULT_WRITER_BAGGAGE_INJECT = true; static final String DEFAULT_SITE = "datadoghq.com"; static final boolean DEFAULT_CODE_ORIGIN_FOR_SPANS_INTERFACE_SUPPORT = false; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java index d9781c7261b..c79f2817d86 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java @@ -17,6 +17,7 @@ public final class TracerConfig { public static final String ID_GENERATION_STRATEGY = "id.generation.strategy"; public static final String WRITER_TYPE = "writer.type"; public static final String WRITER_BAGGAGE_INJECT = "writer.baggage.inject"; + public static final String WRITER_LINKS_INJECT = "writer.links.inject"; public static final String PRIORITIZATION_TYPE = "prioritization.type"; public static final String TRACE_AGENT_URL = "trace.agent.url"; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index adfd16bd257..7dd3a62849f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -209,6 +209,7 @@ public static final CoreTracerBuilder builder() { private final TimeSource timeSource; private final ProfilingContextIntegration profilingContextIntegration; private final boolean injectBaggageAsTags; + private final boolean injectLinksAsTags; private final boolean flushOnClose; private final Collection shutdownListeners = new CopyOnWriteArrayList<>(); @@ -326,6 +327,7 @@ public static class CoreTracerBuilder { private boolean reportInTracerFlare; private boolean pollForTracingConfiguration; private boolean injectBaggageAsTags; + private boolean injectLinksAsTags; private boolean flushOnClose; public CoreTracerBuilder serviceName(String serviceName) { @@ -471,6 +473,11 @@ public CoreTracerBuilder injectBaggageAsTags(boolean injectBaggageAsTags) { return this; } + public CoreTracerBuilder injectLinksAsTags(boolean injectLinksAsTags) { + this.injectLinksAsTags = injectLinksAsTags; + return this; + } + public CoreTracerBuilder flushOnClose(boolean flushOnClose) { this.flushOnClose = flushOnClose; return this; @@ -506,6 +513,7 @@ public CoreTracerBuilder config(final Config config) { partialFlushMinSpans(config.getPartialFlushMinSpans()); strictTraceWrites(config.isTraceStrictWritesEnabled()); injectBaggageAsTags(config.isInjectBaggageAsTagsEnabled()); + injectLinksAsTags(config.isInjectLinksAsTagsEnabled()); flushOnClose(config.isCiVisibilityEnabled()); return this; } @@ -538,6 +546,7 @@ public CoreTracer build() { reportInTracerFlare, pollForTracingConfiguration, injectBaggageAsTags, + injectLinksAsTags, flushOnClose); } } @@ -569,6 +578,7 @@ private CoreTracer( final boolean reportInTracerFlare, final boolean pollForTracingConfiguration, final boolean injectBaggageAsTags, + final boolean injectLinksAsTags, final boolean flushOnClose) { this( config, @@ -597,6 +607,7 @@ private CoreTracer( reportInTracerFlare, pollForTracingConfiguration, injectBaggageAsTags, + injectLinksAsTags, flushOnClose); } @@ -628,6 +639,7 @@ private CoreTracer( final boolean reportInTracerFlare, final boolean pollForTracingConfiguration, final boolean injectBaggageAsTags, + final boolean injectLinksAsTags, final boolean flushOnClose) { assert localRootSpanTags != null; @@ -864,6 +876,7 @@ private CoreTracer( propagationTagsFactory = PropagationTags.factory(config); this.profilingContextIntegration = profilingContextIntegration; this.injectBaggageAsTags = injectBaggageAsTags; + this.injectLinksAsTags = injectLinksAsTags; this.flushOnClose = flushOnClose; this.allowInferredServices = SpanNaming.instance().namingSchema().allowInferredServices(); if (profilingContextIntegration != ProfilingContextIntegration.NoOp.INSTANCE) { @@ -2119,7 +2132,8 @@ protected static final DDSpanContext buildSpanContext( tracer.disableSamplingMechanismValidation, propagationTags, tracer.profilingContextIntegration, - tracer.injectBaggageAsTags); + tracer.injectBaggageAsTags, + tracer.injectLinksAsTags); // By setting the tags on the context we apply decorators to any tags that have been set via // the builder. This is the order that the tags were added previously, but maybe the `tags` diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index bc8e14ead06..b0e8926dbb3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -883,7 +883,7 @@ public TraceConfig traceConfig() { return context.getTraceCollector().getTraceConfig(); } - List getLinks() { + public List getLinks() { return this.links; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index c01cdaf6b71..5285ba14de3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -184,6 +184,7 @@ public class DDSpanContext private final ProfilingContextIntegration profilingContextIntegration; private final boolean injectBaggageAsTags; + private final boolean injectLinksAsTags; private volatile int encodedOperationName; private volatile int encodedResourceName; @@ -238,6 +239,7 @@ public DDSpanContext( disableSamplingMechanismValidation, propagationTags, ProfilingContextIntegration.NoOp.INSTANCE, + true, true); } @@ -261,7 +263,8 @@ public DDSpanContext( final PathwayContext pathwayContext, final boolean disableSamplingMechanismValidation, final PropagationTags propagationTags, - final boolean injectBaggageAsTags) { + final boolean injectBaggageAsTags, + final boolean injectLinksAsTags) { this( traceId, spanId, @@ -286,7 +289,8 @@ public DDSpanContext( disableSamplingMechanismValidation, propagationTags, ProfilingContextIntegration.NoOp.INSTANCE, - injectBaggageAsTags); + injectBaggageAsTags, + injectLinksAsTags); } public DDSpanContext( @@ -313,7 +317,8 @@ public DDSpanContext( final boolean disableSamplingMechanismValidation, final PropagationTags propagationTags, final ProfilingContextIntegration profilingContextIntegration, - final boolean injectBaggageAsTags) { + final boolean injectBaggageAsTags, + final boolean injectLinksAsTags) { assert traceCollector != null; this.traceCollector = traceCollector; @@ -370,6 +375,7 @@ public DDSpanContext( : traceCollector.getTracer().getPropagationTagsFactory().empty(); this.propagationTags.updateTraceIdHighOrderBits(this.traceId.toHighOrderLong()); this.injectBaggageAsTags = injectBaggageAsTags; + this.injectLinksAsTags = injectLinksAsTags; if (origin != null) { setOrigin(origin); } @@ -1177,10 +1183,14 @@ void processTagsAndBaggage( // Tags TagsPostProcessorFactory.lazyProcessor().processTags(unsafeTags, this, restrictedSpan); - String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks()); - if (linksTag != null) { - unsafeTags.put(SPAN_LINKS, linksTag); + // Links + if (injectLinksAsTags) { + String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks()); + if (linksTag != null) { + unsafeTags.set(SPAN_LINKS, linksTag); + } } + // Baggage Map baggageItemsWithPropagationTags; if (injectBaggageAsTags) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java index 724521c77d2..2aa6de29335 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java @@ -15,6 +15,7 @@ import datadog.communication.serialization.SimpleUtf8Cache; import datadog.communication.serialization.StreamingBuffer; import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; import java.nio.ByteBuffer; import java.util.List; @@ -45,6 +46,12 @@ private OtlpCommonProto() {} ? new GenerationalUtf8Cache(Config.get().getTagValueUtf8CacheSize()) : null; + public static void recalibrateCaches() { + if (VALUE_CACHE != null) { + VALUE_CACHE.recalibrate(); + } + } + public static int sizeVarInt(int value) { return 1 + (31 - Integer.numberOfLeadingZeros(value)) / 7; } @@ -153,8 +160,13 @@ public static void writeInstrumentationScope( } @SuppressWarnings("unchecked") - public static void writeAttribute(StreamingBuffer buf, int type, String key, Object value) { - byte[] keyUtf8 = keyUtf8(key); + public static void writeAttribute(StreamingBuffer buf, int type, CharSequence key, Object value) { + byte[] keyUtf8; + if (key instanceof UTF8BytesString) { + keyUtf8 = ((UTF8BytesString) key).getUtf8Bytes(); + } else { + keyUtf8 = keyUtf8(key.toString()); + } switch (type) { case STRING: writeStringAttribute(buf, keyUtf8, valueUtf8((String) value)); @@ -163,10 +175,10 @@ public static void writeAttribute(StreamingBuffer buf, int type, String key, Obj writeBooleanAttribute(buf, keyUtf8, (boolean) value); break; case LONG: - writeLongAttribute(buf, keyUtf8, (long) value); + writeLongAttribute(buf, keyUtf8, ((Number) value).longValue()); break; case DOUBLE: - writeDoubleAttribute(buf, keyUtf8, (double) value); + writeDoubleAttribute(buf, keyUtf8, ((Number) value).doubleValue()); break; case STRING_ARRAY: writeStringArrayAttribute(buf, keyUtf8, (List) value); @@ -175,16 +187,29 @@ public static void writeAttribute(StreamingBuffer buf, int type, String key, Obj writeBooleanArrayAttribute(buf, keyUtf8, (List) value); break; case LONG_ARRAY: - writeLongArrayAttribute(buf, keyUtf8, (List) value); + writeLongArrayAttribute(buf, keyUtf8, (List) value); break; case DOUBLE_ARRAY: - writeDoubleArrayAttribute(buf, keyUtf8, (List) value); + writeDoubleArrayAttribute(buf, keyUtf8, (List) value); break; default: throw new IllegalArgumentException("Unknown attribute type: " + type); } } + public static void writeAttribute( + StreamingBuffer buf, UTF8BytesString key, UTF8BytesString value) { + writeStringAttribute(buf, key.getUtf8Bytes(), value.getUtf8Bytes()); + } + + public static void writeAttribute(StreamingBuffer buf, UTF8BytesString key, String value) { + writeStringAttribute(buf, key.getUtf8Bytes(), valueUtf8(value)); + } + + public static void writeAttribute(StreamingBuffer buf, UTF8BytesString key, long value) { + writeLongAttribute(buf, key.getUtf8Bytes(), value); + } + private static byte[] keyUtf8(String key) { return KEY_CACHE != null ? KEY_CACHE.getUtf8(key) : key.getBytes(UTF_8); } @@ -301,10 +326,10 @@ private static void writeBooleanArrayAttribute( } private static void writeLongArrayAttribute( - StreamingBuffer buf, byte[] keyUtf8, List values) { + StreamingBuffer buf, byte[] keyUtf8, List values) { long[] longValues = new long[values.size()]; for (int i = 0; i < longValues.length; i++) { - longValues[i] = values.get(i); // avoid repeated unboxing later + longValues[i] = values.get(i).longValue(); // avoid repeated unboxing later } int arraySize = 0; for (long longValue : longValues) { @@ -332,7 +357,7 @@ private static void writeLongArrayAttribute( } private static void writeDoubleArrayAttribute( - StreamingBuffer buf, byte[] keyUtf8, List values) { + StreamingBuffer buf, byte[] keyUtf8, List values) { int arraySize = 11 * values.size(); int valueSize = 1 + sizeVarInt(arraySize) + arraySize; int keyValueSize = @@ -345,11 +370,11 @@ private static void writeDoubleArrayAttribute( writeVarInt(buf, valueSize); writeTag(buf, 5, LEN_WIRE_TYPE); writeVarInt(buf, arraySize); - for (double value : values) { + for (Number value : values) { writeTag(buf, 1, LEN_WIRE_TYPE); buf.put((byte) 9); writeTag(buf, 4, I64_WIRE_TYPE); - writeI64(buf, value); + writeI64(buf, value.doubleValue()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java index 25fdec6b538..2a2141cfe8b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java @@ -25,6 +25,7 @@ import datadog.trace.bootstrap.otlp.metrics.OtlpMetricVisitor; import datadog.trace.bootstrap.otlp.metrics.OtlpMetricsVisitor; import datadog.trace.bootstrap.otlp.metrics.OtlpScopedMetricsVisitor; +import datadog.trace.core.otlp.common.OtlpCommonProto; import datadog.trace.core.otlp.common.OtlpPayload; import java.util.ArrayDeque; import java.util.ArrayList; @@ -33,7 +34,7 @@ import java.util.function.Consumer; /** - * Collects OpenTelemetry metrics and marshalls them into a chunked 'metrics.proto' payload. + * Collects OpenTelemetry metrics and marshals them into a chunked 'metrics.proto' payload. * *

This collector is designed to be called by a single thread. To minimize allocations each * collection returns a payload only to be used by the calling thread until the next collection. @@ -83,7 +84,7 @@ public OtlpMetricsProtoCollector(TimeSource timeSource) { } /** - * Collects OpenTelemetry metrics and marshalls them into a chunked payload. + * Collects OpenTelemetry metrics and marshals them into a chunked payload. * *

This payload is only valid for the calling thread until the next collection. */ @@ -108,15 +109,18 @@ private void start() { startNanos = endNanos; endNanos = timeSource.getCurrentTimeNanos(); - // clear payloadChunks in case it wasn't fully consumed via OtlpMetricsPayload + // clear payloadChunks in case it wasn't fully consumed via OtlpPayload payloadChunks.clear(); + + // remove stale entries from caches + OtlpCommonProto.recalibrateCaches(); } /** Cleanup elements used to collect metrics data. */ private void stop() { buf.reset(); - // leave payloadChunks in place so it can be consumed via OtlpMetricsPayload + // leave payloadChunks in place so it can be consumed via OtlpPayload scopedChunks.clear(); metricChunks.clear(); @@ -146,6 +150,31 @@ public OtlpMetricVisitor visitMetric(OtelInstrumentDescriptor metric) { return this; } + @Override + public void visitAttribute(int type, String key, Object value) { + // add attribute to the data point currently being collected + writeTag(buf, currentMetric.getType() == HISTOGRAM ? 9 : 7, LEN_WIRE_TYPE); + writeAttribute(buf, type, key, value); + } + + @Override + public void visitDataPoint(OtlpDataPoint point) { + OtelInstrumentType metricType = currentMetric.getType(); + + // gauges don't have a start time (no aggregation temporality) + if (metricType != GAUGE && metricType != OBSERVABLE_GAUGE) { + writeTag(buf, 2, I64_WIRE_TYPE); + writeI64(buf, startNanos); + } + writeTag(buf, 3, I64_WIRE_TYPE); + writeI64(buf, endNanos); + + // add complete data point message to the metric chunks + byte[] pointMessage = recordDataPointMessage(buf, point); + metricChunks.add(pointMessage); + metricBytes += pointMessage.length; + } + // called once we've processed all scopes and metric messages private OtlpPayload completePayload() { if (currentScope != null) { @@ -204,29 +233,4 @@ private void completeMetric() { metricChunks.clear(); metricBytes = 0; } - - @Override - public void visitAttribute(int type, String key, Object value) { - // add attribute to the data point currently being collected - writeTag(buf, currentMetric.getType() == HISTOGRAM ? 9 : 7, LEN_WIRE_TYPE); - writeAttribute(buf, type, key, value); - } - - @Override - public void visitDataPoint(OtlpDataPoint point) { - OtelInstrumentType metricType = currentMetric.getType(); - - // gauges don't have a start time (no aggregation temporality) - if (metricType != GAUGE && metricType != OBSERVABLE_GAUGE) { - writeTag(buf, 2, I64_WIRE_TYPE); - writeI64(buf, startNanos); - } - writeTag(buf, 3, I64_WIRE_TYPE); - writeI64(buf, endNanos); - - // add complete data point message to the metric chunks - byte[] pointMessage = recordDataPointMessage(buf, point); - metricChunks.add(pointMessage); - metricBytes += pointMessage.length; - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java new file mode 100644 index 00000000000..3a7f2d9b3a0 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java @@ -0,0 +1,14 @@ +package datadog.trace.core.otlp.trace; + +import datadog.trace.core.CoreSpan; +import datadog.trace.core.otlp.common.OtlpPayload; +import java.util.List; + +/** Collects traces ready for export. */ +public interface OtlpTraceCollector { + OtlpTraceCollector NOOP_COLLECTOR = () -> OtlpPayload.EMPTY; + + OtlpPayload collectTraces(); + + default void addTrace(List> spans) {} +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java new file mode 100644 index 00000000000..67469ac1173 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java @@ -0,0 +1,318 @@ +package datadog.trace.core.otlp.trace; + +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.DD_MEASURED; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.DD_PARTIAL_VERSION; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.DD_TOP_LEVEL; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.DD_WAS_LONG_RUNNING; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING; +import static datadog.trace.common.writer.RemoteMapper.HTTP_STATUS; +import static datadog.trace.common.writer.ddagent.TraceMapper.ORIGIN_KEY; +import static datadog.trace.common.writer.ddagent.TraceMapper.PROCESS_TAGS_KEY; +import static datadog.trace.common.writer.ddagent.TraceMapper.SAMPLING_PRIORITY_KEY; +import static datadog.trace.common.writer.ddagent.TraceMapper.THREAD_ID; +import static datadog.trace.common.writer.ddagent.TraceMapper.THREAD_NAME; +import static datadog.trace.core.otlp.common.OtlpCommonProto.I32_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.VARINT_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; +import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI32; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeInstrumentationScope; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeString; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeVarInt; +import static java.nio.charset.StandardCharsets.UTF_8; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.communication.serialization.StreamingBuffer; +import datadog.trace.api.Config; +import datadog.trace.api.DDTags; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.TagMap; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.core.DDSpan; +import datadog.trace.core.Metadata; +import datadog.trace.core.MetadataConsumer; +import datadog.trace.core.PendingTrace; +import datadog.trace.core.propagation.PropagationTags; + +/** Provides optimized writers for OpenTelemetry's "trace.proto" wire protocol. */ +public final class OtlpTraceProto { + + private static final UTF8BytesString SERVICE_NAME = UTF8BytesString.create("service.name"); + private static final UTF8BytesString RESOURCE_NAME = UTF8BytesString.create("resource.name"); + private static final UTF8BytesString OPERATION_NAME = UTF8BytesString.create("operation.name"); + private static final UTF8BytesString SPAN_TYPE = UTF8BytesString.create("span.type"); + + static final int NO_TRACE_FLAGS = 0x00000000; + static final int SAMPLED_TRACE_FLAG = 0x00000001; + static final int REMOTE_TRACE_FLAG = 0x00000300; + + private OtlpTraceProto() {} + + /** + * Records the first part of a scoped spans message where we know its nested span messages will + * follow in one or more byte-arrays that add up to the given number of remaining bytes. + */ + public static byte[] recordScopedSpansMessage( + GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) { + + writeTag(buf, 1, LEN_WIRE_TYPE); + writeInstrumentationScope(buf, scope); + if (scope.getSchemaUrl() != null) { + writeTag(buf, 3, LEN_WIRE_TYPE); + writeString(buf, scope.getSchemaUrl().getUtf8Bytes()); + } + + return recordMessage(buf, 2, remainingBytes); + } + + /** + * Records the first part of a span message where we know its nested span-links will follow in one + * or more byte-arrays that add up to the given number of remaining bytes. + */ + public static byte[] recordSpanMessage( + GrowableBuffer buf, DDSpan span, MetaWriter metaWriter, int remainingBytes) { + PropagationTags propagationTags = span.context().getPropagationTags(); + + writeTag(buf, 1, LEN_WIRE_TYPE); + writeTraceId(buf, span.getTraceId()); + + writeTag(buf, 2, LEN_WIRE_TYPE); + writeSpanId(buf, span.getSpanId()); + + String tracestate = propagationTags.getW3CTracestate(); + if (tracestate != null) { + writeTag(buf, 3, LEN_WIRE_TYPE); + writeString(buf, tracestate); + } + + if (span.getParentId() != 0) { + writeTag(buf, 4, LEN_WIRE_TYPE); + writeSpanId(buf, span.getParentId()); + } + + int traceFlags = NO_TRACE_FLAGS; + if (span.samplingPriority() > 0) { + traceFlags |= SAMPLED_TRACE_FLAG; + } + if (span.context().isRemote()) { + traceFlags |= REMOTE_TRACE_FLAG; + } + if (traceFlags != NO_TRACE_FLAGS) { + writeTag(buf, 16, I32_WIRE_TYPE); + writeI32(buf, traceFlags); + } + + writeTag(buf, 5, LEN_WIRE_TYPE); + CharSequence spanName = span.getResourceName(); + if (spanName instanceof UTF8BytesString) { + writeString(buf, ((UTF8BytesString) spanName).getUtf8Bytes()); + } else { + writeString(buf, spanName.toString()); + } + + writeTag(buf, 6, VARINT_WIRE_TYPE); + writeVarInt(buf, spanKind(span.context().getSpanKindString())); + + writeTag(buf, 7, I64_WIRE_TYPE); + writeI64(buf, span.getStartTime()); + + writeTag(buf, 8, I64_WIRE_TYPE); + writeI64(buf, span.getStartTime() + PendingTrace.getDurationNano(span)); + + if (!Config.get().getServiceName().equals(span.getServiceName())) { + writeSpanTag(buf, SERVICE_NAME, span.getServiceName()); + } + writeSpanTag(buf, RESOURCE_NAME, span.getResourceName()); + writeSpanTag(buf, OPERATION_NAME, span.getOperationName()); + writeSpanTag(buf, SPAN_TYPE, span.getSpanType()); + + span.processTagsAndBaggage(metaWriter); + + if (span.isError()) { + int stateSize = 2; + byte[] errorUtf8 = null; + Object errorMessage = span.getTag(DDTags.ERROR_MSG); + if (errorMessage instanceof String) { + errorUtf8 = ((String) errorMessage).getBytes(UTF_8); + stateSize += 1 + sizeVarInt(errorUtf8.length) + errorUtf8.length; + } + writeTag(buf, 15, LEN_WIRE_TYPE); + writeVarInt(buf, stateSize); + if (errorUtf8 != null) { + writeTag(buf, 2, LEN_WIRE_TYPE); + writeString(buf, errorUtf8); + } + writeTag(buf, 3, VARINT_WIRE_TYPE); + writeVarInt(buf, 2); + } + + return recordMessage(buf, 2, remainingBytes); + } + + /** Completes recording of a span-link message and packs it into its own byte-array. */ + public static byte[] recordSpanLinkMessage(GrowableBuffer buf, AgentSpanLink spanLink) { + + writeTag(buf, 1, LEN_WIRE_TYPE); + writeTraceId(buf, spanLink.traceId()); + + writeTag(buf, 2, LEN_WIRE_TYPE); + writeSpanId(buf, spanLink.spanId()); + + writeTag(buf, 3, LEN_WIRE_TYPE); + writeString(buf, spanLink.traceState()); + + spanLink + .attributes() + .asMap() + .forEach( + (key, value) -> { + writeTag(buf, 4, LEN_WIRE_TYPE); + writeAttribute(buf, STRING, key, value); + }); + + writeTag(buf, 6, I32_WIRE_TYPE); + writeI32(buf, spanLink.traceFlags()); + + return recordMessage(buf, 13); + } + + public static void writeTraceId(StreamingBuffer buf, DDTraceId traceId) { + writeVarInt(buf, 16); + writeI64(buf, traceId.toLong()); + writeI64(buf, traceId.toHighOrderLong()); + } + + public static void writeSpanId(StreamingBuffer buf, long spanId) { + writeVarInt(buf, 8); + writeI64(buf, spanId); + } + + private static void writeSpanTag(StreamingBuffer buf, TagMap.EntryReader tagEntry) { + writeTag(buf, 9, LEN_WIRE_TYPE); + switch (tagEntry.type()) { + case TagMap.EntryReader.BOOLEAN: + writeAttribute(buf, BOOLEAN, tagEntry.tag(), tagEntry.objectValue()); + break; + case TagMap.EntryReader.INT: + case TagMap.EntryReader.LONG: + writeAttribute(buf, LONG, tagEntry.tag(), tagEntry.objectValue()); + break; + case TagMap.EntryReader.FLOAT: + case TagMap.EntryReader.DOUBLE: + writeAttribute(buf, DOUBLE, tagEntry.tag(), tagEntry.objectValue()); + break; + default: + writeAttribute(buf, STRING, tagEntry.tag(), tagEntry.stringValue()); + } + } + + private static void writeSpanTag( + StreamingBuffer buf, UTF8BytesString key, UTF8BytesString value) { + writeTag(buf, 9, LEN_WIRE_TYPE); + writeAttribute(buf, key, value); + } + + private static void writeSpanTag(StreamingBuffer buf, UTF8BytesString key, CharSequence value) { + writeTag(buf, 9, LEN_WIRE_TYPE); + if (value instanceof UTF8BytesString) { + writeAttribute(buf, key, (UTF8BytesString) value); + } else { + writeAttribute(buf, key, value.toString()); + } + } + + private static void writeSpanTag(StreamingBuffer buf, UTF8BytesString key, long value) { + writeTag(buf, 9, LEN_WIRE_TYPE); + writeAttribute(buf, key, value); + } + + private static int spanKind(CharSequence spanKind) { + if (spanKind == null) { + return 0; // UNSPECIFIED + } else if (SPAN_KIND_SERVER.contentEquals(spanKind)) { + return 2; // SERVER + } else if (SPAN_KIND_CLIENT.contentEquals(spanKind)) { + return 3; // CLIENT + } else if (SPAN_KIND_PRODUCER.contentEquals(spanKind)) { + return 4; // PRODUCER + } else if (SPAN_KIND_CONSUMER.contentEquals(spanKind)) { + return 5; // CONSUMER + } else { + return 1; // INTERNAL + } + } + + public static class MetaWriter implements MetadataConsumer { + private final StreamingBuffer buf; + + private boolean includeProcessTags; + private boolean includeSamplingTags; + + public MetaWriter(StreamingBuffer buf) { + this.buf = buf; + } + + /** Call this to ensure process tags are written out for the next span. */ + public void includeProcessTags() { + includeProcessTags = true; + } + + /** Call this to ensure sampling tags are written out for the next span. */ + public void includeSamplingTags() { + includeSamplingTags = true; + } + + @Override + public void accept(Metadata metadata) { + if ((includeSamplingTags || metadata.topLevel()) && metadata.hasSamplingPriority()) { + writeSpanTag(buf, SAMPLING_PRIORITY_KEY, metadata.samplingPriority()); + } + if (metadata.measured()) { + writeSpanTag(buf, DD_MEASURED, 1); + } + if (metadata.topLevel()) { + writeSpanTag(buf, DD_TOP_LEVEL, 1); + } + + if (metadata.longRunningVersion() != 0) { + if (metadata.longRunningVersion() > 0) { + writeSpanTag(buf, DD_PARTIAL_VERSION, metadata.longRunningVersion()); + } else { + writeSpanTag(buf, DD_WAS_LONG_RUNNING, 1); + } + } + + writeSpanTag(buf, THREAD_ID, metadata.getThreadId()); + writeSpanTag(buf, THREAD_NAME, metadata.getThreadName()); + if (metadata.getHttpStatusCode() != null) { + writeSpanTag(buf, HTTP_STATUS, metadata.getHttpStatusCode()); + } + if (metadata.getOrigin() != null) { + writeSpanTag(buf, ORIGIN_KEY, metadata.getOrigin()); + } + if (includeProcessTags && metadata.processTags() != null) { + writeSpanTag(buf, PROCESS_TAGS_KEY, metadata.processTags()); + } + + metadata.getTags().forEach(buf, OtlpTraceProto::writeSpanTag); + + // reset for next span + includeProcessTags = false; + includeSamplingTags = false; + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java new file mode 100644 index 00000000000..2802a008a55 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java @@ -0,0 +1,201 @@ +package datadog.trace.core.otlp.trace; + +import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; +import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; +import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordScopedSpansMessage; +import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanLinkMessage; +import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanMessage; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.DDSpan; +import datadog.trace.core.otlp.common.OtlpCommonProto; +import datadog.trace.core.otlp.common.OtlpPayload; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; + +/** + * Collects Datadog traces and marshals them into a chunked 'trace.proto' payload. + * + *

This collector is designed to be called by a single thread. To minimize allocations each + * collection returns a payload only to be used by the calling thread until the next collection. + * (The payload should be copied before passing it onto another thread.) + * + *

We use a single temporary buffer to prepare message chunks at different nesting levels. First + * we chunk all span-links for a given span. Once the span is complete we add the first part of the + * span message and its chunked span-links to the scoped chunks. Once the scope is complete we add + * the first part of the scoped spans message and all its chunks (span messages and any span-links) + * to the payload. Once all the span data has been chunked we add the enclosing resource span + * message to the start of the payload. + */ +public final class OtlpTraceProtoCollector implements OtlpTraceCollector { + + public static final OtlpTraceProtoCollector INSTANCE = new OtlpTraceProtoCollector(); + + private static final OtelInstrumentationScope DEFAULT_TRACE_SCOPE = + new OtelInstrumentationScope("", null, null); + + private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf"; + + private final GrowableBuffer buf = new GrowableBuffer(512); + private final OtlpTraceProto.MetaWriter metaWriter = new OtlpTraceProto.MetaWriter(buf); + + // temporary collections of chunks at different nesting levels + private final Deque payloadChunks = new ArrayDeque<>(); + private final List scopedChunks = new ArrayList<>(); + private final List spanChunks = new ArrayList<>(); + + private boolean payloadStarted; + + // total number of chunked bytes at different nesting levels + private int payloadBytes; + private int scopedBytes; + private int spanBytes; + + private OtelInstrumentationScope currentScope; + private DDSpan currentSpan; + + /** Adds the given trace spans to the collector. */ + @Override + public void addTrace(List> spans) { + if (!payloadStarted) { + start(); + metaWriter.includeProcessTags(); + payloadStarted = true; + } + + for (int i = 0, len = spans.size(); i < len; i++) { + if (i == 0 || i == len - 1) { + metaWriter.includeSamplingTags(); + } + visitSpan(spans.get(i)); + } + } + + /** + * Marshals the traces collected so far into a chunked payload. + * + *

This payload is only valid for the calling thread until the next collection. + */ + @Override + public OtlpPayload collectTraces() { + try { + return completePayload(); + } finally { + stop(); + } + } + + /** Prepare temporary elements to collect trace data. */ + private void start() { + + // clear payloadChunks in case it wasn't fully consumed via OtlpPayload + payloadChunks.clear(); + + // remove stale entries from caches + OtlpCommonProto.recalibrateCaches(); + + // for now put all spans under the default scope + visitScopedSpans(DEFAULT_TRACE_SCOPE); + } + + /** Cleanup elements used to collect trace data. */ + private void stop() { + payloadStarted = false; + + buf.reset(); + + // leave payloadChunks in place so it can be consumed via OtlpPayload + scopedChunks.clear(); + spanChunks.clear(); + + payloadBytes = 0; + scopedBytes = 0; + spanBytes = 0; + + currentScope = null; + currentSpan = null; + } + + private void visitScopedSpans(OtelInstrumentationScope scope) { + if (currentScope != null) { + completeScope(); + } + currentScope = scope; + } + + private void visitSpan(CoreSpan span) { + if (currentSpan != null) { + completeSpan(); + } + currentSpan = (DDSpan) span; + currentSpan.getLinks().forEach(this::visitSpanLink); + } + + private void visitSpanLink(AgentSpanLink spanLink) { + byte[] spanLinkMessage = recordSpanLinkMessage(buf, spanLink); + spanChunks.add(spanLinkMessage); + spanBytes += spanLinkMessage.length; + } + + // called once we've processed all scopes and span messages + private OtlpPayload completePayload() { + if (currentScope != null) { + completeScope(); + } + + if (payloadBytes == 0) { + return OtlpPayload.EMPTY; + } + + // prepend the canned resource chunk + payloadChunks.addFirst(RESOURCE_MESSAGE); + payloadBytes += RESOURCE_MESSAGE.length; + + // finally prepend the total length of all collected chunks + byte[] prefix = recordMessage(buf, 1, payloadBytes); + payloadChunks.addFirst(prefix); + payloadBytes += prefix.length; + + return new OtlpPayload(payloadChunks, payloadBytes, PROTOBUF_CONTENT_TYPE); + } + + // called once we've processed all spans in a specific scope + private void completeScope() { + if (currentSpan != null) { + completeSpan(); + } + + // add scoped spans message prefix to its nested chunks and promote to payload + if (scopedBytes > 0) { + byte[] scopedPrefix = recordScopedSpansMessage(buf, currentScope, scopedBytes); + payloadChunks.add(scopedPrefix); + payloadChunks.addAll(scopedChunks); + payloadBytes += scopedPrefix.length + scopedBytes; + } + + // reset temporary elements for next scope + currentScope = null; + scopedChunks.clear(); + scopedBytes = 0; + } + + // called once we've processed all span-links in a specific span + private void completeSpan() { + + // add span message prefix to its nested chunks and promote to scoped + byte[] spanPrefix = recordSpanMessage(buf, currentSpan, metaWriter, spanBytes); + scopedChunks.add(spanPrefix); + scopedChunks.addAll(spanChunks); + scopedBytes += spanPrefix.length + spanBytes; + + // reset temporary elements for next span + currentSpan = null; + spanChunks.clear(); + spanBytes = 0; + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/DDSpanSerializationTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/DDSpanSerializationTest.groovy index bd308b30731..3613743e9ff 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/DDSpanSerializationTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/DDSpanSerializationTest.groovy @@ -186,7 +186,8 @@ class DDSpanSerializationTest extends DDCoreSpecification { NoopPathwayContext.INSTANCE, false, null, - injectBaggage) + injectBaggage, + true) context.setAllTags(tags) def span = DDSpan.create("test", 0, context, null) CaptureBuffer capture = new CaptureBuffer() @@ -262,7 +263,8 @@ class DDSpanSerializationTest extends DDCoreSpecification { NoopPathwayContext.INSTANCE, false, null, - injectBaggage) + injectBaggage, + true) context.setAllTags(tags) def span = DDSpan.create("test", 0, context, null) CaptureBuffer capture = new CaptureBuffer() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/test/DDCoreSpecification.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/test/DDCoreSpecification.groovy index a0ec1e5f840..5aaf7c5011a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/test/DDCoreSpecification.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/test/DDCoreSpecification.groovy @@ -113,6 +113,7 @@ abstract class DDCoreSpecification extends DDSpecification { false, propagationTags, ProfilingContextIntegration.NoOp.INSTANCE, + true, true) def span = DDSpan.create("test", timestamp, context, null) diff --git a/dd-trace-core/src/test/java/datadog/trace/core/DDCoreJavaSpecification.java b/dd-trace-core/src/test/java/datadog/trace/core/DDCoreJavaSpecification.java index f4aa7688818..c3122aa6a31 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/DDCoreJavaSpecification.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/DDCoreJavaSpecification.java @@ -124,6 +124,7 @@ protected DDSpan buildSpan( false, propagationTags, ProfilingContextIntegration.NoOp.INSTANCE, + true, true); DDSpan span = DDSpan.create("test", timestamp, context, null); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java new file mode 100644 index 00000000000..8768eb31147 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java @@ -0,0 +1,1189 @@ +package datadog.trace.core.otlp.trace; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; +import static java.util.Arrays.asList; +import static java.util.Arrays.copyOfRange; +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import datadog.trace.api.DD128bTraceId; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.TracePropagationStyle; +import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.api.sampling.SamplingMechanism; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.SpanAttributes; +import datadog.trace.bootstrap.instrumentation.api.SpanLink; +import datadog.trace.common.writer.LoggingWriter; +import datadog.trace.core.CoreTracer; +import datadog.trace.core.DDSpan; +import datadog.trace.core.otlp.common.OtlpPayload; +import datadog.trace.core.propagation.ExtractedContext; +import datadog.trace.core.propagation.PropagationTags; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for {@link OtlpTraceProto} via {@link OtlpTraceProtoCollector#collectTraces}. + * + *

Each test case builds real {@link DDSpan} instances via a shared {@link CoreTracer}, collects + * them using {@link OtlpTraceProtoCollector}, drains the resulting chunked payload into a + * contiguous byte array, and then parses it back using protobuf's {@link CodedInputStream} to + * verify the wire encoding against the OpenTelemetry trace proto schema. + * + *

Relevant proto field numbers (from {@code opentelemetry/proto/trace/v1/trace.proto}): + * + *

+ *   TracesData      { ResourceSpans resource_spans = 1; }
+ *   ResourceSpans   { Resource resource = 1; ScopeSpans scope_spans = 2; }
+ *   ScopeSpans      { InstrumentationScope scope = 1; Span spans = 2; string schema_url = 3; }
+ *   InstrumentationScope { string name = 1; string version = 2; }
+ *   Span            { bytes trace_id = 1; bytes span_id = 2; string trace_state = 3;
+ *                     bytes parent_span_id = 4; string name = 5; SpanKind kind = 6;
+ *                     fixed64 start_time_unix_nano = 7; fixed64 end_time_unix_nano = 8;
+ *                     KeyValue attributes = 9; Link links = 13; Status status = 15;
+ *                     fixed32 flags = 16; }
+ *   Status          { string message = 2; StatusCode code = 3; }
+ *   Link            { bytes trace_id = 1; bytes span_id = 2; string trace_state = 3;
+ *                     KeyValue attributes = 4; fixed32 flags = 6; }
+ * 
+ */ +class OtlpTraceProtoTest { + + static final CoreTracer TRACER = CoreTracer.builder().writer(new LoggingWriter()).build(); + + // ── spec classes (test-data descriptors) ────────────────────────────────── + + static final class SpanSpec { + /** Span resource name → Span.name (proto field 5). */ + final String resourceName; + + /** Passed to {@code startSpan} → attribute "operation.name". */ + final String operationName; + + /** Span type → attribute "span.type". */ + final String spanType; + + /** Span kind tag value; {@code null} → UNSPECIFIED (kind=0). */ + final String spanKind; + + /** Start time in microseconds since epoch → start_time_unix_nano = startMicros * 1000. */ + final long startMicros; + + /** Finish time in microseconds since epoch → end_time_unix_nano = finishMicros * 1000. */ + final long finishMicros; + + /** If true, marks the span as an error → status.code=ERROR(2). */ + final boolean error; + + /** Optional error message → status.message; ignored when {@code error} is false. */ + final String errorMessage; + + /** Sampling priority to set; 0 = not set explicitly. */ + final int samplingPriority; + + /** Override service name; {@code null} → use tracer default. */ + final String serviceName; + + /** Additional tags to set on the span, exercising string/long/boolean/double paths. */ + final Map extraTags; + + /** + * If ≥ 0, index into the already-built span list to use as parent; creates a child span. If -1, + * the span is a root span. + */ + final int parentIndex; + + /** + * Links to add to this span (one {@link SpanLink} per entry). Each link targets a span that + * precedes this one in the list. An empty array means no links. + */ + final LinkSpec[] links; + + /** If true, the span is measured (sets the {@code _dd.measured} attribute). */ + boolean measured; + + /** Non-zero HTTP status code to set via {@code setHttpStatusCode}; 0 = not set. */ + int httpStatusCode; + + /** + * If true, starts the span under a synthetic {@link ExtractedContext} carrying a known 128-bit + * trace ID, exercising the high-order bytes of {@code writeTraceId}. + */ + boolean use128BitTraceId; + + /** Trace origin carried in the extracted parent context; {@code null} = no origin. */ + String origin; + + SpanSpec( + String resourceName, + String operationName, + String spanType, + String spanKind, + long startMicros, + long finishMicros, + boolean error, + String errorMessage, + int samplingPriority, + String serviceName, + Map extraTags, + int parentIndex, + LinkSpec... links) { + this.resourceName = resourceName; + this.operationName = operationName; + this.spanType = spanType; + this.spanKind = spanKind; + this.startMicros = startMicros; + this.finishMicros = finishMicros; + this.error = error; + this.errorMessage = errorMessage; + this.samplingPriority = samplingPriority; + this.serviceName = serviceName; + this.extraTags = extraTags; + this.parentIndex = parentIndex; + this.links = links; + } + + SpanSpec measured() { + this.measured = true; + return this; + } + + SpanSpec httpStatusCode(int code) { + this.httpStatusCode = code; + return this; + } + + SpanSpec use128BitTraceId() { + this.use128BitTraceId = true; + return this; + } + + /** + * Sets the origin propagated via an {@link ExtractedContext} parent so that {@code + * metadata.getOrigin()} is non-null and the {@code _dd.origin} attribute is written. + */ + SpanSpec origin(String origin) { + this.origin = origin; + return this; + } + } + + /** + * Descriptor for a single span link: target span index, optional attributes, tracestate, and + * flags. + */ + static final class LinkSpec { + final int targetIndex; + final SpanAttributes attributes; + final String traceState; + final byte traceFlags; + + LinkSpec(int targetIndex) { + this(targetIndex, SpanAttributes.EMPTY, "", SpanLink.DEFAULT_FLAGS); + } + + LinkSpec(int targetIndex, SpanAttributes attributes) { + this(targetIndex, attributes, "", SpanLink.DEFAULT_FLAGS); + } + + LinkSpec(int targetIndex, SpanAttributes attributes, String traceState, byte traceFlags) { + this.targetIndex = targetIndex; + this.attributes = attributes; + this.traceState = traceState; + this.traceFlags = traceFlags; + } + } + + // ── shorthand builders ──────────────────────────────────────────────────── + + private static final long BASE_MICROS = 1_700_000_000_000_000L; + private static final long DURATION_MICROS = 500_000L; // 500 ms + + /** + * A known 128-bit trace ID used by {@link SpanSpec#use128BitTraceId} test cases. High-order bits + * are non-zero so the test can assert the proto encodes them correctly. + */ + static final DD128bTraceId TRACE_ID_128BIT = + DD128bTraceId.from(0x0123456789abcdefL, 0xfedcba9876543210L); + + private static SpanSpec span(String resourceName, String operationName, String spanType) { + return new SpanSpec( + resourceName, + operationName, + spanType, + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1); + } + + private static SpanSpec kindSpan(String resourceName, String kind) { + return new SpanSpec( + resourceName, + "op." + kind, + "web", + kind, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1); + } + + private static SpanSpec sampledSpan(String resourceName) { + return new SpanSpec( + resourceName, + "op.sampled", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + PrioritySampling.USER_KEEP, + null, + new HashMap<>(), + -1); + } + + private static SpanSpec errorSpan(String resourceName, String errorMessage) { + return new SpanSpec( + resourceName, + "op.error", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + true, + errorMessage, + 0, + null, + new HashMap<>(), + -1); + } + + private static SpanSpec taggedSpan(String resourceName, Map extraTags) { + return new SpanSpec( + resourceName, + "op.tagged", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + extraTags, + -1); + } + + private static SpanSpec childSpan(String resourceName, int parentIndex) { + return new SpanSpec( + resourceName, + "op.child", + "web", + null, + BASE_MICROS + 10_000, + BASE_MICROS + DURATION_MICROS - 10_000, + false, + null, + 0, + null, + new HashMap<>(), + parentIndex); + } + + private static SpanSpec serviceSpan(String resourceName, String serviceName) { + return new SpanSpec( + resourceName, + "op.service", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + serviceName, + new HashMap<>(), + -1); + } + + /** A span with {@link SpanLink}s pointing to the spans at the given {@code targetIndices}. */ + private static SpanSpec linkedSpan(String resourceName, int... targetIndices) { + LinkSpec[] links = new LinkSpec[targetIndices.length]; + for (int i = 0; i < targetIndices.length; i++) { + links[i] = new LinkSpec(targetIndices[i]); + } + return new SpanSpec( + resourceName, + "op.linked", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1, + links); + } + + /** + * A span with one {@link SpanLink} pointing to the span at {@code targetIndex}, carrying the + * given {@link SpanAttributes}. + */ + private static SpanSpec linkedSpanWithAttrs( + String resourceName, int targetIndex, SpanAttributes attributes) { + return new SpanSpec( + resourceName, + "op.linked", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1, + new LinkSpec(targetIndex, attributes)); + } + + /** A span with one {@link SpanLink} carrying the given W3C tracestate string. */ + private static SpanSpec linkedSpanWithTracestate( + String resourceName, int targetIndex, String traceState) { + return new SpanSpec( + resourceName, + "op.linked", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1, + new LinkSpec(targetIndex, SpanAttributes.EMPTY, traceState, SpanLink.DEFAULT_FLAGS)); + } + + /** A span with one {@link SpanLink} carrying the given trace flags. */ + private static SpanSpec linkedSpanWithFlags( + String resourceName, int targetIndex, byte traceFlags) { + return new SpanSpec( + resourceName, + "op.linked", + "web", + null, + BASE_MICROS, + BASE_MICROS + DURATION_MICROS, + false, + null, + 0, + null, + new HashMap<>(), + -1, + new LinkSpec(targetIndex, SpanAttributes.EMPTY, "", traceFlags)); + } + + private static Map tags(Object... keyValues) { + Map map = new HashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + map.put((String) keyValues[i], keyValues[i + 1]); + } + return map; + } + + // ── test cases ───────────────────────────────────────────────────────────── + + static Stream cases() { + return Stream.of( + // ── empty ───────────────────────────────────────────────────────────── + Arguments.of("empty — no spans produces empty payload", emptyList()), + + // ── span kinds ──────────────────────────────────────────────────────── + Arguments.of( + "minimal span — default UNSPECIFIED kind", + asList(span("GET /api/users", "servlet.request", "web"))), + Arguments.of("internal span kind", asList(kindSpan("GET /api/users", SPAN_KIND_INTERNAL))), + Arguments.of("server span kind", asList(kindSpan("GET /api/users", SPAN_KIND_SERVER))), + Arguments.of("client span kind", asList(kindSpan("redis.get", SPAN_KIND_CLIENT))), + Arguments.of("producer span kind", asList(kindSpan("kafka.produce", SPAN_KIND_PRODUCER))), + Arguments.of("consumer span kind", asList(kindSpan("kafka.consume", SPAN_KIND_CONSUMER))), + + // ── sampling flags ──────────────────────────────────────────────────── + Arguments.of( + "sampled span — SAMPLED flag set in flags field", asList(sampledSpan("GET /health"))), + + // ── error status ────────────────────────────────────────────────────── + Arguments.of( + "error span — status.code=ERROR, no message", + asList(errorSpan("POST /api/data", null))), + Arguments.of( + "error span with message — status.message set", + asList(errorSpan("POST /api/data", "NullPointerException: value was null"))), + + // ── tag types ───────────────────────────────────────────────────────── + Arguments.of( + "span with string tag", asList(taggedSpan("tagged.op", tags("http.method", "GET")))), + Arguments.of( + "span with long tag", asList(taggedSpan("tagged.op", tags("http.status_code", 200L)))), + Arguments.of( + "span with boolean tag", + asList(taggedSpan("tagged.op", tags("http.ssl", Boolean.TRUE)))), + Arguments.of( + "span with double tag", + asList(taggedSpan("tagged.op", tags("net.bytes_sent", 1024.5)))), + Arguments.of( + "span with multiple mixed tag types", + asList( + taggedSpan( + "multi.tagged", + tags( + "http.method", + "POST", + "http.status_code", + 201L, + "http.ssl", + Boolean.FALSE, + "latency.ms", + 3.14)))), + + // ── parent–child relationship ───────────────────────────────────────── + Arguments.of( + "child span — parent_span_id must be set", + asList(span("parent.op", "parent.op", "web"), childSpan("child.op", 0))), + + // ── custom service name ─────────────────────────────────────────────── + Arguments.of( + "span with different service name — service.name attribute written", + asList(serviceSpan("GET /users", "my-custom-service"))), + + // ── span links ──────────────────────────────────────────────────────── + Arguments.of( + "span with one link — link encodes target trace_id and span_id", + asList(span("anchor.op", "anchor.op", "web"), linkedSpan("linked.op", 0))), + Arguments.of( + "span with multiple links to different spans", + asList( + span("target.a", "op.a", "web"), + span("target.b", "op.b", "web"), + linkedSpan("multi.linked", 0, 1))), + Arguments.of( + "span link with attributes — link attributes written to proto", + asList( + span("anchor.op", "anchor.op", "web"), + linkedSpanWithAttrs( + "attr.linked", + 0, + SpanAttributes.builder().put("link.source", "test").build()))), + Arguments.of( + "span link with tracestate — Link.trace_state field written", + asList( + span("anchor.op", "anchor.op", "web"), + linkedSpanWithTracestate("tracestate.linked", 0, "vendor=abc;p=123"))), + Arguments.of( + "span link with non-default flags — extra flag bit preserved alongside SAMPLED", + asList( + span("anchor.op", "anchor.op", "web"), + linkedSpanWithFlags("flags.linked", 0, (byte) 0x02))), + + // ── metadata paths ──────────────────────────────────────────────────── + Arguments.of( + "measured span — _dd.measured attribute written", + asList(span("measured.op", "op.measured", "web").measured())), + Arguments.of( + "span with http status code — http.status_code written via setHttpStatusCode", + asList(span("GET /resource", "servlet.request", "web").httpStatusCode(404))), + Arguments.of( + "span with origin — _dd.origin attribute written", + asList(span("GET /api", "servlet.request", "web").origin("rum"))), + Arguments.of( + "span with 128-bit trace ID — high-order trace_id bytes non-zero", + asList(span("GET /api", "servlet.request", "web").use128BitTraceId())), + + // ── multiple spans in one payload ───────────────────────────────────── + Arguments.of( + "multiple spans — three spans under the same default scope", + asList( + span("first.span", "op.first", "db"), + span("second.span", "op.second", "web"), + kindSpan("third.span", SPAN_KIND_SERVER)))); + } + + // ── parameterized test ──────────────────────────────────────────────────── + + @ParameterizedTest(name = "{0}") + @MethodSource("cases") + void testCollectTraces(String caseName, List specs) throws IOException { + List spans = buildSpans(specs); + + OtlpTraceProtoCollector.INSTANCE.addTrace(spans); + OtlpPayload payload = OtlpTraceProtoCollector.INSTANCE.collectTraces(); + + if (spans.isEmpty()) { + assertEquals(0, payload.getContentLength(), "empty span list must produce empty payload"); + return; + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); + payload.drain(baos::write); + byte[] bytes = baos.toByteArray(); + assertTrue(bytes.length > 0, "non-empty span list must produce bytes"); + + // ── parse TracesData ───────────────────────────────────────────────── + // Full payload encodes a single TracesData.resource_spans entry (field 1, LEN). + CodedInputStream td = CodedInputStream.newInstance(bytes); + int tdTag = td.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(tdTag), "TracesData.resource_spans is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tdTag)); + CodedInputStream rs = td.readBytes().newCodedInput(); + assertTrue(td.isAtEnd(), "expected exactly one ResourceSpans"); + + // ── parse ResourceSpans ────────────────────────────────────────────── + // Fields: resource=1, scope_spans=2 + boolean resourceFound = false; + CodedInputStream ss = null; + while (!rs.isAtEnd()) { + int rsTag = rs.readTag(); + switch (WireFormat.getTagFieldNumber(rsTag)) { + case 1: + verifyResource(rs.readBytes().newCodedInput()); + resourceFound = true; + break; + case 2: + ss = rs.readBytes().newCodedInput(); + break; + default: + rs.skipField(rsTag); + } + } + assertTrue(resourceFound, "Resource must be present in ResourceSpans"); + assertNotNull(ss, "ScopeSpans must be present in ResourceSpans"); + + // ── parse ScopeSpans ───────────────────────────────────────────────── + // Fields: scope=1, spans=2 (repeated), schema_url=3 + List spanBlobs = new ArrayList<>(); + while (!ss.isAtEnd()) { + int ssTag = ss.readTag(); + switch (WireFormat.getTagFieldNumber(ssTag)) { + case 1: + verifyDefaultScope(ss.readBytes().newCodedInput()); + break; + case 2: + spanBlobs.add(ss.readBytes().toByteArray()); + break; + default: + ss.skipField(ssTag); + } + } + assertEquals(spans.size(), spanBlobs.size(), "span count mismatch in case: " + caseName); + + // ── verify each span ───────────────────────────────────────────────── + for (int i = 0; i < spans.size(); i++) { + verifySpan( + CodedInputStream.newInstance(spanBlobs.get(i)), spans.get(i), specs.get(i), caseName); + } + } + + @Test + void testCollectMultipleTraces() throws IOException { + // Three independent traces — each root span gets its own auto-generated trace ID. + List trace1 = + buildSpans(asList(span("trace1.root", "op.root", "web"), childSpan("trace1.child", 0))); + List trace2 = buildSpans(asList(span("trace2.root", "op.root", "db"))); + List trace3 = + buildSpans( + asList( + span("trace3.a", "op.a", "web"), + span("trace3.b", "op.b", "web"), + span("trace3.c", "op.c", "web"))); + + // Sanity: all three traces must have distinct trace IDs. + DDTraceId traceId1 = trace1.get(0).getTraceId(); + DDTraceId traceId2 = trace2.get(0).getTraceId(); + DDTraceId traceId3 = trace3.get(0).getTraceId(); + assertNotEquals(traceId1, traceId2, "trace IDs must be distinct"); + assertNotEquals(traceId2, traceId3, "trace IDs must be distinct"); + assertNotEquals(traceId1, traceId3, "trace IDs must be distinct"); + + OtlpTraceProtoCollector.INSTANCE.addTrace(trace1); + OtlpTraceProtoCollector.INSTANCE.addTrace(trace2); + OtlpTraceProtoCollector.INSTANCE.addTrace(trace3); + OtlpPayload payload = OtlpTraceProtoCollector.INSTANCE.collectTraces(); + + // Collect all span IDs we expect to find across all three traces. + Set expectedSpanIds = new HashSet<>(); + Set expectedTraceIds = new HashSet<>(); + for (DDSpan s : trace1) { + expectedSpanIds.add(s.getSpanId()); + expectedTraceIds.add(s.getTraceId().toLong()); + } + for (DDSpan s : trace2) { + expectedSpanIds.add(s.getSpanId()); + expectedTraceIds.add(s.getTraceId().toLong()); + } + for (DDSpan s : trace3) { + expectedSpanIds.add(s.getSpanId()); + expectedTraceIds.add(s.getTraceId().toLong()); + } + int totalSpans = trace1.size() + trace2.size() + trace3.size(); // 6 + + ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); + payload.drain(baos::write); + byte[] bytes = baos.toByteArray(); + assertTrue(bytes.length > 0, "multi-trace payload must be non-empty"); + + // Parse TracesData → ResourceSpans → ScopeSpans → extract span_id and trace_id per span. + CodedInputStream td = CodedInputStream.newInstance(bytes); + int tdTag = td.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(tdTag), "TracesData.resource_spans is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tdTag)); + CodedInputStream rs = td.readBytes().newCodedInput(); + assertTrue(td.isAtEnd(), "expected exactly one ResourceSpans"); + + CodedInputStream ss = null; + while (!rs.isAtEnd()) { + int rsTag = rs.readTag(); + if (WireFormat.getTagFieldNumber(rsTag) == 2) { + ss = rs.readBytes().newCodedInput(); + } else { + rs.skipField(rsTag); + } + } + assertNotNull(ss, "ScopeSpans must be present in ResourceSpans"); + + Set parsedSpanIds = new HashSet<>(); + Set parsedTraceIds = new HashSet<>(); + while (!ss.isAtEnd()) { + int ssTag = ss.readTag(); + if (WireFormat.getTagFieldNumber(ssTag) == 2) { + CodedInputStream sp = ss.readBytes().newCodedInput(); + byte[] parsedTraceId = null; + byte[] parsedSpanId = null; + while (!sp.isAtEnd()) { + int spTag = sp.readTag(); + switch (WireFormat.getTagFieldNumber(spTag)) { + case 1: + parsedTraceId = sp.readBytes().toByteArray(); + break; + case 2: + parsedSpanId = sp.readBytes().toByteArray(); + break; + default: + sp.skipField(spTag); + } + } + assertNotNull(parsedSpanId, "span_id must be present in every span"); + assertNotNull(parsedTraceId, "trace_id must be present in every span"); + assertEquals(16, parsedTraceId.length, "trace_id must be 16 bytes"); + assertEquals(8, parsedSpanId.length, "span_id must be 8 bytes"); + parsedSpanIds.add(readLittleEndianLong(parsedSpanId)); + parsedTraceIds.add(readLittleEndianLong(parsedTraceId)); + } else { + ss.skipField(ssTag); + } + } + + assertEquals( + totalSpans, parsedSpanIds.size(), "all spans from all traces must appear in payload"); + assertEquals(expectedSpanIds, parsedSpanIds, "span IDs in payload must match those built"); + assertEquals( + expectedTraceIds.size(), + parsedTraceIds.size(), + "payload must contain spans with all three distinct trace IDs"); + } + + // ── span construction ───────────────────────────────────────────────────── + + /** Builds {@link DDSpan} instances from the given specs, collecting them in order. */ + private static List buildSpans(List specs) { + List spans = new ArrayList<>(specs.size()); + for (SpanSpec spec : specs) { + AgentSpan agentSpan; + if (spec.use128BitTraceId) { + ExtractedContext parent128 = + new ExtractedContext( + TRACE_ID_128BIT, + 0L, + PrioritySampling.UNSET, + null, + PropagationTags.factory().empty(), + TracePropagationStyle.DATADOG); + agentSpan = TRACER.startSpan("test", spec.operationName, parent128, spec.startMicros); + } else if (spec.origin != null) { + ExtractedContext parentWithOrigin = + new ExtractedContext( + DDTraceId.ONE, + 0L, + PrioritySampling.UNSET, + spec.origin, + PropagationTags.factory().empty(), + TracePropagationStyle.DATADOG); + agentSpan = + TRACER.startSpan("test", spec.operationName, parentWithOrigin, spec.startMicros); + } else if (spec.parentIndex >= 0) { + agentSpan = + TRACER.startSpan( + "test", + spec.operationName, + spans.get(spec.parentIndex).context(), + spec.startMicros); + } else { + agentSpan = TRACER.startSpan("test", spec.operationName, spec.startMicros); + } + + agentSpan.setResourceName(spec.resourceName); + agentSpan.setSpanType(spec.spanType); + + if (spec.spanKind != null) { + agentSpan.setTag(SPAN_KIND, spec.spanKind); + } + if (spec.serviceName != null) { + agentSpan.setServiceName(spec.serviceName); + } + if (spec.samplingPriority != 0) { + agentSpan.setSamplingPriority(spec.samplingPriority, SamplingMechanism.DEFAULT); + } + if (spec.error) { + agentSpan.setError(true); + if (spec.errorMessage != null) { + agentSpan.setErrorMessage(spec.errorMessage); + } + } + if (spec.measured) { + agentSpan.setMeasured(true); + } + if (spec.httpStatusCode != 0) { + agentSpan.setHttpStatusCode(spec.httpStatusCode); + } + + spec.extraTags.forEach( + (key, value) -> { + if (value instanceof String) agentSpan.setTag(key, (String) value); + else if (value instanceof Long) agentSpan.setTag(key, (long) (Long) value); + else if (value instanceof Boolean) agentSpan.setTag(key, (boolean) (Boolean) value); + else if (value instanceof Double) agentSpan.setTag(key, (double) (Double) value); + }); + + for (LinkSpec link : spec.links) { + agentSpan.addLink( + SpanLink.from( + spans.get(link.targetIndex).context(), + link.traceFlags, + link.traceState, + link.attributes)); + } + + agentSpan.finish(spec.finishMicros); + spans.add((DDSpan) agentSpan); + } + return spans; + } + + // ── verification helpers ────────────────────────────────────────────────── + + /** + * Parses a {@code Resource} message body and asserts it contains a {@code service.name} + * attribute. + * + *
+   *   Resource { repeated KeyValue attributes = 1; }
+   * 
+ */ + private static void verifyResource(CodedInputStream res) throws IOException { + boolean foundServiceName = false; + while (!res.isAtEnd()) { + int tag = res.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + String key = readKeyValueKey(res.readBytes().newCodedInput()); + if ("service.name".equals(key)) { + foundServiceName = true; + } + } else { + res.skipField(tag); + } + } + assertTrue(foundServiceName, "Resource must contain a 'service.name' attribute"); + } + + /** + * Parses an {@code InstrumentationScope} message body and asserts the scope name is the empty + * string used by the default trace scope. + * + *
+   *   InstrumentationScope { string name = 1; string version = 2; }
+   * 
+ */ + private static void verifyDefaultScope(CodedInputStream scope) throws IOException { + String parsedName = null; + while (!scope.isAtEnd()) { + int tag = scope.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + parsedName = scope.readString(); + } else { + scope.skipField(tag); + } + } + assertEquals("", parsedName, "default trace scope must have an empty name"); + } + + /** + * Parses a {@code Span} message body and asserts all fields match the given DDSpan and SpanSpec. + * + *
+   *   Span { trace_id=1, span_id=2, trace_state=3, parent_span_id=4, name=5, kind=6,
+   *          start_time_unix_nano=7, end_time_unix_nano=8, attributes=9 (repeated),
+   *          links=13 (repeated), status=15, flags=16 }
+   * 
+ */ + private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, String caseName) + throws IOException { + byte[] parsedTraceId = null; + byte[] parsedSpanId = null; + byte[] parsedParentSpanId = null; + String parsedName = null; + int parsedKind = -1; + long parsedStartNano = -1; + long parsedEndNano = -1; + int parsedFlags = 0; + boolean statusFound = false; + boolean statusIsError = false; + String parsedStatusMessage = null; + Set attrKeys = new HashSet<>(); + int linkCount = 0; + + while (!sp.isAtEnd()) { + int tag = sp.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: + parsedTraceId = sp.readBytes().toByteArray(); + break; + case 2: + parsedSpanId = sp.readBytes().toByteArray(); + break; + case 3: + sp.skipField( + tag); // trace_state: absent for locally-started spans, present when propagated + break; + case 4: + parsedParentSpanId = sp.readBytes().toByteArray(); + break; + case 5: + parsedName = sp.readString(); + break; + case 6: + parsedKind = sp.readEnum(); + break; + case 7: + parsedStartNano = sp.readFixed64(); + break; + case 8: + parsedEndNano = sp.readFixed64(); + break; + case 9: + attrKeys.add(readKeyValueKey(sp.readBytes().newCodedInput())); + break; + case 13: + verifyLink(sp.readBytes().newCodedInput(), spec.links[linkCount], caseName); + linkCount++; + break; + case 15: + { + CodedInputStream status = sp.readBytes().newCodedInput(); + statusFound = true; + while (!status.isAtEnd()) { + int st = status.readTag(); + switch (WireFormat.getTagFieldNumber(st)) { + case 2: + parsedStatusMessage = status.readString(); + break; + case 3: + statusIsError = status.readEnum() == 2; // STATUS_CODE_ERROR = 2 + break; + default: + status.skipField(st); + } + } + break; + } + case 16: + parsedFlags = sp.readFixed32(); + break; + default: + sp.skipField(tag); + } + } + + // ── trace_id (field 1): 16 bytes ───────────────────────────────────────── + assertNotNull(parsedTraceId, "trace_id must be present [" + caseName + "]"); + assertEquals(16, parsedTraceId.length, "trace_id must be 16 bytes [" + caseName + "]"); + if (spec.use128BitTraceId) { + // high-order bytes occupy parsedTraceId[8..15] (little-endian in the wire format) + long highOrderBytes = readLittleEndianLong(copyOfRange(parsedTraceId, 8, 16)); + assertNotEquals( + 0L, + highOrderBytes, + "128-bit trace_id high-order bytes must be non-zero [" + caseName + "]"); + } + + // ── span_id (field 2): 8 bytes, encodes span.getSpanId() ───────────────── + assertNotNull(parsedSpanId, "span_id must be present [" + caseName + "]"); + assertEquals(8, parsedSpanId.length, "span_id must be 8 bytes [" + caseName + "]"); + assertEquals( + span.getSpanId(), + readLittleEndianLong(parsedSpanId), + "span_id mismatch [" + caseName + "]"); + + // ── parent_span_id (field 4) ────────────────────────────────────────────── + if (spec.parentIndex >= 0) { + assertNotNull( + parsedParentSpanId, "parent_span_id must be present for child span [" + caseName + "]"); + assertEquals( + 8, parsedParentSpanId.length, "parent_span_id must be 8 bytes [" + caseName + "]"); + assertEquals( + span.getParentId(), + readLittleEndianLong(parsedParentSpanId), + "parent_span_id mismatch [" + caseName + "]"); + } else { + // root spans either omit the field or write zero bytes + if (parsedParentSpanId != null) { + assertEquals( + 0L, + readLittleEndianLong(parsedParentSpanId), + "root span parent_span_id must be zero [" + caseName + "]"); + } + } + + // ── name (field 5): resource name ───────────────────────────────────────── + assertEquals( + spec.resourceName, parsedName, "Span.name (resource name) mismatch [" + caseName + "]"); + + // ── kind (field 6): SpanKind enum ───────────────────────────────────────── + assertEquals(expectedKind(spec.spanKind), parsedKind, "kind mismatch [" + caseName + "]"); + + // ── start_time_unix_nano (field 7) ──────────────────────────────────────── + assertEquals( + spec.startMicros * 1000L, + parsedStartNano, + "start_time_unix_nano mismatch [" + caseName + "]"); + + // ── end_time_unix_nano (field 8) ────────────────────────────────────────── + assertEquals( + spec.finishMicros * 1000L, parsedEndNano, "end_time_unix_nano mismatch [" + caseName + "]"); + + // ── flags (field 16): SAMPLED flag reflects span.samplingPriority() > 0 ── + // The default tracer sampler keeps all spans (priority > 0), so the SAMPLED flag is set for + // every span. We verify it is set when we've explicitly requested it; we don't assert it is + // absent otherwise because the default sampler may still set a positive priority. + if (spec.samplingPriority > 0) { + assertTrue( + (parsedFlags & OtlpTraceProto.SAMPLED_TRACE_FLAG) != 0, + "SAMPLED flag must be set in flags [" + caseName + "]"); + } + + // ── attributes (field 9): mandatory Datadog attributes ─────────────────── + assertTrue( + attrKeys.contains("resource.name"), + "attributes must include 'resource.name' [" + caseName + "]"); + assertTrue( + attrKeys.contains("operation.name"), + "attributes must include 'operation.name' [" + caseName + "]"); + assertTrue( + attrKeys.contains("span.type"), "attributes must include 'span.type' [" + caseName + "]"); + + // service.name attribute is written only when the span's service differs from the default + if (spec.serviceName != null) { + assertTrue( + attrKeys.contains("service.name"), + "attributes must include 'service.name' when service is overridden [" + caseName + "]"); + } + + // extra user tags must appear as attributes + for (String key : spec.extraTags.keySet()) { + assertTrue( + attrKeys.contains(key), + "attributes must include extra tag '" + key + "' [" + caseName + "]"); + } + + if (spec.measured) { + assertTrue( + attrKeys.contains("_dd.measured"), + "attributes must include '_dd.measured' for measured spans [" + caseName + "]"); + } + if (spec.httpStatusCode != 0) { + assertTrue( + attrKeys.contains("http.status_code"), + "attributes must include 'http.status_code' when set via setHttpStatusCode [" + + caseName + + "]"); + } + if (spec.origin != null) { + assertTrue( + attrKeys.contains("_dd.origin"), + "attributes must include '_dd.origin' when origin is set [" + caseName + "]"); + } + + // ── status (field 15) ───────────────────────────────────────────────────── + if (spec.error) { + assertTrue(statusFound, "status must be present for error span [" + caseName + "]"); + assertTrue(statusIsError, "status.code must be ERROR(2) [" + caseName + "]"); + if (spec.errorMessage != null) { + assertEquals( + spec.errorMessage, parsedStatusMessage, "status.message mismatch [" + caseName + "]"); + } else { + assertNull( + parsedStatusMessage, "status.message must be absent when not set [" + caseName + "]"); + } + } else { + assertFalse(statusFound, "status must be absent for non-error span [" + caseName + "]"); + } + + // ── links (field 13) ────────────────────────────────────────────────────── + assertEquals(spec.links.length, linkCount, "link count mismatch [" + caseName + "]"); + } + + /** + * Parses a {@code Span.Link} message body and verifies trace_id, span_id, and (if expected) link + * attributes are present. + * + *
+   *   Link { bytes trace_id = 1; bytes span_id = 2; string trace_state = 3;
+   *          KeyValue attributes = 4; fixed32 flags = 6; }
+   * 
+ */ + private static void verifyLink(CodedInputStream link, LinkSpec linkSpec, String caseName) + throws IOException { + byte[] traceId = null; + byte[] spanId = null; + String parsedTraceState = null; + Set linkAttrKeys = new HashSet<>(); + int parsedFlags = 0; + while (!link.isAtEnd()) { + int tag = link.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: + traceId = link.readBytes().toByteArray(); + break; + case 2: + spanId = link.readBytes().toByteArray(); + break; + case 3: + parsedTraceState = link.readString(); + break; + case 4: + linkAttrKeys.add(readKeyValueKey(link.readBytes().newCodedInput())); + break; + case 6: + parsedFlags = link.readFixed32(); + break; + default: + link.skipField(tag); + } + } + assertNotNull(traceId, "Link.trace_id must be present [" + caseName + "]"); + assertEquals(16, traceId.length, "Link.trace_id must be 16 bytes [" + caseName + "]"); + assertNotNull(spanId, "Link.span_id must be present [" + caseName + "]"); + assertEquals(8, spanId.length, "Link.span_id must be 8 bytes [" + caseName + "]"); + if (!linkSpec.traceState.isEmpty()) { + assertEquals( + linkSpec.traceState, parsedTraceState, "Link.trace_state mismatch [" + caseName + "]"); + } + // SpanLink.from() ORs in the SAMPLED_FLAG (0x01) when the target context has positive + // sampling priority, which all test anchor spans have via the default tracer sampler. + int expectedFlags = Byte.toUnsignedInt((byte) (linkSpec.traceFlags | 0x01)); + assertEquals(expectedFlags, parsedFlags, "Link.flags mismatch [" + caseName + "]"); + for (String expectedKey : linkSpec.attributes.asMap().keySet()) { + assertTrue( + linkAttrKeys.contains(expectedKey), + "Link attributes must include '" + expectedKey + "' [" + caseName + "]"); + } + } + + // ── proto parsing helpers ───────────────────────────────────────────────── + + /** + * Returns the expected SpanKind enum value for the given Datadog span kind tag value. + * + *
+   *   SPAN_KIND_UNSPECIFIED = 0  (default)
+   *   SPAN_KIND_INTERNAL    = 1
+   *   SPAN_KIND_SERVER      = 2
+   *   SPAN_KIND_CLIENT      = 3
+   *   SPAN_KIND_PRODUCER    = 4
+   *   SPAN_KIND_CONSUMER    = 5
+   * 
+ */ + private static int expectedKind(String spanKind) { + if (SPAN_KIND_INTERNAL.equals(spanKind)) return 1; + if (SPAN_KIND_SERVER.equals(spanKind)) return 2; + if (SPAN_KIND_CLIENT.equals(spanKind)) return 3; + if (SPAN_KIND_PRODUCER.equals(spanKind)) return 4; + if (SPAN_KIND_CONSUMER.equals(spanKind)) return 5; + return 0; // UNSPECIFIED + } + + /** + * Reads a {@code KeyValue} body and returns the key (field 1). The value is skipped; its encoding + * is covered by {@code OtlpCommonProtoTest}. + */ + private static String readKeyValueKey(CodedInputStream kv) throws IOException { + String key = null; + while (!kv.isAtEnd()) { + int tag = kv.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + key = kv.readString(); + } else { + kv.skipField(tag); + } + } + return key; + } + + /** Reads a little-endian 64-bit integer from the first 8 bytes of the given array. */ + private static long readLittleEndianLong(byte[] bytes) { + long value = 0; + for (int i = 7; i >= 0; i--) { + value = (value << 8) | (bytes[i] & 0xFF); + } + return value; + } +} diff --git a/dd-trace-core/src/test/resources/opentelemetry/proto/trace/v1/trace.proto b/dd-trace-core/src/test/resources/opentelemetry/proto/trace/v1/trace.proto new file mode 100644 index 00000000000..66897fb6fa0 --- /dev/null +++ b/dd-trace-core/src/test/resources/opentelemetry/proto/trace/v1/trace.proto @@ -0,0 +1,359 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "opentelemetry/proto/common/v1/common.proto"; +import "opentelemetry/proto/resource/v1/resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.trace.v1"; +option java_outer_classname = "TraceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/trace/v1"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. +message ResourceSpans { + reserved 1000; + + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the span data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "scope" field and all spans and span + // events in the "spans" field. + string schema_url = 3; +} + +// A Span represents a single operation performed by a single component of the system. +// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes OR of length + // other than 8 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether a span's parent + // is remote. The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // When creating span messages, if the message is logically forwarded from another source + // with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD + // be copied as-is. If creating from a source that does not have an equivalent flags field + // (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST + // be set to zero. + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // + // [Optional]. + fixed32 flags = 16; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // The start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // The end time of the span. On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // A collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/myattribute": true + // "example.com/score": 10.239 + // + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // The number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // The time the event occurred. + fixed64 time_unix_nano = 1; + + // The name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // A collection of attribute key/value pairs on the event. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // A collection of Event items. + repeated Event events = 11; + + // The number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // A collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether the link is remote. + // The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero. + // + // [Optional]. + fixed32 flags = 6; + } + + // A collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // The number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developer or Operator to + // have completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} + +// SpanFlags represents constants used to interpret the +// Span.flags field, which is protobuf 'fixed32' type and is to +// be used as bit-fields. Each non-zero value defined in this enum is +// a bit-mask. To extract the bit-field, for example, use an +// expression like: +// +// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK) +// +// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. +// +// Note that Span flags were introduced in version 1.1 of the +// OpenTelemetry protocol. Older Span producers do not set this +// field, consequently consumers should not rely on the absence of a +// particular flag bit to indicate the presence of a particular feature. +enum SpanFlags { + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + SPAN_FLAGS_DO_NOT_USE = 0; + + // Bits 0-7 are used for trace flags. + SPAN_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; + + // Bits 8 and 9 are used to indicate that the parent span or link span is remote. + // Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known. + // Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote. + SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK = 0x00000100; + SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK = 0x00000200; + + // Bits 10-31 are reserved for future use. +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 3b7534eafb5..d1d748adfba 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -181,7 +181,6 @@ import static datadog.trace.api.ConfigDefaults.DEFAULT_WEBSOCKET_MESSAGES_INHERIT_SAMPLING; import static datadog.trace.api.ConfigDefaults.DEFAULT_WEBSOCKET_MESSAGES_SEPARATE_TRACES; import static datadog.trace.api.ConfigDefaults.DEFAULT_WEBSOCKET_TAG_SESSION_ID; -import static datadog.trace.api.ConfigDefaults.DEFAULT_WRITER_BAGGAGE_INJECT; import static datadog.trace.api.ConfigSetting.NON_DEFAULT_SEQ_ID; import static datadog.trace.api.DDTags.APM_ENABLED; import static datadog.trace.api.DDTags.HOST_TAG; @@ -683,6 +682,7 @@ import static datadog.trace.api.config.TracerConfig.TRACE_STRICT_WRITES_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_X_DATADOG_TAGS_MAX_LENGTH; import static datadog.trace.api.config.TracerConfig.WRITER_BAGGAGE_INJECT; +import static datadog.trace.api.config.TracerConfig.WRITER_LINKS_INJECT; import static datadog.trace.api.config.TracerConfig.WRITER_TYPE; import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY; import static datadog.trace.util.CollectionUtils.tryMakeImmutableList; @@ -837,6 +837,7 @@ public static String getHostName() { private final boolean integrationSynapseLegacyOperationName; private final String writerType; private final boolean injectBaggageAsTagsEnabled; + private final boolean injectLinksAsTagsEnabled; private final boolean agentConfiguredUsingDefault; private final String agentUrl; private final String agentHost; @@ -1432,8 +1433,10 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins integrationSynapseLegacyOperationName = configProvider.getBoolean(INTEGRATION_SYNAPSE_LEGACY_OPERATION_NAME, false); writerType = configProvider.getString(WRITER_TYPE, DEFAULT_AGENT_WRITER_TYPE); + boolean isDatadogTraceWriter = !isTraceOtlpExporterEnabled(); injectBaggageAsTagsEnabled = - configProvider.getBoolean(WRITER_BAGGAGE_INJECT, DEFAULT_WRITER_BAGGAGE_INJECT); + configProvider.getBoolean(WRITER_BAGGAGE_INJECT, isDatadogTraceWriter); + injectLinksAsTagsEnabled = configProvider.getBoolean(WRITER_LINKS_INJECT, isDatadogTraceWriter); String lambdaInitType = getEnv("AWS_LAMBDA_INITIALIZATION_TYPE"); if (lambdaInitType != null && lambdaInitType.equals("snap-start")) { secureRandom = true; @@ -3231,6 +3234,10 @@ public boolean isInjectBaggageAsTagsEnabled() { return injectBaggageAsTagsEnabled; } + public boolean isInjectLinksAsTagsEnabled() { + return injectLinksAsTagsEnabled; + } + public boolean isAgentConfiguredUsingDefault() { return agentConfiguredUsingDefault; } @@ -6302,6 +6309,8 @@ public String toString() { + traceFlushIntervalSeconds + ", injectBaggageAsTagsEnabled=" + injectBaggageAsTagsEnabled + + ", injectLinksAsTagsEnabled=" + + injectLinksAsTagsEnabled + ", logsInjectionEnabled=" + logsInjectionEnabled + ", appLogsCollectionEnabled=" diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index ac7935039e3..44d51c5f7f1 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -11241,6 +11241,14 @@ "aliases": [] } ], + "DD_WRITER_LINKS_INJECT": [ + { + "version": "A", + "type": "boolean", + "default": "true", + "aliases": [] + } + ], "DD_WRITER_TYPE": [ { "version": "A",