Skip to content

Commit 33a9e67

Browse files
dougqhclaude
andcommitted
Move client-side stats computation off the span-finish thread
ConflatingMetricsAggregator.publish() was consuming ~17% of foreground CPU (ConcurrentHashMap 12%, TraceHealthMetrics 3%, LongAdder 2%) by running MetricKey construction, ConcurrentHashMap lookups, and Batch management synchronously on the span-finish thread. This change extracts lightweight SpanStatsData DTOs on the foreground thread and defers all expensive work (MetricKey construction, map lookups, health metrics) to the existing background Aggregator thread via the MPSC inbox queue. The pending/keys maps are downgraded from ConcurrentHashMap to plain HashMap since they are now single-threaded. Benchmark shows 64-span trace foreground cost reduced from 2.86us to 2.04us (~29% reduction). tag: no release note tag: ai generated Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4666c89 commit 33a9e67

6 files changed

Lines changed: 462 additions & 161 deletions

File tree

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package datadog.trace.common.metrics;
2+
3+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
5+
6+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
7+
import datadog.trace.api.WellKnownTags;
8+
import datadog.trace.core.CoreSpan;
9+
import datadog.trace.core.monitor.HealthMetrics;
10+
import datadog.trace.util.Strings;
11+
import java.nio.ByteBuffer;
12+
import java.util.ArrayList;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import java.util.Set;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.BenchmarkMode;
18+
import org.openjdk.jmh.annotations.Fork;
19+
import org.openjdk.jmh.annotations.Level;
20+
import org.openjdk.jmh.annotations.Measurement;
21+
import org.openjdk.jmh.annotations.Mode;
22+
import org.openjdk.jmh.annotations.OutputTimeUnit;
23+
import org.openjdk.jmh.annotations.Scope;
24+
import org.openjdk.jmh.annotations.Setup;
25+
import org.openjdk.jmh.annotations.State;
26+
import org.openjdk.jmh.annotations.TearDown;
27+
import org.openjdk.jmh.annotations.Threads;
28+
import org.openjdk.jmh.annotations.Warmup;
29+
import org.openjdk.jmh.infra.Blackhole;
30+
31+
/**
32+
* Measures the foreground thread cost of publishing span stats. With the background-stats
33+
* optimization, the foreground thread should only extract lightweight SpanStatsData and offer to
34+
* the inbox queue, while the expensive MetricKey construction and HashMap operations happen on the
35+
* background aggregator thread.
36+
*/
37+
@State(Scope.Benchmark)
38+
@Warmup(iterations = 3, time = 5, timeUnit = SECONDS)
39+
@Measurement(iterations = 5, time = 5, timeUnit = SECONDS)
40+
@BenchmarkMode(Mode.AverageTime)
41+
@OutputTimeUnit(MICROSECONDS)
42+
@Fork(value = 1)
43+
public class SpanFinishWithStatsBenchmark {
44+
45+
private static final Set<String> PEER_TAGS = Collections.singleton("peer.hostname");
46+
47+
private final DDAgentFeaturesDiscovery featuresDiscovery =
48+
new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
49+
PEER_TAGS, Collections.emptySet());
50+
51+
private ConflatingMetricsAggregator aggregator;
52+
53+
private final List<CoreSpan<?>> smallTrace = generateTrace(4);
54+
private final List<CoreSpan<?>> mediumTrace = generateTrace(16);
55+
private final List<CoreSpan<?>> largeTrace = generateTrace(64);
56+
57+
@Setup(Level.Trial)
58+
public void setup() {
59+
aggregator =
60+
new ConflatingMetricsAggregator(
61+
new WellKnownTags("", "", "", "", "", ""),
62+
Collections.emptySet(),
63+
featuresDiscovery,
64+
HealthMetrics.NO_OP,
65+
new NullSink(),
66+
2048,
67+
2048,
68+
false);
69+
aggregator.start();
70+
}
71+
72+
@TearDown(Level.Trial)
73+
public void teardown() {
74+
if (aggregator != null) {
75+
aggregator.close();
76+
}
77+
}
78+
79+
static List<CoreSpan<?>> generateTrace(int len) {
80+
final List<CoreSpan<?>> trace = new ArrayList<>();
81+
for (int i = 0; i < len; i++) {
82+
SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
83+
span.setTag("peer.hostname", Strings.random(10));
84+
trace.add(span);
85+
}
86+
return trace;
87+
}
88+
89+
static class NullSink implements Sink {
90+
@Override
91+
public void register(EventListener listener) {}
92+
93+
@Override
94+
public void accept(int messageCount, ByteBuffer buffer) {}
95+
}
96+
97+
@Benchmark
98+
public void publishSmallTrace(Blackhole blackhole) {
99+
blackhole.consume(aggregator.publish(smallTrace));
100+
}
101+
102+
@Benchmark
103+
public void publishMediumTrace(Blackhole blackhole) {
104+
blackhole.consume(aggregator.publish(mediumTrace));
105+
}
106+
107+
@Benchmark
108+
public void publishLargeTrace(Blackhole blackhole) {
109+
blackhole.consume(aggregator.publish(largeTrace));
110+
}
111+
112+
/** Multi-threaded benchmark to measure contention under concurrent publishing. */
113+
@State(Scope.Benchmark)
114+
@Warmup(iterations = 3, time = 5, timeUnit = SECONDS)
115+
@Measurement(iterations = 5, time = 5, timeUnit = SECONDS)
116+
@BenchmarkMode(Mode.Throughput)
117+
@OutputTimeUnit(MICROSECONDS)
118+
@Threads(8)
119+
@Fork(value = 1)
120+
public static class ConcurrentPublish {
121+
122+
private ConflatingMetricsAggregator aggregator;
123+
private final List<CoreSpan<?>> trace = generateTrace(16);
124+
125+
@Setup(Level.Trial)
126+
public void setup() {
127+
DDAgentFeaturesDiscovery features =
128+
new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
129+
PEER_TAGS, Collections.emptySet());
130+
aggregator =
131+
new ConflatingMetricsAggregator(
132+
new WellKnownTags("", "", "", "", "", ""),
133+
Collections.emptySet(),
134+
features,
135+
HealthMetrics.NO_OP,
136+
new NullSink(),
137+
2048,
138+
2048,
139+
false);
140+
aggregator.start();
141+
}
142+
143+
@TearDown(Level.Trial)
144+
public void teardown() {
145+
if (aggregator != null) {
146+
aggregator.close();
147+
}
148+
}
149+
150+
@Benchmark
151+
public void publishConcurrent(Blackhole blackhole) {
152+
blackhole.consume(aggregator.publish(trace));
153+
}
154+
}
155+
}

dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,28 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) {
4646
return this;
4747
}
4848

49+
/**
50+
* Record a single duration value with embedded tags. Called from the background aggregator thread
51+
* when processing SpanStatsData (no Batch intermediary needed since the aggregation is
52+
* single-threaded).
53+
*/
54+
public void recordDuration(long taggedDuration) {
55+
this.hitCount++;
56+
long duration = taggedDuration;
57+
if ((duration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) {
58+
duration ^= TOP_LEVEL_TAG;
59+
++topLevelCount;
60+
}
61+
if ((duration & ERROR_TAG) == ERROR_TAG) {
62+
duration ^= ERROR_TAG;
63+
errorLatencies.accept(duration);
64+
++errorCount;
65+
} else {
66+
okLatencies.accept(duration);
67+
}
68+
this.duration += duration;
69+
}
70+
4971
public int getErrorCount() {
5072
return errorCount;
5173
}

0 commit comments

Comments
 (0)