-
Notifications
You must be signed in to change notification settings - Fork 333
Add OtlpWriter for OTLP traces export #11200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
145f884
d183f39
d69e491
e7e3a95
af97659
015e272
8194971
a5cd8fe
1615566
46887fe
eb2b116
b02aab1
594b4b8
491533b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package datadog.trace.common.writer; | ||
|
|
||
| import datadog.trace.core.CoreSpan; | ||
| import datadog.trace.core.DDSpanContext; | ||
| import datadog.trace.core.otlp.common.OtlpPayload; | ||
| import datadog.trace.core.otlp.common.OtlpSender; | ||
| import datadog.trace.core.otlp.trace.OtlpTraceCollector; | ||
| import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| final class OtlpPayloadDispatcher implements PayloadDispatcher { | ||
| private final OtlpTraceCollector collector; | ||
| private final OtlpSender sender; | ||
|
|
||
| OtlpPayloadDispatcher(OtlpSender sender) { | ||
| this(sender, OtlpTraceProtoCollector.INSTANCE); | ||
| } | ||
|
|
||
| OtlpPayloadDispatcher(OtlpSender sender, OtlpTraceCollector collector) { | ||
| this.sender = sender; | ||
| this.collector = collector; | ||
| } | ||
|
|
||
| @Override | ||
| public void addTrace(List<? extends CoreSpan<?>> trace) { | ||
| List<CoreSpan<?>> sampled = null; | ||
| for (CoreSpan<?> span : trace) { | ||
| if (shouldExport(span)) { | ||
| if (sampled == null) { | ||
| sampled = new ArrayList<>(trace.size()); | ||
| } | ||
| sampled.add(span); | ||
| } | ||
| } | ||
| if (sampled != null) { | ||
| collector.addTrace(sampled); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason we can't just call |
||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void flush() { | ||
| OtlpPayload payload = collector.collectTraces(); | ||
| if (payload != OtlpPayload.EMPTY) { | ||
| sender.send(payload); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onDroppedTrace(int spanCount) { | ||
| // TODO: surface drop counts via HealthMetrics | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've purposefully left this out as its not in the current spec |
||
| } | ||
|
|
||
| @Override | ||
| public Collection<RemoteApi> getApis() { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| private static boolean shouldExport(CoreSpan<?> span) { | ||
| // trace-level sampling priority | ||
| if (span.samplingPriority() > 0) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The default payload dispatcher doesn't have this extra sampling check - it just assumes that anything reaching the dispatcher should be exported. Is there a reason we have this additional check for OTLP?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per the spec:
|
||
| return true; | ||
| } | ||
| // span-level sampling priority | ||
| return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| package datadog.trace.common.writer; | ||
|
|
||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT; | ||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT; | ||
| import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_TRACES_TIMEOUT; | ||
|
|
||
| import datadog.communication.ddagent.DroppingPolicy; | ||
| import datadog.trace.api.config.OtlpConfig; | ||
| import datadog.trace.common.sampling.SingleSpanSampler; | ||
| import datadog.trace.common.writer.ddagent.Prioritization; | ||
| import datadog.trace.core.monitor.HealthMetrics; | ||
| import datadog.trace.core.otlp.common.OtlpGrpcSender; | ||
| import datadog.trace.core.otlp.common.OtlpHttpSender; | ||
| import datadog.trace.core.otlp.common.OtlpSender; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class OtlpWriter extends RemoteWriter { | ||
|
|
||
| private static final int BUFFER_SIZE = 1024; | ||
|
mtoffl01 marked this conversation as resolved.
|
||
| private static final String TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT; | ||
| private static final String DEFAULT_OTLP_HTTP_ENDPOINT = | ||
| "http://localhost:" + DEFAULT_OTLP_HTTP_PORT + TRACES_SIGNAL_PATH; | ||
|
|
||
| public static OtlpWriterBuilder builder() { | ||
| return new OtlpWriterBuilder(); | ||
| } | ||
|
|
||
| private final OtlpSender sender; | ||
|
|
||
| OtlpWriter( | ||
| TraceProcessingWorker worker, | ||
| PayloadDispatcher dispatcher, | ||
| OtlpSender sender, | ||
| HealthMetrics healthMetrics, | ||
| int flushTimeout, | ||
| TimeUnit flushTimeoutUnit, | ||
| boolean alwaysFlush) { | ||
| super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush); | ||
| this.sender = sender; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| super.close(); | ||
| sender.shutdown(); | ||
| } | ||
|
|
||
| public static class OtlpWriterBuilder { | ||
| private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT; | ||
| private Map<String, String> headers = Collections.emptyMap(); | ||
| private int timeoutMillis = DEFAULT_OTLP_TRACES_TIMEOUT; | ||
| private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF; | ||
| private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE; | ||
| private int traceBufferSize = BUFFER_SIZE; | ||
| private HealthMetrics healthMetrics = HealthMetrics.NO_OP; | ||
| private int flushIntervalMilliseconds = 1000; | ||
| private int flushTimeout = 1; | ||
| private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS; | ||
| private boolean alwaysFlush = false; | ||
| private SingleSpanSampler singleSpanSampler; | ||
| private OtlpSender sender; | ||
|
mtoffl01 marked this conversation as resolved.
|
||
|
|
||
| public OtlpWriterBuilder endpoint(String endpoint) { | ||
| this.endpoint = endpoint; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder headers(Map<String, String> headers) { | ||
| this.headers = headers; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder timeoutMillis(int timeoutMillis) { | ||
| this.timeoutMillis = timeoutMillis; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) { | ||
| this.protocol = protocol; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder compression(OtlpConfig.Compression compression) { | ||
| this.compression = compression; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder traceBufferSize(int traceBufferSize) { | ||
| this.traceBufferSize = traceBufferSize; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) { | ||
| this.healthMetrics = healthMetrics; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) { | ||
| this.flushIntervalMilliseconds = flushIntervalMilliseconds; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) { | ||
| this.flushTimeout = flushTimeout; | ||
| this.flushTimeoutUnit = flushTimeoutUnit; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) { | ||
| this.alwaysFlush = alwaysFlush; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) { | ||
| this.singleSpanSampler = singleSpanSampler; | ||
| return this; | ||
| } | ||
|
|
||
| OtlpWriterBuilder sender(OtlpSender sender) { | ||
| this.sender = sender; | ||
| return this; | ||
| } | ||
|
|
||
| public OtlpWriter build() { | ||
| if (sender == null) { | ||
| sender = | ||
| protocol == OtlpConfig.Protocol.GRPC | ||
| ? new OtlpGrpcSender( | ||
| endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression) | ||
|
Comment on lines
+129
to
+131
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - I believe the GRPC sender should use while the HTTP sender should use |
||
| : new OtlpHttpSender( | ||
| endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression); | ||
| } | ||
|
|
||
| final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender); | ||
| final TraceProcessingWorker worker = | ||
| new TraceProcessingWorker( | ||
| traceBufferSize, | ||
| healthMetrics, | ||
| dispatcher, | ||
| DroppingPolicy.DISABLED, | ||
| Prioritization.ENSURE_TRACE, | ||
| flushIntervalMilliseconds, | ||
| TimeUnit.MILLISECONDS, | ||
| singleSpanSampler); | ||
|
|
||
| return new OtlpWriter( | ||
| worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default dispatcher constructor reuses
OtlpTraceProtoCollector.INSTANCEfor allOtlpWriterinstances, but that collector maintains mutable state and is documented as single-threaded. If more than one OTLP writer exists in-process (e.g., multiple tracers orMultiWritercompositions), serializer threads will concurrently mutate shared collector state, which can corrupt payload assembly or mix traces across writers.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe we ever have a scenario where multiple OtlpWriters are initialized 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct - there should only ever be at most one
OtlpWriterinitialized.But since
OtlpTraceProtoCollectoris only ever used inside this class we could always drop theOtlpTraceProtoCollector.INSTANCEfield (as we don't need to share the instance across classes) and constructOtlpTraceProtoCollectorin this constructor.