Skip to content

Commit 66fe66c

Browse files
committed
Review feedback: support collecting multiple traces into one payload
1 parent d5ce7d2 commit 66fe66c

4 files changed

Lines changed: 60 additions & 24 deletions

File tree

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package datadog.trace.core.otlp.trace;
22

3-
import datadog.trace.core.DDSpan;
3+
import datadog.trace.core.CoreSpan;
44
import datadog.trace.core.otlp.common.OtlpPayload;
55
import java.util.List;
66

7-
/** Collects trace spans ready for export. */
7+
/** Collects traces ready for export. */
88
public interface OtlpTraceCollector {
9-
OtlpTraceCollector NOOP_COLLECTOR = spans -> OtlpPayload.EMPTY;
9+
OtlpTraceCollector NOOP_COLLECTOR = () -> OtlpPayload.EMPTY;
1010

11-
OtlpPayload collectSpans(List<DDSpan> spans);
11+
OtlpPayload collectTraces();
12+
13+
default void addTrace(List<? extends CoreSpan<?>> spans) {}
1214
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,19 +259,27 @@ private static int spanKind(CharSequence spanKind) {
259259

260260
public static class MetaWriter implements MetadataConsumer {
261261
private final StreamingBuffer buf;
262-
private boolean firstSpan = true;
262+
263+
private boolean includeProcessTags;
264+
private boolean includeSamplingTags;
263265

264266
public MetaWriter(StreamingBuffer buf) {
265267
this.buf = buf;
266268
}
267269

268-
public void reset() {
269-
firstSpan = true;
270+
/** Call this to ensure process tags are written out for the next span. */
271+
public void includeProcessTags() {
272+
includeProcessTags = true;
273+
}
274+
275+
/** Call this to ensure sampling tags are written out for the next span. */
276+
public void includeSamplingTags() {
277+
includeSamplingTags = true;
270278
}
271279

272280
@Override
273281
public void accept(Metadata metadata) {
274-
if ((firstSpan || metadata.topLevel()) && metadata.hasSamplingPriority()) {
282+
if ((includeSamplingTags || metadata.topLevel()) && metadata.hasSamplingPriority()) {
275283
writeSpanTag(buf, SAMPLING_PRIORITY_KEY, metadata.samplingPriority());
276284
}
277285
if (metadata.measured()) {
@@ -297,13 +305,15 @@ public void accept(Metadata metadata) {
297305
if (metadata.getOrigin() != null) {
298306
writeSpanTag(buf, ORIGIN_KEY, metadata.getOrigin());
299307
}
300-
if (firstSpan && metadata.processTags() != null) {
308+
if (includeProcessTags && metadata.processTags() != null) {
301309
writeSpanTag(buf, PROCESS_TAGS_KEY, metadata.processTags());
302310
}
303311

304312
metadata.getTags().forEach(tagEntry -> writeSpanTag(buf, tagEntry));
305313

306-
firstSpan = false;
314+
// reset for next span
315+
includeProcessTags = false;
316+
includeSamplingTags = false;
307317
}
308318
}
309319
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import datadog.communication.serialization.GrowableBuffer;
1010
import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink;
1111
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
12+
import datadog.trace.core.CoreSpan;
1213
import datadog.trace.core.DDSpan;
1314
import datadog.trace.core.otlp.common.OtlpCommonProto;
1415
import datadog.trace.core.otlp.common.OtlpPayload;
@@ -48,6 +49,8 @@ public final class OtlpTraceProtoCollector implements OtlpTraceCollector {
4849
private final List<byte[]> scopedChunks = new ArrayList<>();
4950
private final List<byte[]> spanChunks = new ArrayList<>();
5051

52+
private boolean payloadStarted;
53+
5154
// total number of chunked bytes at different nesting levels
5255
private int payloadBytes;
5356
private int scopedBytes;
@@ -56,19 +59,31 @@ public final class OtlpTraceProtoCollector implements OtlpTraceCollector {
5659
private OtelInstrumentationScope currentScope;
5760
private DDSpan currentSpan;
5861

62+
/** Adds the given trace spans to the collector. */
63+
@Override
64+
public void addTrace(List<? extends CoreSpan<?>> spans) {
65+
if (!payloadStarted) {
66+
start();
67+
metaWriter.includeProcessTags();
68+
payloadStarted = true;
69+
}
70+
71+
for (int i = 0, len = spans.size(); i < len; i++) {
72+
if (i == 0 || i == len - 1) {
73+
metaWriter.includeSamplingTags();
74+
}
75+
visitSpan(spans.get(i));
76+
}
77+
}
78+
5979
/**
60-
* Collects trace spans and marshalls them into a chunked payload.
80+
* Marshalls the traces collected so far into a chunked payload.
6181
*
6282
* <p>This payload is only valid for the calling thread until the next collection.
6383
*/
6484
@Override
65-
public OtlpPayload collectSpans(List<DDSpan> spans) {
66-
OtlpCommonProto.recalibrateCaches();
67-
start();
85+
public OtlpPayload collectTraces() {
6886
try {
69-
// for now put all spans under the default scope
70-
visitScopedSpans(DEFAULT_TRACE_SCOPE);
71-
spans.forEach(this::visitSpan);
7287
return completePayload();
7388
} finally {
7489
stop();
@@ -77,14 +92,22 @@ public OtlpPayload collectSpans(List<DDSpan> spans) {
7792

7893
/** Prepare temporary elements to collect trace data. */
7994
private void start() {
95+
8096
// clear payloadChunks in case it wasn't fully consumed via OtlpPayload
8197
payloadChunks.clear();
98+
99+
// remove stale entries from caches
100+
OtlpCommonProto.recalibrateCaches();
101+
102+
// for now put all spans under the default scope
103+
visitScopedSpans(DEFAULT_TRACE_SCOPE);
82104
}
83105

84106
/** Cleanup elements used to collect trace data. */
85107
private void stop() {
108+
payloadStarted = false;
109+
86110
buf.reset();
87-
metaWriter.reset();
88111

89112
// leave payloadChunks in place so it can be consumed via OtlpPayload
90113
scopedChunks.clear();
@@ -105,12 +128,12 @@ private void visitScopedSpans(OtelInstrumentationScope scope) {
105128
currentScope = scope;
106129
}
107130

108-
private void visitSpan(DDSpan span) {
131+
private void visitSpan(CoreSpan<?> span) {
109132
if (currentSpan != null) {
110133
completeSpan();
111134
}
112-
currentSpan = span;
113-
span.getLinks().forEach(this::visitSpanLink);
135+
currentSpan = (DDSpan) span;
136+
currentSpan.getLinks().forEach(this::visitSpanLink);
114137
}
115138

116139
private void visitSpanLink(AgentSpanLink spanLink) {

dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import org.junit.jupiter.params.provider.MethodSource;
4646

4747
/**
48-
* Tests for {@link OtlpTraceProto} via {@link OtlpTraceProtoCollector#collectSpans}.
48+
* Tests for {@link OtlpTraceProto} via {@link OtlpTraceProtoCollector#collectTraces}.
4949
*
5050
* <p>Each test case builds real {@link DDSpan} instances via a shared {@link CoreTracer}, collects
5151
* them using {@link OtlpTraceProtoCollector}, drains the resulting chunked payload into a
@@ -550,10 +550,11 @@ static Stream<Arguments> cases() {
550550

551551
@ParameterizedTest(name = "{0}")
552552
@MethodSource("cases")
553-
void testCollectSpans(String caseName, List<SpanSpec> specs) throws IOException {
553+
void testCollectTraces(String caseName, List<SpanSpec> specs) throws IOException {
554554
List<DDSpan> spans = buildSpans(specs);
555555

556-
OtlpPayload payload = OtlpTraceProtoCollector.INSTANCE.collectSpans(spans);
556+
OtlpTraceProtoCollector.INSTANCE.addTrace(spans);
557+
OtlpPayload payload = OtlpTraceProtoCollector.INSTANCE.collectTraces();
557558

558559
if (spans.isEmpty()) {
559560
assertEquals(0, payload.getContentLength(), "empty span list must produce empty payload");

0 commit comments

Comments
 (0)