diff --git a/dd-trace-core/src/jmh/java/datadog/trace/core/SpanTagBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/core/SpanTagBenchmark.java new file mode 100644 index 00000000000..e1bf0943188 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/core/SpanTagBenchmark.java @@ -0,0 +1,161 @@ +package datadog.trace.core; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@Fork(2) +public class SpanTagBenchmark { + + static final CoreTracer TRACER = CoreTracer.builder().build(); + + @State(Scope.Thread) + public static class SpanPerThread { + AgentSpan span; + + @Setup(Level.Invocation) + public void setup() { + span = TRACER.startSpan("benchmark", "tag-benchmark"); + } + } + + @State(Scope.Benchmark) + public static class SharedSpan { + AgentSpan span; + + @Setup(Level.Iteration) + public void setup() { + span = TRACER.startSpan("benchmark", "tag-benchmark-shared"); + } + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setStringTag_ownerThread(SpanPerThread state) { + state.span.setTag("key", "value"); + return state.span; + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setIntTag_ownerThread(SpanPerThread state) { + 
state.span.setTag("key", 42); + return state.span; + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setTenTags_ownerThread(SpanPerThread state) { + state.span.setTag("k0", "v0"); + state.span.setTag("k1", "v1"); + state.span.setTag("k2", "v2"); + state.span.setTag("k3", "v3"); + state.span.setTag("k4", "v4"); + state.span.setTag("k5", 5); + state.span.setTag("k6", 6L); + state.span.setTag("k7", 7.0); + state.span.setTag("k8", true); + state.span.setTag("k9", "v9"); + return state.span; + } + + @Benchmark + @Threads(8) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setStringTag_crossThread(SharedSpan state) { + state.span.setTag("key", "value"); + return state.span; + } + + // Multi-threaded owner-thread benchmarks: 8 threads each working on their own span. + // Measures the macro impact of the optimization when many threads tag spans concurrently, + // the realistic web-server scenario. + + @Benchmark + @Threads(8) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setStringTag_ownerThread_8T(SpanPerThread state) { + state.span.setTag("key", "value"); + return state.span; + } + + @Benchmark + @Threads(8) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setIntTag_ownerThread_8T(SpanPerThread state) { + state.span.setTag("key", 42); + return state.span; + } + + @Benchmark + @Threads(8) + @OutputTimeUnit(NANOSECONDS) + public AgentSpan setTenTags_ownerThread_8T(SpanPerThread state) { + state.span.setTag("k0", "v0"); + state.span.setTag("k1", "v1"); + state.span.setTag("k2", "v2"); + state.span.setTag("k3", "v3"); + state.span.setTag("k4", "v4"); + state.span.setTag("k5", 5); + state.span.setTag("k6", 6L); + state.span.setTag("k7", 7.0); + state.span.setTag("k8", true); + state.span.setTag("k9", "v9"); + return state.span; + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(MICROSECONDS) + public AgentSpan fullLifecycle_tenTags(SpanPerThread state) { + state.span.setTag("k0", "v0"); + state.span.setTag("k1", "v1"); + 
state.span.setTag("k2", "v2"); + state.span.setTag("k3", "v3"); + state.span.setTag("k4", "v4"); + state.span.setTag("k5", 5); + state.span.setTag("k6", 6L); + state.span.setTag("k7", 7.0); + state.span.setTag("k8", true); + state.span.setTag("k9", "v9"); + state.span.finish(); + return state.span; + } + + @Benchmark + @Threads(8) + @OutputTimeUnit(MICROSECONDS) + public AgentSpan fullLifecycle_tenTags_8T(SpanPerThread state) { + state.span.setTag("k0", "v0"); + state.span.setTag("k1", "v1"); + state.span.setTag("k2", "v2"); + state.span.setTag("k3", "v3"); + state.span.setTag("k4", "v4"); + state.span.setTag("k5", 5); + state.span.setTag("k6", 6L); + state.span.setTag("k7", 7.0); + state.span.setTag("k8", true); + state.span.setTag("k9", "v9"); + state.span.finish(); + return state.span; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 2c2e5d9d1f2..a8b262155e7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -155,6 +155,7 @@ public boolean isFinished() { private void finishAndAddToTrace(final long durationNano) { // ensure a min duration of 1 if (DURATION_NANO_UPDATER.compareAndSet(this, 0, Math.max(1, durationNano))) { + context.transitionToShared(); setLongRunningVersion(-this.longRunningVersion); SpanWrapper wrapper = getWrapper(); if (wrapper != null) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index 2b8dbccb738..80cf12d704a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -111,6 +111,10 @@ public class DDSpanContext */ private final TagMap unsafeTags; + void transitionToShared() { + unsafeTags.transitionToShared(); + } + /** The service name is required, otherwise the 
span are dropped by the agent */ private volatile String serviceName; @@ -336,6 +340,11 @@ public DDSpanContext( final Thread current = Thread.currentThread(); this.threadId = current.getId(); this.threadName = THREAD_NAMES.computeIfAbsent(current.getName(), Functions.UTF8_ENCODE); + // Enable thread-owned tag optimization: skip synchronization when the owner thread writes. + // Disable for long-running spans because the writer thread reads tags on unfinished spans. + if (!traceCollector.longRunningSpansEnabled()) { + this.unsafeTags.setOwnerThread(current); + } this.disableSamplingMechanismValidation = disableSamplingMechanismValidation; this.propagationTags = @@ -600,13 +609,14 @@ public int getSamplingPriority() { } public void setSpanSamplingPriority(double rate, int limit) { - synchronized (unsafeTags) { - unsafeSetTag(SPAN_SAMPLING_MECHANISM_TAG, SamplingMechanism.SPAN_SAMPLING_RATE); - unsafeSetTag(SPAN_SAMPLING_RULE_RATE_TAG, rate); - if (limit != Integer.MAX_VALUE) { - unsafeSetTag(SPAN_SAMPLING_MAX_PER_SECOND_TAG, limit); - } - } + unsafeTags.withLock( + () -> { + unsafeSetTag(SPAN_SAMPLING_MECHANISM_TAG, SamplingMechanism.SPAN_SAMPLING_RATE); + unsafeSetTag(SPAN_SAMPLING_RULE_RATE_TAG, rate); + if (limit != Integer.MAX_VALUE) { + unsafeSetTag(SPAN_SAMPLING_MAX_PER_SECOND_TAG, limit); + } + }); } /** @@ -723,33 +733,23 @@ public void setOrigin(final CharSequence origin) { } public void setMetric(final CharSequence key, final Number value) { - synchronized (unsafeTags) { - unsafeSetTag(key.toString(), value); - } + unsafeSetTag(key.toString(), value); } public void setMetric(final CharSequence key, final int value) { - synchronized (unsafeTags) { - unsafeTags.set(key.toString(), value); - } + unsafeTags.set(key.toString(), value); } public void setMetric(final CharSequence key, final long value) { - synchronized (unsafeTags) { - unsafeTags.set(key.toString(), value); - } + unsafeTags.set(key.toString(), value); } public void setMetric(final CharSequence 
key, final float value) { - synchronized (unsafeTags) { - unsafeTags.set(key.toString(), value); - } + unsafeTags.set(key.toString(), value); } public void setMetric(final CharSequence key, final double value) { - synchronized (unsafeTags) { - unsafeTags.set(key.toString(), value); - } + unsafeTags.set(key.toString(), value); } public void setMetric(final TagMap.EntryReader entry) { @@ -757,15 +757,11 @@ public void setMetric(final TagMap.EntryReader entry) { return; } - synchronized (unsafeTags) { - unsafeTags.set(entry); - } + unsafeTags.set(entry); } public void removeTag(String tag) { - synchronized (unsafeTags) { - unsafeTags.remove(tag); - } + unsafeTags.remove(tag); } /** @@ -782,13 +778,9 @@ public void setTag(final String tag, final Object value) { return; } if (null == value) { - synchronized (unsafeTags) { - unsafeTags.remove(tag); - } + unsafeTags.remove(tag); } else if (!tagInterceptor.interceptTag(this, tag, value)) { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -797,13 +789,9 @@ public void setTag(final String tag, final String value) { return; } if (null == value) { - synchronized (unsafeTags) { - unsafeTags.remove(tag); - } + unsafeTags.remove(tag); } else if (!tagInterceptor.interceptTag(this, tag, value)) { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -817,9 +805,7 @@ public void setTag(TagMap.EntryReader entry) { precheckIntercept(entry.tag()) && tagInterceptor.interceptTag(this, entry.tag(), entry.objectValue()); if (!intercepted) { - synchronized (unsafeTags) { - unsafeTags.set(entry); - } + unsafeTags.set(entry); } } @@ -849,9 +835,7 @@ private boolean precheckIntercept(String tag) { */ private void setBox(String tag, Object box) { if (!tagInterceptor.interceptTag(this, tag, box)) { - synchronized (unsafeTags) { - unsafeTags.set(tag, box); - } + unsafeTags.set(tag, box); } } @@ -862,9 +846,7 @@ public void setTag(final String tag, 
final boolean value) { if (precheckIntercept(tag)) { this.setBox(tag, value); } else { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -875,9 +857,7 @@ public void setTag(final String tag, final int value) { if (precheckIntercept(tag)) { this.setBox(tag, value); } else { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -889,9 +869,7 @@ public void setTag(final String tag, final long value) { boolean intercepted = tagInterceptor.needsIntercept(tag) && tagInterceptor.interceptTag(this, tag, value); if (!intercepted) { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -902,9 +880,7 @@ public void setTag(final String tag, final float value) { if (precheckIntercept(tag)) { this.setBox(tag, value); } else { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -915,9 +891,7 @@ public void setTag(final String tag, final double value) { if (precheckIntercept(tag)) { this.setBox(tag, value); } else { - synchronized (unsafeTags) { - unsafeTags.set(tag, value); - } + unsafeTags.set(tag, value); } } @@ -930,25 +904,26 @@ void setAllTags(final TagMap map, boolean needsIntercept) { return; } - synchronized (unsafeTags) { - if (needsIntercept) { - // forEach out-performs the iterator of TagMap - // Taking advantage of ability to pass through other context arguments - // to avoid using a capturing lambda - map.forEach( - this, - (ctx, tagEntry) -> { - String tag = tagEntry.tag(); - Object value = tagEntry.objectValue(); - - if (!ctx.tagInterceptor.interceptTag(ctx, tag, value)) { - ctx.unsafeTags.set(tagEntry); - } - }); - } else { - unsafeTags.putAll(map); - } - } + unsafeTags.withLock( + () -> { + if (needsIntercept) { + // forEach out-performs the iterator of TagMap + // Taking advantage of ability to pass through other context arguments + // to avoid using a capturing lambda + 
map.forEach( + this, + (ctx, tagEntry) -> { + String tag = tagEntry.tag(); + Object value = tagEntry.objectValue(); + + if (!ctx.tagInterceptor.interceptTag(ctx, tag, value)) { + ctx.unsafeTags.set(tagEntry); + } + }); + } else { + unsafeTags.putAll(map); + } + }); } void setAllTags(final TagMap.Ledger ledger) { @@ -956,22 +931,23 @@ void setAllTags(final TagMap.Ledger ledger) { return; } - synchronized (unsafeTags) { - for (final TagMap.EntryChange entryChange : ledger) { - if (entryChange.isRemoval()) { - unsafeTags.remove(entryChange.tag()); - } else { - TagMap.Entry entry = (TagMap.Entry) entryChange; + unsafeTags.withLock( + () -> { + for (final TagMap.EntryChange entryChange : ledger) { + if (entryChange.isRemoval()) { + unsafeTags.remove(entryChange.tag()); + } else { + TagMap.Entry entry = (TagMap.Entry) entryChange; - String tag = entry.tag(); - Object value = entry.objectValue(); + String tag = entry.tag(); + Object value = entry.objectValue(); - if (!tagInterceptor.interceptTag(this, tag, value)) { - unsafeTags.set(entry); + if (!tagInterceptor.interceptTag(this, tag, value)) { + unsafeTags.set(entry); + } + } } - } - } - } + }); } void setAllTags(final Map map) { @@ -980,13 +956,14 @@ void setAllTags(final Map map) { } else if (map instanceof TagMap) { setAllTags((TagMap) map); } else if (!map.isEmpty()) { - synchronized (unsafeTags) { - for (final Map.Entry tag : map.entrySet()) { - if (!tagInterceptor.interceptTag(this, tag.getKey(), tag.getValue())) { - unsafeSetTag(tag.getKey(), tag.getValue()); - } - } - } + unsafeTags.withLock( + () -> { + for (final Map.Entry tag : map.entrySet()) { + if (!tagInterceptor.interceptTag(this, tag.getKey(), tag.getValue())) { + unsafeSetTag(tag.getKey(), tag.getValue()); + } + } + }); } } @@ -1016,10 +993,7 @@ Object getTag(final String key) { case Tags.HTTP_STATUS: return 0 == httpStatusCode ? 
null : (int) httpStatusCode; default: - Object value; - synchronized (unsafeTags) { - value = unsafeGetTag(key); - } + Object value = unsafeGetTag(key); // maintain previously observable type of http url :| return value == null ? null : Tags.HTTP_URL.equals(key) ? value.toString() : value; } @@ -1038,25 +1012,26 @@ public Object unsafeGetTag(final String tag) { @Deprecated public TagMap getTags() { - synchronized (unsafeTags) { - TagMap tags = unsafeTags.copy(); - - tags.put(DDTags.THREAD_ID, threadId); - // maintain previously observable type of the thread name :| - tags.put(DDTags.THREAD_NAME, threadName.toString()); - if (samplingPriority != PrioritySampling.UNSET) { - tags.put(SAMPLE_RATE_KEY, samplingPriority); - } - if (httpStatusCode != 0) { - tags.put(Tags.HTTP_STATUS, (int) httpStatusCode); - } - // maintain previously observable type of http url :| - Object value = tags.get(Tags.HTTP_URL); - if (value != null) { - tags.put(Tags.HTTP_URL, value.toString()); - } - return tags.freeze(); - } + return unsafeTags.withLock( + () -> { + TagMap tags = unsafeTags.copy(); + + tags.put(DDTags.THREAD_ID, threadId); + // maintain previously observable type of the thread name :| + tags.put(DDTags.THREAD_NAME, threadName.toString()); + if (samplingPriority != PrioritySampling.UNSET) { + tags.put(SAMPLE_RATE_KEY, samplingPriority); + } + if (httpStatusCode != 0) { + tags.put(Tags.HTTP_STATUS, (int) httpStatusCode); + } + // maintain previously observable type of http url :| + Object value = tags.get(Tags.HTTP_URL); + if (value != null) { + tags.put(Tags.HTTP_URL, value.toString()); + } + return tags.freeze(); + }); } /** @@ -1088,9 +1063,8 @@ public void setMetaStruct(final String field, final T value) { } void earlyProcessTags(AppendableSpanLinks links) { - synchronized (unsafeTags) { - TagsPostProcessorFactory.eagerProcessor().processTags(unsafeTags, this, links); - } + unsafeTags.withLock( + () -> TagsPostProcessorFactory.eagerProcessor().processTags(unsafeTags, this, 
links)); } void processTagsAndBaggage( @@ -1100,41 +1074,44 @@ void processTagsAndBaggage( // This is a compromise to avoid... // - creating an extra wrapper object that would create significant allocation // - implementing an interface to read the spans that require making the read method public - synchronized (unsafeTags) { - // Tags - TagsPostProcessorFactory.lazyProcessor().processTags(unsafeTags, this, restrictedSpan); - - String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks()); - if (linksTag != null) { - unsafeTags.put(SPAN_LINKS, linksTag); - } - // Baggage - Map baggageItemsWithPropagationTags; - if (injectBaggageAsTags) { - baggageItemsWithPropagationTags = new HashMap<>(baggageItems); - if (w3cBaggage != null) { - injectW3CBaggageTags(baggageItemsWithPropagationTags); - } - propagationTags.fillTagMap(baggageItemsWithPropagationTags); - } else { - baggageItemsWithPropagationTags = propagationTags.createTagMap(); - } + unsafeTags.withLock( + () -> { + // Tags + TagsPostProcessorFactory.lazyProcessor().processTags(unsafeTags, this, restrictedSpan); + + String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks()); + if (linksTag != null) { + unsafeTags.put(SPAN_LINKS, linksTag); + } + // Baggage + Map baggageItemsWithPropagationTags; + if (injectBaggageAsTags) { + baggageItemsWithPropagationTags = new HashMap<>(baggageItems); + if (w3cBaggage != null) { + injectW3CBaggageTags(baggageItemsWithPropagationTags); + } + propagationTags.fillTagMap(baggageItemsWithPropagationTags); + } else { + baggageItemsWithPropagationTags = propagationTags.createTagMap(); + } - consumer.accept( - new Metadata( - threadId, - threadName, - unsafeTags, - baggageItemsWithPropagationTags, - samplingPriority != PrioritySampling.UNSET ? samplingPriority : getSamplingPriority(), - measured, - topLevel, - httpStatusCode == 0 ? 
null : HTTP_STATUSES.get(httpStatusCode), - // Get origin from rootSpan.context - getOrigin(), - longRunningVersion, - ProcessTags.getTagsForSerialization())); - } + consumer.accept( + new Metadata( + threadId, + threadName, + unsafeTags, + baggageItemsWithPropagationTags, + samplingPriority != PrioritySampling.UNSET + ? samplingPriority + : getSamplingPriority(), + measured, + topLevel, + httpStatusCode == 0 ? null : HTTP_STATUSES.get(httpStatusCode), + // Get origin from rootSpan.context + getOrigin(), + longRunningVersion, + ProcessTags.getTagsForSerialization())); + }); } void injectW3CBaggageTags(Map baggageItemsWithPropagationTags) { @@ -1190,9 +1167,7 @@ public String toString() { s.append(" *measured*"); } - synchronized (unsafeTags) { - s.append(" tags=").append(new TreeMap<>(getTags())); - } + s.append(" tags=").append(new TreeMap<>(getTags())); return s.toString(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java index ad872164718..9082afd92de 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java @@ -171,6 +171,11 @@ public long getCurrentTimeNano() { return tracer.getTimeWithNanoTicks(nanoTicks); } + @Override + boolean longRunningSpansEnabled() { + return pendingTraceBuffer.longRunningSpansEnabled(); + } + @Override void touch() { LAST_REFERENCED.lazySet(this, timeSource.getNanoTicks()); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/StreamingTraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/StreamingTraceCollector.java index e886682f05b..de9e44a95ea 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/StreamingTraceCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/StreamingTraceCollector.java @@ -48,6 +48,11 @@ private StreamingTraceCollector( this.healthMetrics = healthMetrics; } + @Override + boolean 
longRunningSpansEnabled() { + return false; + } + @Override void touch() { // do nothing diff --git a/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java index 777fc3889bf..0f8621d9589 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java @@ -96,6 +96,8 @@ long getEndToEndStartTime() { return endToEndStartTime; } + abstract boolean longRunningSpansEnabled(); + abstract void touch(); abstract void registerSpan(final DDSpan span); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/DDSpanContextConcurrencyTest.java b/dd-trace-core/src/test/java/datadog/trace/core/DDSpanContextConcurrencyTest.java new file mode 100644 index 00000000000..abd485c2530 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/DDSpanContextConcurrencyTest.java @@ -0,0 +1,757 @@ +package datadog.trace.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +public class DDSpanContextConcurrencyTest { + + static final CoreTracer 
TRACER = CoreTracer.builder().build(); + + @AfterAll + static void cleanup() { + TRACER.close(); + } + + @Test + @DisplayName("Owner thread: set 1000 tags, finish, verify all present") + void ownerThreadVisibility() { + AgentSpan span = TRACER.startSpan("test", "ownerVisibility"); + + for (int i = 0; i < 1000; i++) { + span.setTag("key" + i, "value" + i); + } + span.finish(); + + for (int i = 0; i < 1000; i++) { + assertEquals("value" + i, span.getTag("key" + i), "Tag key" + i + " missing after finish"); + } + } + + @Test + @DisplayName("Cross-thread tag write: Thread B sets tags on Thread A's span") + void crossThreadTagWrite() throws Exception { + AgentSpan span = TRACER.startSpan("test", "crossThread"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch done = new CountDownLatch(1); + + executor.submit( + () -> { + for (int i = 0; i < 100; i++) { + span.setTag("cross" + i, "val" + i); + } + done.countDown(); + }); + + done.await(5, TimeUnit.SECONDS); + span.finish(); + + for (int i = 0; i < 100; i++) { + assertEquals("val" + i, span.getTag("cross" + i), "Cross-thread tag cross" + i + " missing"); + } + executor.shutdown(); + } + + @Test + @DisplayName("Post-finish tag write: tags set after finish() are visible") + void postFinishTagWrite() { + AgentSpan span = TRACER.startSpan("test", "postFinish"); + + span.setTag("before", "yes"); + span.finish(); + span.setTag("after", "yes"); + + assertEquals("yes", span.getTag("before")); + assertEquals("yes", span.getTag("after")); + } + + @Test + @DisplayName("Stress test: 8 threads writing tags concurrently — no structural corruption") + void stressTestNoCrash() throws Exception { + AgentSpan span = TRACER.startSpan("test", "stress"); + int numThreads = 8; + int tagsPerThread = 500; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + List> futures 
= new ArrayList<>(); + for (int t = 0; t < numThreads; t++) { + final int threadIdx = t; + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + for (int i = 0; i < tagsPerThread; i++) { + span.setTag("t" + threadIdx + "_k" + i, "v" + i); + } + } catch (Throwable e) { + errors.add(e); + } + })); + } + + for (Future f : futures) { + f.get(10, TimeUnit.SECONDS); + } + + span.finish(); + executor.shutdown(); + + // No crashes or NPEs — tags may be lost due to races, but no structural corruption + assertEquals(0, errors.size(), "Unexpected errors: " + errors); + // Verify at least some tags are present (the map wasn't corrupted) + assertNotNull(span.getTag("t0_k0")); + } + + @Test + @DisplayName("Tag removal via null value on owner thread") + void tagRemovalOwnerThread() { + AgentSpan span = TRACER.startSpan("test", "removal"); + + span.setTag("toRemove", "present"); + assertEquals("present", span.getTag("toRemove")); + + span.setTag("toRemove", (String) null); + assertNull(span.getTag("toRemove")); + + span.finish(); + } + + @Test + @DisplayName("Mixed metrics and tags on owner thread") + void mixedMetricsAndTags() { + AgentSpan span = TRACER.startSpan("test", "mixed"); + + span.setTag("str", "hello"); + span.setTag("bool", true); + span.setTag("int", 42); + span.setTag("long", 100L); + span.setTag("float", 3.14f); + span.setTag("double", 2.718); + span.setMetric("metric_int", 99); + span.setMetric("metric_double", 1.5); + + assertEquals("hello", span.getTag("str")); + assertEquals(true, span.getTag("bool")); + assertEquals(42, span.getTag("int")); + assertEquals(100L, span.getTag("long")); + assertEquals(3.14f, span.getTag("float")); + assertEquals(2.718, span.getTag("double")); + assertEquals(99, span.getTag("metric_int")); + assertEquals(1.5, span.getTag("metric_double")); + + span.finish(); + } + + /** + * Reproduces the exact benchmark pattern: one span created by thread A, 8 threads calling setTag + * concurrently. 
This is the pattern that the JMH crossThread benchmark uses, except the benchmark + * failed because of a race in the JMH harness (SharedSpan.setup with Level.Invocation + 8 + * threads), not in the production code. + * + *
<p>
This test proves the production code handles this pattern without NPE or structural + * corruption. + */ + @Test + @DisplayName( + "Cross-thread sustained: 8 threads setTag on same span for 10k iterations — no crash") + void crossThreadSustainedNoCrash() throws Exception { + int numThreads = 8; + int iterationsPerThread = 10_000; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + // Create span on main thread, then hand it to 8 other threads + AgentSpan span = TRACER.startSpan("test", "crossSustained"); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + List> futures = new ArrayList<>(); + for (int t = 0; t < numThreads; t++) { + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + for (int i = 0; i < iterationsPerThread; i++) { + span.setTag("key", "value"); + } + } catch (Throwable e) { + errors.add(e); + } + })); + } + + for (Future f : futures) { + f.get(30, TimeUnit.SECONDS); + } + span.finish(); + executor.shutdown(); + + assertEquals(0, errors.size(), "Unexpected errors during cross-thread setTag: " + errors); + assertEquals("value", span.getTag("key")); + } + + /** + * Tests the transition from owner-thread to shared mode under concurrent writes: the creating + * thread sets tags first, then 8 other threads join in. This exercises the owner → shared + * transition while writes are in flight. 
+ */ + @Test + @DisplayName("Owner-to-shared transition under concurrent writes — no crash, all tags present") + void ownerToSharedTransition() throws Exception { + int numThreads = 8; + int tagsPerThread = 1_000; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + AgentSpan span = TRACER.startSpan("test", "ownerToShared"); + + // Owner thread writes first batch — these are on the fast path (no lock) + for (int i = 0; i < 100; i++) { + span.setTag("owner_" + i, "val_" + i); + } + + // Now launch 8 threads that also write — these trigger the transition to shared mode + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + List> futures = new ArrayList<>(); + for (int t = 0; t < numThreads; t++) { + final int threadIdx = t; + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + for (int i = 0; i < tagsPerThread; i++) { + span.setTag("thread" + threadIdx + "_" + i, "v" + i); + } + } catch (Throwable e) { + errors.add(e); + } + })); + } + + for (Future f : futures) { + f.get(30, TimeUnit.SECONDS); + } + span.finish(); + executor.shutdown(); + + assertEquals(0, errors.size(), "Unexpected errors during transition: " + errors); + + // Owner-thread tags should all be present — they were written before any contention + for (int i = 0; i < 100; i++) { + assertEquals("val_" + i, span.getTag("owner_" + i), "Owner tag owner_" + i + " missing"); + } + + // Each thread's last write should be visible (earlier writes may be overwritten by races) + for (int t = 0; t < numThreads; t++) { + assertNotNull( + span.getTag("thread" + t + "_" + (tagsPerThread - 1)), + "Thread " + t + " last tag missing"); + } + } + + /** + * Exercises many short-lived spans created on one thread and tagged from another — the exact + * pattern the crossThread benchmark was trying to measure. Uses a stable handoff (CountDownLatch) + * instead of the racy JMH setup. 
+ */ + @Test + @DisplayName("Many short spans tagged cross-thread — no NPE or crash") + void manySpansCrossThread() throws Exception { + int numSpans = 10_000; + ExecutorService tagger = Executors.newFixedThreadPool(8); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + for (int s = 0; s < numSpans; s++) { + AgentSpan span = TRACER.startSpan("test", "manyShort"); + CountDownLatch tagged = new CountDownLatch(8); + + for (int t = 0; t < 8; t++) { + tagger.submit( + () -> { + try { + span.setTag("key", "value"); + } catch (Throwable e) { + errors.add(e); + } finally { + tagged.countDown(); + } + }); + } + + tagged.await(5, TimeUnit.SECONDS); + span.finish(); + } + + tagger.shutdown(); + tagger.awaitTermination(10, TimeUnit.SECONDS); + assertEquals(0, errors.size(), "Errors during cross-thread tagging of short spans: " + errors); + } + + /** + * Concurrent reads and writes from multiple threads on the same span. Readers call getTag while + * writers call setTag, exercising the read path under contention. 
+   */
+  @Test
+  @DisplayName("Mixed concurrent reads and writes — no crash, readers see consistent values")
+  void concurrentReadsAndWrites() throws Exception {
+    final int numWriters = 4;
+    final int numReaders = 4;
+    final int numKeys = 50;
+    final int iterations = 5_000;
+    ExecutorService executor = Executors.newFixedThreadPool(numWriters + numReaders);
+    CyclicBarrier barrier = new CyclicBarrier(numWriters + numReaders);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+    AtomicBoolean done = new AtomicBoolean(false);
+
+    AgentSpan span = TRACER.startSpan("test", "readWrite");
+    // Pre-populate with initial values so readers always have something
+    for (int i = 0; i < numKeys; i++) {
+      span.setTag("shared_" + i, "init");
+    }
+
+    List<Future<?>> futures = new ArrayList<>();
+    try {
+      // Writers: overwrite shared keys with thread-specific values
+      for (int w = 0; w < numWriters; w++) {
+        final int writerIdx = w;
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    for (int i = 0; i < iterations; i++) {
+                      int key = i % numKeys;
+                      span.setTag("shared_" + key, "w" + writerIdx + "_" + i);
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      // Readers: read tags concurrently with writes
+      for (int r = 0; r < numReaders; r++) {
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    while (!done.get()) {
+                      for (int i = 0; i < numKeys; i++) {
+                        Object val = span.getTag("shared_" + i);
+                        // Value should be non-null (either "init" or a writer's value)
+                        assertNotNull(val, "Tag shared_" + i + " was null during concurrent read");
+                      }
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      // Wait for writers to finish, then signal readers to stop
+      for (int i = 0; i < numWriters; i++) {
+        futures.get(i).get(30, TimeUnit.SECONDS);
+      }
+      done.set(true);
+      for (int i = numWriters; i < futures.size(); i++) {
+        futures.get(i).get(5, TimeUnit.SECONDS);
+      }
+
+      span.finish();
+    } finally {
+      // If a writer timed out above, the readers would otherwise spin on `done` forever and the
+      // pool threads would outlive the test.
+      done.set(true);
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Errors during concurrent reads/writes: " + errors);
+  }
+
+  /**
+   * Randomized fuzz test: each thread performs a random mix of setTag (String, int, double,
+   * boolean), setMetric, getTag, and null-valued setTag operations. Repeated 10 times for coverage.
+   */
+  @RepeatedTest(10)
+  @DisplayName("Fuzz test: randomized tag operations from 8 threads — no crash")
+  void fuzzRandomizedOperations() throws Exception {
+    int numThreads = 8;
+    int opsPerThread = 2_000;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CyclicBarrier barrier = new CyclicBarrier(numThreads);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    AgentSpan span = TRACER.startSpan("test", "fuzz");
+
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int t = 0; t < numThreads; t++) {
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    ThreadLocalRandom rng = ThreadLocalRandom.current();
+                    for (int i = 0; i < opsPerThread; i++) {
+                      String key = "fuzz_" + rng.nextInt(100);
+                      switch (rng.nextInt(7)) {
+                        case 0:
+                          span.setTag(key, "str_" + i);
+                          break;
+                        case 1:
+                          span.setTag(key, rng.nextInt());
+                          break;
+                        case 2:
+                          span.setTag(key, rng.nextDouble());
+                          break;
+                        case 3:
+                          span.setTag(key, rng.nextBoolean());
+                          break;
+                        case 4:
+                          span.setMetric(key, rng.nextInt(1000));
+                          break;
+                        case 5:
+                          span.getTag(key);
+                          break;
+                        case 6:
+                          // NOTE(review): presumably a null value removes the tag — confirm
+                          span.setTag(key, (String) null);
+                          break;
+                      }
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      for (Future<?> f : futures) {
+        f.get(30, TimeUnit.SECONDS);
+      }
+      span.finish();
+    } finally {
+      // Release the pool threads even when a worker timed out.
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Errors during fuzz test: " + errors);
+  }
+
+  /**
+   * Verifies that tag values written by a single thread are never corrupted (torn). Each writer
+   * thread owns unique keys and writes values with a recognizable pattern. After all writes
+   * complete, we verify every key holds a value from the correct writer thread.
+   */
+  @Test
+  @DisplayName("Value consistency: per-thread keys retain correct writer identity")
+  void valueConsistencyPerThreadKeys() throws Exception {
+    int numThreads = 8;
+    int keysPerThread = 100;
+    int writesPerKey = 100;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CyclicBarrier barrier = new CyclicBarrier(numThreads);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    AgentSpan span = TRACER.startSpan("test", "consistency");
+
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int t = 0; t < numThreads; t++) {
+        final int threadIdx = t;
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    for (int w = 0; w < writesPerKey; w++) {
+                      for (int k = 0; k < keysPerThread; k++) {
+                        span.setTag("t" + threadIdx + "_k" + k, "t" + threadIdx + "_v" + w);
+                      }
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      for (Future<?> f : futures) {
+        f.get(30, TimeUnit.SECONDS);
+      }
+      span.finish();
+    } finally {
+      // Release the pool threads even when a worker timed out.
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Errors during consistency test: " + errors);
+
+    // Each key should contain a value from the correct thread (pattern: "tN_vM")
+    for (int t = 0; t < numThreads; t++) {
+      String writerPrefix = "t" + t + "_";
+      for (int k = 0; k < keysPerThread; k++) {
+        Object val = span.getTag("t" + t + "_k" + k);
+        assertNotNull(val, "Tag t" + t + "_k" + k + " was null");
+        assertTrue(
+            val.toString().startsWith(writerPrefix),
+            "Tag t" + t + "_k" + k + " has wrong writer: " + val);
+      }
+    }
+  }
+
+  /**
+   * Exercises the owner→shared transition at the exact moment of finish(). The owner thread writes
+   * tags and then finishes, while other threads attempt to write simultaneously. This tests that
+   * transitionToShared() in finish() correctly makes post-finish writes visible.
+   */
+  @RepeatedTest(50)
+  @DisplayName("Race: finish() concurrent with cross-thread writes — no crash")
+  void finishRacesWithCrossThreadWrites() throws Exception {
+    int numThreads = 4;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); // +1 for the owner/finisher
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    AgentSpan span = TRACER.startSpan("test", "finishRace");
+    span.setTag("owner_tag", "present");
+
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int t = 0; t < numThreads; t++) {
+        final int threadIdx = t;
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    for (int i = 0; i < 100; i++) {
+                      span.setTag("race_" + threadIdx + "_" + i, "val");
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      // Owner thread joins the barrier and finishes concurrently
+      barrier.await(5, TimeUnit.SECONDS);
+      span.finish();
+
+      for (Future<?> f : futures) {
+        f.get(10, TimeUnit.SECONDS);
+      }
+    } finally {
+      // This test repeats 50 times; never leave a pool behind when a repetition fails.
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Errors during finish race: " + errors);
+    assertEquals("present", span.getTag("owner_tag"));
+  }
+
+  /**
+   * Concurrent setMetric calls from multiple threads using all four value types (int, long, float,
+   * double). Verifies that no call throws and that each thread's last metric is readable afterwards.
+   */
+  @Test
+  @DisplayName("Concurrent setMetric from 8 threads — no crash, values present")
+  void concurrentSetMetric() throws Exception {
+    int numThreads = 8;
+    int metricsPerThread = 500;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CyclicBarrier barrier = new CyclicBarrier(numThreads);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    AgentSpan span = TRACER.startSpan("test", "metrics");
+    // Resolve the context once instead of casting on every metric write.
+    DDSpanContext ctx = ((DDSpan) span).context();
+
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int t = 0; t < numThreads; t++) {
+        final int threadIdx = t;
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    for (int i = 0; i < metricsPerThread; i++) {
+                      String key = "m_t" + threadIdx + "_" + i;
+                      // Rotate through the four setMetric overloads.
+                      switch (i % 4) {
+                        case 0:
+                          ctx.setMetric(key, i);
+                          break;
+                        case 1:
+                          ctx.setMetric(key, (long) i);
+                          break;
+                        case 2:
+                          ctx.setMetric(key, (float) i);
+                          break;
+                        case 3:
+                          ctx.setMetric(key, (double) i);
+                          break;
+                      }
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      for (Future<?> f : futures) {
+        f.get(30, TimeUnit.SECONDS);
+      }
+      span.finish();
+    } finally {
+      // Release the pool threads even when a worker timed out.
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Errors during concurrent setMetric: " + errors);
+    // Spot-check: each thread's last metric should be present
+    for (int t = 0; t < numThreads; t++) {
+      assertNotNull(
+          span.getTag("m_t" + t + "_" + (metricsPerThread - 1)),
+          "Thread " + t + " last metric missing");
+    }
+  }
+
+  /**
+   * Verifies compound atomicity of setSpanSamplingPriority: the three sampling tags must always
+   * appear together (all or none) when observed via getTags().
+   */
+  @RepeatedTest(10)
+  @DisplayName("setSpanSamplingPriority atomicity: 3 tags always consistent in getTags()")
+  void spanSamplingPriorityAtomicity() throws Exception {
+    AgentSpan span = TRACER.startSpan("test", "samplingAtomicity");
+    DDSpanContext ctx = ((DDSpan) span).context();
+    // Transition to shared so the compound lock is in effect
+    ctx.transitionToShared();
+
+    int iterations = 2000;
+    AtomicBoolean running = new AtomicBoolean(true);
+    // Released by the reader just before its first read, so the writer cannot run to completion
+    // before any observation has been made.
+    CountDownLatch readerStarted = new CountDownLatch(1);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    // Writer: repeatedly sets span sampling priority
+    Thread writer =
+        new Thread(
+            () -> {
+              try {
+                if (!readerStarted.await(5, TimeUnit.SECONDS)) {
+                  throw new AssertionError("Reader did not start within 5s");
+                }
+                for (int i = 0; i < iterations; i++) {
+                  ctx.setSpanSamplingPriority(0.5, 100);
+                }
+              } catch (Throwable e) {
+                errors.add(e);
+              } finally {
+                running.set(false);
+              }
+            });
+
+    // Reader: checks that the 3 sampling tags are consistent
+    Thread reader =
+        new Thread(
+            () -> {
+              try {
+                readerStarted.countDown();
+                // do/while guarantees at least one observation even if the writer is very fast
+                do {
+                  datadog.trace.api.TagMap tags = ctx.getTags();
+                  Object mechanism = tags.getObject("_dd.span_sampling.mechanism");
+                  Object ruleRate = tags.getObject("_dd.span_sampling.rule_rate");
+                  Object maxPerSecond = tags.getObject("_dd.span_sampling.max_per_second");
+
+                  // Either all three are present or none
+                  boolean anyPresent = mechanism != null || ruleRate != null || maxPerSecond != null;
+                  boolean anyMissing = mechanism == null || ruleRate == null || maxPerSecond == null;
+                  if (anyPresent && anyMissing) {
+                    errors.add(
+                        new AssertionError(
+                            "Partial sampling tags: mechanism="
+                                + mechanism
+                                + " ruleRate="
+                                + ruleRate
+                                + " maxPerSecond="
+                                + maxPerSecond));
+                  }
+                } while (running.get());
+              } catch (Throwable e) {
+                errors.add(e);
+              }
+            });
+
+    // Daemon threads cannot keep the JVM alive if one of them hangs.
+    writer.setDaemon(true);
+    reader.setDaemon(true);
+
+    try {
+      reader.start();
+      writer.start();
+      writer.join(30_000);
+      reader.join(5_000);
+      // join(timeout) returns silently on timeout, so check liveness explicitly.
+      assertTrue(!writer.isAlive(), "Writer thread did not finish within 30s");
+      assertTrue(!reader.isAlive(), "Reader thread did not finish within 5s");
+    } finally {
+      running.set(false); // stop the reader even if the writer hung or an assertion failed
+      span.finish();
+    }
+
+    assertEquals(0, errors.size(), "Atomicity violated: " + errors);
+  }
+
+  /**
+   * Verifies that the getTags() snapshot taken after finish() contains the virtual THREAD_ID and
+   * THREAD_NAME entries alongside user-set tags.
+   */
+  @Test
+  @DisplayName("getTags() snapshot consistency: virtual fields always present")
+  void getTagsSnapshotConsistency() throws Exception {
+    AgentSpan span = TRACER.startSpan("test", "getTagsConsistency");
+
+    span.setTag("user_tag", "hello");
+    span.finish();
+
+    datadog.trace.api.TagMap tags = ((DDSpan) span).context().getTags();
+    assertNotNull(tags.getObject("thread.id"), "THREAD_ID missing from getTags()");
+    assertNotNull(tags.getObject("thread.name"), "THREAD_NAME missing from getTags()");
+    assertEquals("hello", tags.getObject("user_tag"));
+  }
+
+  /**
+   * Exercises the transition window: owner tags, finish (triggers transitionToShared), then
+   * concurrent readers verify tags are visible post-transition.
+   */
+  @RepeatedTest(5)
+  @DisplayName("Transition + concurrent read: tags written before finish visible after")
+  void transitionAndConcurrentRead() throws Exception {
+    int numReaders = 4;
+    int tagsToWrite = 200;
+    AgentSpan span = TRACER.startSpan("test", "transitionRead");
+
+    // Owner thread writes tags
+    for (int i = 0; i < tagsToWrite; i++) {
+      span.setTag("pre_" + i, "val_" + i);
+    }
+
+    // Finish triggers transitionToShared
+    span.finish();
+
+    // Multiple reader threads verify all tags are visible
+    CyclicBarrier barrier = new CyclicBarrier(numReaders);
+    ExecutorService executor = Executors.newFixedThreadPool(numReaders);
+    CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
+
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int r = 0; r < numReaders; r++) {
+        futures.add(
+            executor.submit(
+                () -> {
+                  try {
+                    barrier.await(5, TimeUnit.SECONDS);
+                    for (int i = 0; i < tagsToWrite; i++) {
+                      String expected = "val_" + i;
+                      Object val = span.getTag("pre_" + i);
+                      if (!expected.equals(val)) {
+                        errors.add(
+                            new AssertionError(
+                                "Tag pre_" + i + " expected " + expected + " got " + val));
+                      }
+                    }
+                  } catch (Throwable e) {
+                    errors.add(e);
+                  }
+                }));
+      }
+
+      for (Future<?> f : futures) {
+        f.get(10, TimeUnit.SECONDS);
+      }
+    } finally {
+      // Release the pool threads even when a reader timed out.
+      executor.shutdownNow();
+    }
+
+    assertEquals(0, errors.size(), "Tags not visible after transition: " + errors);
+  }
+}
diff --git a/internal-api/src/main/java/datadog/trace/api/TagMap.java b/internal-api/src/main/java/datadog/trace/api/TagMap.java index 95f676245ef..7c24692b908 100644 --- a/internal-api/src/main/java/datadog/trace/api/TagMap.java +++ b/internal-api/src/main/java/datadog/trace/api/TagMap.java @@ -1,6 +1,7 @@ package datadog.trace.api; import datadog.trace.api.function.TriConsumer; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Arrays; @@ -15,6 +16,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -279,6 +281,22 @@ void forEach( /** Checks if the TagMap is writable - if not throws {@link IllegalStateException} */ void checkWriteAccess(); + /** Sets the thread that owns this TagMap for lock-free access. */ + default void setOwnerThread(Thread thread) {} + + /** Transitions this TagMap to shared mode, requiring synchronization for all future access. */ + default void transitionToShared() {} + + /** Execute op while holding the tag map's internal lock. Use for compound operations. */ + default void withLock(Runnable op) { + op.run(); + } + + /** Execute op while holding the tag map's internal lock. Use for compound operations. */ + default T withLock(Supplier op) { + return op.get(); + } + abstract class EntryChange { public static final EntryRemoval newRemoval(String tag) { return new EntryRemoval(tag); @@ -1250,6 +1268,29 @@ public LegacyTagMap empty() { * However as a precaution if a BucketGroup becomes completely empty, then that BucketGroup will be * removed from the collision chain. */ +/** + * Thread-safe hash map optimized for span tags. + * + *

<p>Supports an owner-thread optimization: when {@link #setOwnerThread(Thread)} is called, the + * designated thread can read and write without acquiring the monitor. All other threads use {@code + * synchronized(this)}. On the first non-owner access, {@code ownerThread} is set to {@code null}, + * disabling the fast path until {@link #setOwnerThread(Thread)} is called again. + * + *

<p>There is a narrow TOCTOU (time-of-check to time-of-use) window during transition: the owner thread may be inside an impl + * method (lock-free) when a non-owner thread enters {@code synchronized(this)} and nulls {@code + * ownerThread}. JVM reference writes are atomic (JLS 17.7), so the worst case is a lost update, not + * structural corruption. The window closes on the owner thread's next volatile read of {@code + * ownerThread}. + * + *

For compound operations (multiple reads/writes that must be atomic), callers should use {@link + * #withLock(Runnable)} which always acquires the monitor. Inner per-operation calls are reentrant + * on the same monitor and add zero overhead. + */ +@SuppressFBWarnings( + value = {"AT_NONATOMIC_OPERATIONS_ON_SHARED_VARIABLE", "AT_STALE_THREAD_WRITE_OF_PRIMITIVE"}, + justification = + "Owner-thread fast path: *Impl methods run either on the owner thread (no lock needed)" + + " or inside synchronized(this) via reentrant calls. See class javadoc.") final class OptimizedTagMap implements TagMap { // Using special constructor that creates a frozen view of an existing array // Bucket calculation requires that array length is a power of 2 @@ -1259,6 +1300,7 @@ final class OptimizedTagMap implements TagMap { private final Object[] buckets; private int size; private boolean frozen; + private volatile Thread ownerThread; public OptimizedTagMap() { // needs to be a power of 2 for bucket masking calculation to work as intended @@ -1281,12 +1323,24 @@ public boolean isOptimized() { @Override public int size() { - return this.size; + if (isOwnerThread()) { + return this.size; + } + synchronized (this) { + revokeOwnership(); + return this.size; + } } @Override public boolean isEmpty() { - return (this.size == 0); + if (isOwnerThread()) { + return (this.size == 0); + } + synchronized (this) { + revokeOwnership(); + return (this.size == 0); + } } @Deprecated @@ -1363,9 +1417,29 @@ public boolean containsKey(Object key) { @Override public boolean containsValue(Object value) { - // This could be optimized - but probably isn't called enough to be worth it - for (EntryReader entryReader : this) { - if (entryReader.objectValue().equals(value)) return true; + if (isOwnerThread()) { + return containsValueImpl(value); + } + synchronized (this) { + revokeOwnership(); + return containsValueImpl(value); + } + } + + private boolean containsValueImpl(Object value) { + Object[] thisBuckets = 
this.buckets; + for (int i = 0; i < thisBuckets.length; ++i) { + Object thisBucket = thisBuckets[i]; + if (thisBucket instanceof Entry) { + if (java.util.Objects.equals(((Entry) thisBucket).objectValue(), value)) return true; + } else if (thisBucket instanceof BucketGroup) { + for (BucketGroup g = (BucketGroup) thisBucket; g != null; g = g.prev) { + for (int j = 0; j < BucketGroup.LEN; ++j) { + Entry e = g._entryAt(j); + if (e != null && java.util.Objects.equals(e.objectValue(), value)) return true; + } + } + } } return false; } @@ -1397,6 +1471,16 @@ public Set> entrySet() { @Override public Entry getEntry(String tag) { + if (isOwnerThread()) { + return getEntryImpl(tag); + } + synchronized (this) { + revokeOwnership(); + return getEntryImpl(tag); + } + } + + private Entry getEntryImpl(String tag) { Object[] thisBuckets = this.buckets; int hash = TagMap.Entry._hash(tag); @@ -1467,7 +1551,16 @@ public void set(String tag, double value) { @Override public Entry getAndSet(Entry newEntry) { this.checkWriteAccess(); + if (isOwnerThread()) { + return getAndSetImpl(newEntry); + } + synchronized (this) { + revokeOwnership(); + return getAndSetImpl(newEntry); + } + } + private Entry getAndSetImpl(Entry newEntry) { Object[] thisBuckets = this.buckets; int newHash = newEntry.hash(); @@ -1550,7 +1643,17 @@ public TagMap.Entry getAndSet(String tag, double value) { public void putAll(Map map) { this.checkWriteAccess(); + if (isOwnerThread()) { + putAllImpl(map); + return; + } + synchronized (this) { + revokeOwnership(); + putAllImpl(map); + } + } + private void putAllImpl(Map map) { if (map instanceof OptimizedTagMap) { this.putAllOptimizedMap((OptimizedTagMap) map); } else { @@ -1574,7 +1677,17 @@ private void putAllUnoptimizedMap(Map that) */ public void putAll(TagMap that) { this.checkWriteAccess(); + if (isOwnerThread()) { + putAllTagMapImpl(that); + return; + } + synchronized (this) { + revokeOwnership(); + putAllTagMapImpl(that); + } + } + private void 
putAllTagMapImpl(TagMap that) { if (that instanceof OptimizedTagMap) { this.putAllOptimizedMap((OptimizedTagMap) that); } else { @@ -1732,6 +1845,17 @@ private void putAllIntoEmptyMap(OptimizedTagMap that) { } public void fillMap(Map map) { + if (isOwnerThread()) { + fillMapImpl(map); + return; + } + synchronized (this) { + revokeOwnership(); + fillMapImpl(map); + } + } + + private void fillMapImpl(Map map) { Object[] thisBuckets = this.buckets; for (int i = 0; i < thisBuckets.length; ++i) { @@ -1750,6 +1874,17 @@ public void fillMap(Map map) { } public void fillStringMap(Map stringMap) { + if (isOwnerThread()) { + fillStringMapImpl(stringMap); + return; + } + synchronized (this) { + revokeOwnership(); + fillStringMapImpl(stringMap); + } + } + + private void fillStringMapImpl(Map stringMap) { Object[] thisBuckets = this.buckets; for (int i = 0; i < thisBuckets.length; ++i) { @@ -1782,7 +1917,16 @@ public boolean remove(String tag) { @Override public Entry getAndRemove(String tag) { this.checkWriteAccess(); + if (isOwnerThread()) { + return getAndRemoveImpl(tag); + } + synchronized (this) { + revokeOwnership(); + return getAndRemoveImpl(tag); + } + } + private Entry getAndRemoveImpl(String tag) { Object[] thisBuckets = this.buckets; int hash = TagMap.Entry._hash(tag); @@ -1821,16 +1965,38 @@ public Entry getAndRemove(String tag) { @Override public TagMap copy() { + if (isOwnerThread()) { + return copyImpl(); + } + synchronized (this) { + revokeOwnership(); + return copyImpl(); + } + } + + private TagMap copyImpl() { OptimizedTagMap copy = new OptimizedTagMap(); copy.putAllIntoEmptyMap(this); return copy; } public TagMap immutableCopy() { + if (isOwnerThread()) { + return immutableCopyImpl(); + } + synchronized (this) { + revokeOwnership(); + return immutableCopyImpl(); + } + } + + private TagMap immutableCopyImpl() { if (this.frozen) { return this; } else { - return this.copy().freeze(); + OptimizedTagMap copy = new OptimizedTagMap(); + 
copy.putAllIntoEmptyMap(this); + return copy.freeze(); } } @@ -1846,6 +2012,17 @@ public Stream stream() { @Override public void forEach(Consumer consumer) { + if (isOwnerThread()) { + forEachImpl(consumer); + return; + } + synchronized (this) { + revokeOwnership(); + forEachImpl(consumer); + } + } + + private void forEachImpl(Consumer consumer) { Object[] thisBuckets = this.buckets; for (int i = 0; i < thisBuckets.length; ++i) { @@ -1865,6 +2042,17 @@ public void forEach(Consumer consumer) { @Override public void forEach(T thisObj, BiConsumer consumer) { + if (isOwnerThread()) { + forEachImpl(thisObj, consumer); + return; + } + synchronized (this) { + revokeOwnership(); + forEachImpl(thisObj, consumer); + } + } + + private void forEachImpl(T thisObj, BiConsumer consumer) { Object[] thisBuckets = this.buckets; for (int i = 0; i < thisBuckets.length; ++i) { @@ -1885,6 +2073,18 @@ public void forEach(T thisObj, BiConsumer con @Override public void forEach( T thisObj, U otherObj, TriConsumer consumer) { + if (isOwnerThread()) { + forEachImpl(thisObj, otherObj, consumer); + return; + } + synchronized (this) { + revokeOwnership(); + forEachImpl(thisObj, otherObj, consumer); + } + } + + private void forEachImpl( + T thisObj, U otherObj, TriConsumer consumer) { Object[] thisBuckets = this.buckets; for (int i = 0; i < thisBuckets.length; ++i) { @@ -1904,19 +2104,81 @@ public void forEach( public void clear() { this.checkWriteAccess(); + if (isOwnerThread()) { + clearImpl(); + return; + } + synchronized (this) { + revokeOwnership(); + clearImpl(); + } + } + private void clearImpl() { Arrays.fill(this.buckets, null); this.size = 0; } - public OptimizedTagMap freeze() { - this.frozen = true; + @Override + public void setOwnerThread(Thread thread) { + this.ownerThread = thread; + } - return this; + @Override + public void transitionToShared() { + this.ownerThread = null; + } + + @Override + public void withLock(Runnable op) { + synchronized (this) { + revokeOwnership(); + 
op.run(); + } + } + + @Override + public T withLock(Supplier op) { + synchronized (this) { + revokeOwnership(); + return op.get(); + } + } + + /** + * Revoke owner-thread fast path. Guarded to avoid redundant volatile writes on the cache line. + */ + private void revokeOwnership() { + if (ownerThread != null) { + ownerThread = null; + } + } + + private boolean isOwnerThread() { + Thread ot = this.ownerThread; + return ot != null && ot == Thread.currentThread(); + } + + public OptimizedTagMap freeze() { + if (isOwnerThread()) { + this.frozen = true; + return this; + } + synchronized (this) { + revokeOwnership(); + this.frozen = true; + return this; + } } public boolean isFrozen() { - return this.frozen; + if (isOwnerThread()) { + return this.frozen; + } + synchronized (this) { + revokeOwnership(); + return this.frozen; + } } public void checkWriteAccess() { @@ -2007,29 +2269,50 @@ boolean checkIfEmpty() { public Object compute( String key, BiFunction remappingFunction) { this.checkWriteAccess(); - - return TagMap.super.compute(key, remappingFunction); + if (isOwnerThread()) { + return TagMap.super.compute(key, remappingFunction); + } + synchronized (this) { + revokeOwnership(); + return TagMap.super.compute(key, remappingFunction); + } } @Override public Object computeIfAbsent( String key, Function mappingFunction) { this.checkWriteAccess(); - - return TagMap.super.computeIfAbsent(key, mappingFunction); + if (isOwnerThread()) { + return TagMap.super.computeIfAbsent(key, mappingFunction); + } + synchronized (this) { + revokeOwnership(); + return TagMap.super.computeIfAbsent(key, mappingFunction); + } } @Override public Object computeIfPresent( String key, BiFunction remappingFunction) { this.checkWriteAccess(); - - return TagMap.super.computeIfPresent(key, remappingFunction); + if (isOwnerThread()) { + return TagMap.super.computeIfPresent(key, remappingFunction); + } + synchronized (this) { + revokeOwnership(); + return TagMap.super.computeIfPresent(key, 
remappingFunction); + } } @Override public String toString() { - return toPrettyString(); + if (isOwnerThread()) { + return toPrettyString(); + } + synchronized (this) { + revokeOwnership(); + return toPrettyString(); + } } /** @@ -2862,19 +3145,101 @@ public boolean isOptimized() { } @Override - public void clear() { + public synchronized int size() { + return super.size(); + } + + @Override + public synchronized boolean isEmpty() { + return super.isEmpty(); + } + + @Override + public synchronized Object get(Object key) { + return super.get(key); + } + + @Override + public synchronized boolean containsKey(Object key) { + return super.containsKey(key); + } + + @Override + public synchronized boolean containsValue(Object value) { + return super.containsValue(value); + } + + @Override + public synchronized void clear() { this.checkWriteAccess(); super.clear(); } - public LegacyTagMap freeze() { + @Override + public synchronized Object put(String key, Object value) { + this.checkWriteAccess(); + + return super.put(key, value); + } + + @Override + public synchronized void putAll(Map m) { + this.checkWriteAccess(); + + super.putAll(m); + } + + @Override + public synchronized Object remove(Object key) { + this.checkWriteAccess(); + + return super.remove(key); + } + + @Override + public synchronized boolean remove(Object key, Object value) { + this.checkWriteAccess(); + + return super.remove(key, value); + } + + @Override + public synchronized Set> entrySet() { + return super.entrySet(); + } + + @Override + public synchronized Set keySet() { + return super.keySet(); + } + + @Override + public synchronized Collection values() { + return super.values(); + } + + @Override + public void withLock(Runnable op) { + synchronized (this) { + op.run(); + } + } + + @Override + public T withLock(Supplier op) { + synchronized (this) { + return op.get(); + } + } + + public synchronized LegacyTagMap freeze() { this.frozen = true; return this; } - public boolean isFrozen() { + public 
synchronized boolean isFrozen() { return this.frozen; } @@ -2883,7 +3248,7 @@ public void checkWriteAccess() { } @Override - public TagMap copy() { + public synchronized TagMap copy() { return new LegacyTagMap(this); } @@ -2898,22 +3263,22 @@ public Iterator valueIterator() { } @Override - public void fillMap(Map map) { + public synchronized void fillMap(Map map) { map.putAll(this); } @Override - public void fillStringMap(Map stringMap) { - for (Map.Entry entry : this.entrySet()) { + public synchronized void fillStringMap(Map stringMap) { + for (Map.Entry entry : super.entrySet()) { stringMap.put(entry.getKey(), entry.getValue().toString()); } } @Override - public void forEach(Consumer consumer) { + public synchronized void forEach(Consumer consumer) { EntryReadingHelper entryReadingHelper = new EntryReadingHelper(); - for (Map.Entry entry : this.entrySet()) { + for (Map.Entry entry : super.entrySet()) { entryReadingHelper.set(entry); consumer.accept(entryReadingHelper); @@ -2921,10 +3286,11 @@ public void forEach(Consumer consumer) { } @Override - public void forEach(T thisObj, BiConsumer consumer) { + public synchronized void forEach( + T thisObj, BiConsumer consumer) { EntryReadingHelper entryReadingHelper = new EntryReadingHelper(); - for (Map.Entry entry : this.entrySet()) { + for (Map.Entry entry : super.entrySet()) { entryReadingHelper.set(entry); consumer.accept(thisObj, entryReadingHelper); @@ -2932,11 +3298,11 @@ public void forEach(T thisObj, BiConsumer con } @Override - public void forEach( + public synchronized void forEach( T thisObj, U otherObj, TriConsumer consumer) { EntryReadingHelper entryReadingHelper = new EntryReadingHelper(); - for (Map.Entry entry : this.entrySet()) { + for (Map.Entry entry : super.entrySet()) { entryReadingHelper.set(entry); consumer.accept(thisObj, otherObj, entryReadingHelper); @@ -3148,48 +3514,20 @@ public void set(TagMap.EntryReader newEntryReader) { this.put(newEntryReader.tag(), newEntryReader.objectValue()); } - 
@Override - public Object put(String key, Object value) { - this.checkWriteAccess(); - - return super.put(key, value); - } - - @Override - public void putAll(Map m) { - this.checkWriteAccess(); - - super.putAll(m); - } - @Override public void putAll(TagMap that) { this.putAll((Map) that); } @Override - public Object remove(Object key) { - this.checkWriteAccess(); - - return super.remove(key); - } - - @Override - public boolean remove(Object key, Object value) { - this.checkWriteAccess(); - - return super.remove(key, value); - } - - @Override - public boolean remove(String tag) { + public synchronized boolean remove(String tag) { this.checkWriteAccess(); return (super.remove(tag) != null); } @Override - public Object compute( + public synchronized Object compute( String key, BiFunction remappingFunction) { this.checkWriteAccess(); @@ -3197,7 +3535,7 @@ public Object compute( } @Override - public Object computeIfAbsent( + public synchronized Object computeIfAbsent( String key, Function mappingFunction) { this.checkWriteAccess(); @@ -3205,7 +3543,7 @@ public Object computeIfAbsent( } @Override - public Object computeIfPresent( + public synchronized Object computeIfPresent( String key, BiFunction remappingFunction) { this.checkWriteAccess(); diff --git a/internal-api/src/test/java/datadog/trace/api/TagMapConcurrencyTest.java b/internal-api/src/test/java/datadog/trace/api/TagMapConcurrencyTest.java new file mode 100644 index 00000000000..342dbd597d1 --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/api/TagMapConcurrencyTest.java @@ -0,0 +1,301 @@ +package datadog.trace.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +/** Concurrency tests for the OptimizedTagMap thread-ownership model. */ +public final class TagMapConcurrencyTest { + + /** + * Creates an OptimizedTagMap directly — ownership semantics only apply to this implementation. + */ + private static TagMap createMap() { + return new OptimizedTagMap(); + } + + @Test + @DisplayName("Owner thread: write 1000 tags via fast path, verify all present") + void ownerThreadFastPath() { + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + + for (int i = 0; i < 1000; i++) { + map.set("key" + i, "value" + i); + } + + assertEquals(1000, map.size()); + for (int i = 0; i < 1000; i++) { + assertEquals("value" + i, map.getObject("key" + i)); + } + } + + @Test + @DisplayName("Transition: owner writes, transitionToShared, non-owner writes, all visible") + void transitionMakesWritesVisible() throws Exception { + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + + // Owner writes + for (int i = 0; i < 100; i++) { + map.set("owner" + i, "val" + i); + } + + map.transitionToShared(); + + // Non-owner writes from another thread + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch done = new CountDownLatch(1); + executor.submit( + () -> { + for (int i = 0; i < 100; i++) { + map.set("other" + i, "val" + i); + } + done.countDown(); + }); + + done.await(5, TimeUnit.SECONDS); + executor.shutdown(); + + // All writes visible + for (int i = 0; i < 100; i++) { + assertEquals("val" + i, map.getObject("owner" + i), "Owner tag owner" + i + " missing"); + 
assertEquals("val" + i, map.getObject("other" + i), "Other tag other" + i + " missing"); + } + assertEquals(200, map.size()); + } + + @RepeatedTest(5) + @DisplayName("Post-transition: 8 threads write concurrently — no crash, no lost keys") + void postTransitionContention() throws Exception { + int numThreads = 8; + int tagsPerThread = 500; + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + map.transitionToShared(); + + CyclicBarrier barrier = new CyclicBarrier(numThreads); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + List> futures = new ArrayList<>(); + for (int t = 0; t < numThreads; t++) { + final int threadIdx = t; + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + for (int i = 0; i < tagsPerThread; i++) { + map.set("t" + threadIdx + "_k" + i, "v" + i); + } + } catch (Throwable e) { + errors.add(e); + } + })); + } + + for (Future f : futures) { + f.get(30, TimeUnit.SECONDS); + } + executor.shutdown(); + + assertEquals(0, errors.size(), "Unexpected errors: " + errors); + // Each thread's last write should be present + for (int t = 0; t < numThreads; t++) { + assertNotNull( + map.getObject("t" + t + "_k" + (tagsPerThread - 1)), "Thread " + t + " last tag missing"); + } + } + + @Test + @DisplayName("withLock provides compound atomicity: batch is all-or-nothing to observers") + void withLockCompoundAtomicity() throws Exception { + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + map.transitionToShared(); + + int batchSize = 10; + int iterations = 5000; + AtomicBoolean running = new AtomicBoolean(true); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + // Writer: writes batches of N tags atomically via withLock + Thread writer = + new Thread( + () -> { + try { + for (int iter = 0; iter < iterations; iter++) { + final int batch = iter; + map.withLock( + () -> { + for (int 
i = 0; i < batchSize; i++) { + map.set("batch_" + i, "b" + batch); + } + }); + } + } catch (Throwable e) { + errors.add(e); + } finally { + running.set(false); + } + }); + + // Reader: reads all batch tags and verifies they're from the same batch + Thread reader = + new Thread( + () -> { + try { + while (running.get()) { + map.withLock( + () -> { + Object first = map.getObject("batch_0"); + if (first == null) return; + for (int i = 1; i < batchSize; i++) { + Object val = map.getObject("batch_" + i); + if (val != null && !val.equals(first)) { + errors.add( + new AssertionError( + "Partial batch: batch_0=" + + first + + " but batch_" + + i + + "=" + + val)); + } + } + }); + } + } catch (Throwable e) { + errors.add(e); + } + }); + + writer.start(); + reader.start(); + writer.join(30_000); + assertFalse(writer.isAlive(), "Writer thread did not terminate"); + reader.join(5_000); + assertFalse(reader.isAlive(), "Reader thread did not terminate"); + + assertEquals(0, errors.size(), "Atomicity violated: " + errors); + } + + @RepeatedTest(3) + @DisplayName("size() is consistent under contention: never exceeds total unique keys written") + void sizeConsistencyUnderContention() throws Exception { + int numWriters = 4; + int keysPerWriter = 200; + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + map.transitionToShared(); + + CyclicBarrier barrier = new CyclicBarrier(numWriters); + ExecutorService executor = Executors.newFixedThreadPool(numWriters); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + List> futures = new ArrayList<>(); + for (int t = 0; t < numWriters; t++) { + final int threadIdx = t; + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + for (int i = 0; i < keysPerWriter; i++) { + map.set("w" + threadIdx + "_" + i, "v"); + // Occasionally read size to check consistency + int s = map.size(); + if (s < 0 || s > numWriters * keysPerWriter) { + errors.add(new AssertionError("size() out of 
range: " + s)); + } + } + } catch (Throwable e) { + errors.add(e); + } + })); + } + + for (Future f : futures) { + f.get(30, TimeUnit.SECONDS); + } + executor.shutdown(); + + assertEquals(0, errors.size(), "Unexpected errors: " + errors); + int finalSize = map.size(); + assertTrue( + finalSize > 0 && finalSize <= numWriters * keysPerWriter, + "Final size out of range: " + finalSize); + } + + @RepeatedTest(3) + @DisplayName("forEach under contention: writer + forEach reader — no crash") + void forEachUnderContention() throws Exception { + TagMap map = createMap(); + map.setOwnerThread(Thread.currentThread()); + map.transitionToShared(); + + // Pre-populate + for (int i = 0; i < 50; i++) { + map.set("pre" + i, "val" + i); + } + + int iterations = 5000; + AtomicBoolean running = new AtomicBoolean(true); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + Thread writer = + new Thread( + () -> { + try { + for (int i = 0; i < iterations; i++) { + map.set("dyn" + (i % 100), "val" + i); + } + } catch (Throwable e) { + errors.add(e); + } finally { + running.set(false); + } + }); + + Thread reader = + new Thread( + () -> { + try { + while (running.get()) { + int[] count = {0}; + map.forEach(entry -> count[0]++); + if (count[0] < 0) { + errors.add(new AssertionError("forEach returned negative count")); + } + } + } catch (Throwable e) { + errors.add(e); + } + }); + + writer.start(); + reader.start(); + writer.join(30_000); + assertFalse(writer.isAlive(), "Writer thread did not terminate"); + reader.join(5_000); + assertFalse(reader.isAlive(), "Reader thread did not terminate"); + + assertEquals(0, errors.size(), "Errors during forEach contention: " + errors); + } +}