Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ public final class ConfigDefaults {
static final int DEFAULT_METRICS_OTEL_TIMEOUT = 7_500; // ms
static final int DEFAULT_METRICS_OTEL_CARDINALITY_LIMIT = 2_000;

static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
public static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms

static final String DEFAULT_OTLP_HTTP_METRICS_ENDPOINT = "v1/metrics";
static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
static final String DEFAULT_OTLP_HTTP_PORT = "4318";
static final String DEFAULT_OTLP_GRPC_PORT = "4317";
public static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
public static final String DEFAULT_OTLP_HTTP_PORT = "4318";
public static final String DEFAULT_OTLP_GRPC_PORT = "4317";

static final int DEFAULT_DOGSTATSD_START_DELAY = 15; // seconds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static Sampler forConfig(final Config config, final TraceConfig traceConf
log.error("Invalid sampler configuration. Using AllSampler", e);
sampler = new AllSampler();
}
// TODO: if OTLP trace export enabled, select ParentBasedAlwaysOnSampler here
} else if (config.isPrioritySamplingEnabled()) {
if (KEEP.equalsIgnoreCase(config.getPrioritySamplingForce())) {
log.debug("Force Sampling Priority to: SAMPLER_KEEP.");
Expand All @@ -90,9 +89,19 @@ public static Sampler forConfig(final Config config, final TraceConfig traceConf
log.debug("Force Sampling Priority to: SAMPLER_DROP.");
sampler =
new ForcePrioritySampler(PrioritySampling.SAMPLER_DROP, SamplingMechanism.DEFAULT);
} else if (config.isTraceOtlpExporterEnabled()) {
// RateByServiceTraceSampler relies on the Datadog Agent for rate updates.
log.debug(
"OTLP traces export enabled. Using ParentBasedAlwaysOnSampler instead of RateByServiceTraceSampler.");
sampler = new ParentBasedAlwaysOnSampler();
} else {
sampler = new RateByServiceTraceSampler();
}
} else if (config.isTraceOtlpExporterEnabled()) {
// AllSampler does not emit a sampling priority; OTLP export requires one.
log.debug(
"OTLP traces export enabled. Using ParentBasedAlwaysOnSampler instead of AllSampler.");
sampler = new ParentBasedAlwaysOnSampler();
} else {
sampler = new AllSampler();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package datadog.trace.common.writer;

import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import datadog.trace.core.otlp.trace.OtlpTraceCollector;
import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class OtlpPayloadDispatcher implements PayloadDispatcher {
private final OtlpTraceCollector collector;
private final OtlpSender sender;

OtlpPayloadDispatcher(OtlpSender sender) {
this(sender, OtlpTraceProtoCollector.INSTANCE);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Allocate a collector per OTLP payload dispatcher

The default dispatcher constructor reuses OtlpTraceProtoCollector.INSTANCE for all OtlpWriter instances, but that collector maintains mutable state and is documented as single-threaded. If more than one OTLP writer exists in-process (e.g., multiple tracers or MultiWriter compositions), serializer threads will concurrently mutate shared collector state, which can corrupt payload assembly or mix traces across writers.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe we ever have a scenario where multiple OtlpWriters are initialized 🤔

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - there should only ever be at most one OtlpWriter initialized.

But since OtlpTraceProtoCollector is only ever used inside this class we could always drop the OtlpTraceProtoCollector.INSTANCE field (as we don't need to share the instance across classes) and construct OtlpTraceProtoCollector in this constructor.

}

/**
 * Creates a dispatcher wired to an explicit collector.
 *
 * @param sender receives the payloads produced when this dispatcher is flushed
 * @param collector accumulates traces between flushes
 */
OtlpPayloadDispatcher(OtlpSender sender, OtlpTraceCollector collector) {
  this.collector = collector;
  this.sender = sender;
}

@Override
public void addTrace(List<? extends CoreSpan<?>> trace) {
List<CoreSpan<?>> sampled = null;
for (CoreSpan<?> span : trace) {
if (shouldExport(span)) {
if (sampled == null) {
sampled = new ArrayList<>(trace.size());
}
sampled.add(span);
}
}
if (sampled != null) {
collector.addTrace(sampled);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we can't just call collector.addTrace(trace) like the non-OTLP dispatcher does?

}
}

/**
 * Collects the traces accumulated so far and hands the resulting payload to the sender.
 *
 * <p>Nothing is sent when the collector returns the shared {@code OtlpPayload.EMPTY} instance.
 */
@Override
public void flush() {
  final OtlpPayload collected = collector.collectTraces();
  if (collected == OtlpPayload.EMPTY) {
    // Identity comparison against the EMPTY sentinel: there is nothing to export.
    return;
  }
  sender.send(collected);
}

@Override
public void onDroppedTrace(int spanCount) {
// TODO: surface drop counts via HealthMetrics
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've purposefully left this out as it's not in the current spec

}

/**
 * Returns the remote APIs exposed by this dispatcher.
 *
 * @return always an empty, immutable collection
 */
@Override
public Collection<RemoteApi> getApis() {
  final List<RemoteApi> noApis = Collections.emptyList();
  return noApis;
}

private static boolean shouldExport(CoreSpan<?> span) {
// trace-level sampling priority
if (span.samplingPriority() > 0) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Allow OTLP export when sampling priority is UNSET

shouldExport drops any span whose samplingPriority() is not > 0 unless span-sampling tags are present. In configurations that legitimately publish traces without priority sampling (for example DD_PRIORITY_SAMPLING=false with AllSampler), spans are published but keep UNSET priority, so this filter discards every span and OTLP emits nothing. Since tracing already decided to publish upstream, UNSET should not be treated as an automatic drop here.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default payload dispatcher doesn't have this extra sampling check - it just assumes that anything reaching the dispatcher should be exported. Is there a reason we have this additional check for OTLP?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the spec:

only export spans that have been sampled

return true;
}
// span-level sampling priority
return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package datadog.trace.common.writer;

import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_TRACES_TIMEOUT;

import datadog.communication.ddagent.DroppingPolicy;
import datadog.trace.api.config.OtlpConfig;
import datadog.trace.common.sampling.SingleSpanSampler;
import datadog.trace.common.writer.ddagent.Prioritization;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpSender;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class OtlpWriter extends RemoteWriter {

private static final int BUFFER_SIZE = 1024;
Comment thread
mtoffl01 marked this conversation as resolved.
private static final String TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
private static final String DEFAULT_OTLP_HTTP_ENDPOINT =
"http://localhost:" + DEFAULT_OTLP_HTTP_PORT + TRACES_SIGNAL_PATH;

/**
 * Entry point for configuring and creating an {@link OtlpWriter}.
 *
 * @return a new builder initialised with its field defaults
 */
public static OtlpWriterBuilder builder() {
  final OtlpWriterBuilder freshBuilder = new OtlpWriterBuilder();
  return freshBuilder;
}

private final OtlpSender sender;

/**
 * Package-private constructor; instances are normally obtained through {@link #builder()}.
 *
 * <p>All arguments except {@code sender} are forwarded unchanged to the superclass constructor.
 * The sender is retained so that {@link #close()} can shut it down.
 */
OtlpWriter(
    TraceProcessingWorker worker,
    PayloadDispatcher dispatcher,
    OtlpSender sender,
    HealthMetrics healthMetrics,
    int flushTimeout,
    TimeUnit flushTimeoutUnit,
    boolean alwaysFlush) {
  super(
      worker,
      dispatcher,
      healthMetrics,
      flushTimeout,
      flushTimeoutUnit,
      alwaysFlush);
  this.sender = sender;
}

/**
 * Closes the writer and then shuts down the OTLP sender.
 *
 * <p>The sender is shut down in a {@code finally} block so that it is still released when the
 * superclass {@code close()} throws; any such exception continues to propagate to the caller.
 */
@Override
public void close() {
  try {
    super.close();
  } finally {
    sender.shutdown();
  }
}

public static class OtlpWriterBuilder {
private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT;
private Map<String, String> headers = Collections.emptyMap();
private int timeoutMillis = DEFAULT_OTLP_TRACES_TIMEOUT;
private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF;
private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE;
private int traceBufferSize = BUFFER_SIZE;
private HealthMetrics healthMetrics = HealthMetrics.NO_OP;
private int flushIntervalMilliseconds = 1000;
private int flushTimeout = 1;
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
private boolean alwaysFlush = false;
private SingleSpanSampler singleSpanSampler;
private OtlpSender sender;
Comment thread
mtoffl01 marked this conversation as resolved.

/**
 * Sets the endpoint handed to the sender that {@code build()} constructs when no sender was
 * supplied explicitly.
 *
 * @return this builder
 */
public OtlpWriterBuilder endpoint(String endpoint) {
this.endpoint = endpoint;
return this;
}

/**
 * Sets the headers handed to the sender that {@code build()} constructs. The map is stored by
 * reference, not copied.
 *
 * @return this builder
 */
public OtlpWriterBuilder headers(Map<String, String> headers) {
this.headers = headers;
return this;
}

/**
 * Sets the timeout, in milliseconds, handed to the sender that {@code build()} constructs.
 *
 * @return this builder
 */
public OtlpWriterBuilder timeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
return this;
}

/**
 * Selects the OTLP protocol; {@code build()} creates a gRPC sender for {@code GRPC} and an HTTP
 * sender otherwise.
 *
 * @return this builder
 */
public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) {
this.protocol = protocol;
return this;
}

/**
 * Sets the compression setting handed to the sender that {@code build()} constructs.
 *
 * @return this builder
 */
public OtlpWriterBuilder compression(OtlpConfig.Compression compression) {
this.compression = compression;
return this;
}

/**
 * Sets the buffer size passed to the {@code TraceProcessingWorker} created by {@code build()}.
 *
 * @return this builder
 */
public OtlpWriterBuilder traceBufferSize(int traceBufferSize) {
this.traceBufferSize = traceBufferSize;
return this;
}

/**
 * Sets the health metrics instance passed to both the processing worker and the writer.
 *
 * @return this builder
 */
public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) {
this.healthMetrics = healthMetrics;
return this;
}

/**
 * Sets the flush interval, in milliseconds, passed to the processing worker.
 *
 * @return this builder
 */
public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) {
this.flushIntervalMilliseconds = flushIntervalMilliseconds;
return this;
}

/**
 * Sets the flush timeout and its unit, both passed to the {@code OtlpWriter} constructor.
 *
 * @return this builder
 */
public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
this.flushTimeout = flushTimeout;
this.flushTimeoutUnit = flushTimeoutUnit;
return this;
}

/**
 * Sets the always-flush flag passed to the {@code OtlpWriter} constructor.
 *
 * @return this builder
 */
public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) {
this.alwaysFlush = alwaysFlush;
return this;
}

/**
 * Sets the single-span sampler passed to the processing worker; it has no default and stays
 * {@code null} unless set here.
 *
 * @return this builder
 */
public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) {
this.singleSpanSampler = singleSpanSampler;
return this;
}

/**
 * Package-private hook to supply a ready-made sender. When one is set, {@code build()} uses it
 * instead of constructing a sender from the protocol, endpoint, headers, timeout and compression.
 *
 * @return this builder
 */
OtlpWriterBuilder sender(OtlpSender sender) {
this.sender = sender;
return this;
}

public OtlpWriter build() {
if (sender == null) {
sender =
protocol == OtlpConfig.Protocol.GRPC
? new OtlpGrpcSender(
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression)
Comment on lines +129 to +131
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Send OTLP gRPC traces to TraceService export path

In OtlpWriterBuilder.build, the gRPC branch passes TRACES_SIGNAL_PATH (/v1/traces) to OtlpGrpcSender, but that sender emits application/grpc requests and needs a gRPC method path (as shown by metrics using /opentelemetry.proto.collector.metrics.v1.MetricsService/Export). With the current path, OTLP collectors will reject the request (typically 404/UNIMPLEMENTED), so trace export fails whenever otlp.traces.protocol=grpc is used.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - I believe the GRPC sender should use

"/opentelemetry.proto.collector.trace.v1.TraceService/Export"

while the HTTP sender should use

"/v1/traces"

: new OtlpHttpSender(
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression);
}

final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender);
final TraceProcessingWorker worker =
new TraceProcessingWorker(
traceBufferSize,
healthMetrics,
dispatcher,
DroppingPolicy.DISABLED,
Prioritization.ENSURE_TRACE,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
singleSpanSampler);

return new OtlpWriter(
worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.DD_INTAKE_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.LOGGING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.MULTI_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.OTLP_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.PRINTING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.TRACE_STRUCTURE_WRITER_TYPE;
import static datadog.trace.common.writer.ddagent.Prioritization.ENSURE_TRACE;
Expand Down Expand Up @@ -57,6 +58,8 @@ public static Writer createWriter(
final HealthMetrics healthMetrics,
String configuredType) {

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);

if (LOGGING_WRITER_TYPE.equals(configuredType)) {
return new LoggingWriter();
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
Expand All @@ -67,6 +70,17 @@ public static Writer createWriter(
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
return new MultiWriter(
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
} else if (OTLP_WRITER_TYPE.equals(configuredType)) {
return OtlpWriter.builder()
.endpoint(config.getOtlpTracesEndpoint())
.headers(config.getOtlpTracesHeaders())
.protocol(config.getOtlpTracesProtocol())
.compression(config.getOtlpTracesCompression())
.timeoutMillis(config.getOtlpTracesTimeout())
.healthMetrics(healthMetrics)
.spanSamplingRules(singleSpanSampler)
.flushIntervalMilliseconds(flushIntervalMilliseconds)
.build();
}

if (!DD_AGENT_WRITER_TYPE.equals(configuredType)
Expand All @@ -84,7 +98,6 @@ public static Writer createWriter(
"Using 'EnsureTrace' prioritization type. (Do not use this type if your application is running in production mode)");
}

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);
DDAgentFeaturesDiscovery featuresDiscovery = commObjects.featuresDiscovery(config);

// CI Visibility with bazel support wants to write traces into JSON files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,17 @@ private static void writeSpanTag(StreamingBuffer buf, TagMap.EntryReader tagEntr

/**
 * Writes one span attribute as a length-delimited field with field number 9.
 *
 * <p>A {@code null} value is skipped entirely, so nothing is written to the buffer for it.
 */
private static void writeSpanTag(
    StreamingBuffer buf, UTF8BytesString key, UTF8BytesString value) {
  if (value != null) {
    writeTag(buf, 9, LEN_WIRE_TYPE);
    writeAttribute(buf, key, value);
  }
}

private static void writeSpanTag(StreamingBuffer buf, UTF8BytesString key, CharSequence value) {
if (value == null) {
return;
}
writeTag(buf, 9, LEN_WIRE_TYPE);
if (value instanceof UTF8BytesString) {
writeAttribute(buf, key, (UTF8BytesString) value);
Expand Down
Loading
Loading