Skip to content

Commit 33c13e8

Browse files
bm1549claude
andcommitted
Rework thread-owned tags: encapsulate locking via withLock API
Address reviewer feedback on the thread-ownership optimization: - Add TagMap.withLock(Runnable/Supplier) API for compound operations, replacing all synchronized(unsafeTags) in DDSpanContext with unsafeTags.withLock() — locking is now fully encapsulated in TagMap - Add missing ownership checks to OptimizedTagMap: size(), isEmpty(), containsValue(), freeze(), isFrozen(), toString(), immutableCopy() - Add revokeOwnership() helper to avoid redundant volatile writes on the cache line after transition to shared mode - withLock() revokes ownership before running the operation, closing the TOCTOU race where the owner thread could bypass the compound lock - Add self-synchronization to LegacyTagMap (synchronized on all key HashMap methods + withLock override) so it is safe without outer locks - Add TagMapConcurrencyTest: owner fast path, transition visibility, post-transition contention, withLock atomicity, size consistency - Expand DDSpanContextConcurrencyTest: compound atomicity for setSpanSamplingPriority, getTags snapshot consistency, transition + concurrent read safety Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d992fcd commit 33c13e8

4 files changed

Lines changed: 795 additions & 161 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java

Lines changed: 111 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -609,11 +609,14 @@ public int getSamplingPriority() {
609609
}
610610

611611
public void setSpanSamplingPriority(double rate, int limit) {
612-
unsafeSetTag(SPAN_SAMPLING_MECHANISM_TAG, SamplingMechanism.SPAN_SAMPLING_RATE);
613-
unsafeSetTag(SPAN_SAMPLING_RULE_RATE_TAG, rate);
614-
if (limit != Integer.MAX_VALUE) {
615-
unsafeSetTag(SPAN_SAMPLING_MAX_PER_SECOND_TAG, limit);
616-
}
612+
unsafeTags.withLock(
613+
() -> {
614+
unsafeSetTag(SPAN_SAMPLING_MECHANISM_TAG, SamplingMechanism.SPAN_SAMPLING_RATE);
615+
unsafeSetTag(SPAN_SAMPLING_RULE_RATE_TAG, rate);
616+
if (limit != Integer.MAX_VALUE) {
617+
unsafeSetTag(SPAN_SAMPLING_MAX_PER_SECOND_TAG, limit);
618+
}
619+
});
617620
}
618621

619622
/**
@@ -901,44 +904,50 @@ void setAllTags(final TagMap map, boolean needsIntercept) {
901904
return;
902905
}
903906

904-
if (needsIntercept) {
905-
// forEach out-performs the iterator of TagMap
906-
// Taking advantage of ability to pass through other context arguments
907-
// to avoid using a capturing lambda
908-
map.forEach(
909-
this,
910-
(ctx, tagEntry) -> {
911-
String tag = tagEntry.tag();
912-
Object value = tagEntry.objectValue();
913-
914-
if (!ctx.tagInterceptor.interceptTag(ctx, tag, value)) {
915-
ctx.unsafeTags.set(tagEntry);
916-
}
917-
});
918-
} else {
919-
unsafeTags.putAll(map);
920-
}
907+
unsafeTags.withLock(
908+
() -> {
909+
if (needsIntercept) {
910+
// forEach out-performs the iterator of TagMap
911+
// Taking advantage of ability to pass through other context arguments
912+
// to avoid using a capturing lambda
913+
map.forEach(
914+
this,
915+
(ctx, tagEntry) -> {
916+
String tag = tagEntry.tag();
917+
Object value = tagEntry.objectValue();
918+
919+
if (!ctx.tagInterceptor.interceptTag(ctx, tag, value)) {
920+
ctx.unsafeTags.set(tagEntry);
921+
}
922+
});
923+
} else {
924+
unsafeTags.putAll(map);
925+
}
926+
});
921927
}
922928

923929
void setAllTags(final TagMap.Ledger ledger) {
924930
if (ledger == null) {
925931
return;
926932
}
927933

928-
for (final TagMap.EntryChange entryChange : ledger) {
929-
if (entryChange.isRemoval()) {
930-
unsafeTags.remove(entryChange.tag());
931-
} else {
932-
TagMap.Entry entry = (TagMap.Entry) entryChange;
934+
unsafeTags.withLock(
935+
() -> {
936+
for (final TagMap.EntryChange entryChange : ledger) {
937+
if (entryChange.isRemoval()) {
938+
unsafeTags.remove(entryChange.tag());
939+
} else {
940+
TagMap.Entry entry = (TagMap.Entry) entryChange;
933941

934-
String tag = entry.tag();
935-
Object value = entry.objectValue();
942+
String tag = entry.tag();
943+
Object value = entry.objectValue();
936944

937-
if (!tagInterceptor.interceptTag(this, tag, value)) {
938-
unsafeTags.set(entry);
939-
}
940-
}
941-
}
945+
if (!tagInterceptor.interceptTag(this, tag, value)) {
946+
unsafeTags.set(entry);
947+
}
948+
}
949+
}
950+
});
942951
}
943952

944953
void setAllTags(final Map<String, ?> map) {
@@ -947,11 +956,14 @@ void setAllTags(final Map<String, ?> map) {
947956
} else if (map instanceof TagMap) {
948957
setAllTags((TagMap) map);
949958
} else if (!map.isEmpty()) {
950-
for (final Map.Entry<String, ?> tag : map.entrySet()) {
951-
if (!tagInterceptor.interceptTag(this, tag.getKey(), tag.getValue())) {
952-
unsafeSetTag(tag.getKey(), tag.getValue());
953-
}
954-
}
959+
unsafeTags.withLock(
960+
() -> {
961+
for (final Map.Entry<String, ?> tag : map.entrySet()) {
962+
if (!tagInterceptor.interceptTag(this, tag.getKey(), tag.getValue())) {
963+
unsafeSetTag(tag.getKey(), tag.getValue());
964+
}
965+
}
966+
});
955967
}
956968
}
957969

@@ -1000,23 +1012,26 @@ public Object unsafeGetTag(final String tag) {
10001012

10011013
@Deprecated
10021014
public TagMap getTags() {
1003-
TagMap tags = unsafeTags.copy();
1004-
1005-
tags.put(DDTags.THREAD_ID, threadId);
1006-
// maintain previously observable type of the thread name :|
1007-
tags.put(DDTags.THREAD_NAME, threadName.toString());
1008-
if (samplingPriority != PrioritySampling.UNSET) {
1009-
tags.put(SAMPLE_RATE_KEY, samplingPriority);
1010-
}
1011-
if (httpStatusCode != 0) {
1012-
tags.put(Tags.HTTP_STATUS, (int) httpStatusCode);
1013-
}
1014-
// maintain previously observable type of http url :|
1015-
Object value = tags.get(Tags.HTTP_URL);
1016-
if (value != null) {
1017-
tags.put(Tags.HTTP_URL, value.toString());
1018-
}
1019-
return tags.freeze();
1015+
return unsafeTags.withLock(
1016+
() -> {
1017+
TagMap tags = unsafeTags.copy();
1018+
1019+
tags.put(DDTags.THREAD_ID, threadId);
1020+
// maintain previously observable type of the thread name :|
1021+
tags.put(DDTags.THREAD_NAME, threadName.toString());
1022+
if (samplingPriority != PrioritySampling.UNSET) {
1023+
tags.put(SAMPLE_RATE_KEY, samplingPriority);
1024+
}
1025+
if (httpStatusCode != 0) {
1026+
tags.put(Tags.HTTP_STATUS, (int) httpStatusCode);
1027+
}
1028+
// maintain previously observable type of http url :|
1029+
Object value = tags.get(Tags.HTTP_URL);
1030+
if (value != null) {
1031+
tags.put(Tags.HTTP_URL, value.toString());
1032+
}
1033+
return tags.freeze();
1034+
});
10201035
}
10211036

10221037
/**
@@ -1048,7 +1063,8 @@ public <T> void setMetaStruct(final String field, final T value) {
10481063
}
10491064

10501065
void earlyProcessTags(AppendableSpanLinks links) {
1051-
TagsPostProcessorFactory.eagerProcessor().processTags(unsafeTags, this, links);
1066+
unsafeTags.withLock(
1067+
() -> TagsPostProcessorFactory.eagerProcessor().processTags(unsafeTags, this, links));
10521068
}
10531069

10541070
void processTagsAndBaggage(
@@ -1058,39 +1074,44 @@ void processTagsAndBaggage(
10581074
// This is a compromise to avoid...
10591075
// - creating an extra wrapper object that would create significant allocation
10601076
// - implementing an interface to read the spans that require making the read method public
1061-
// Tags
1062-
TagsPostProcessorFactory.lazyProcessor().processTags(unsafeTags, this, restrictedSpan);
1063-
1064-
String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks());
1065-
if (linksTag != null) {
1066-
unsafeTags.put(SPAN_LINKS, linksTag);
1067-
}
1068-
// Baggage
1069-
Map<String, String> baggageItemsWithPropagationTags;
1070-
if (injectBaggageAsTags) {
1071-
baggageItemsWithPropagationTags = new HashMap<>(baggageItems);
1072-
if (w3cBaggage != null) {
1073-
injectW3CBaggageTags(baggageItemsWithPropagationTags);
1074-
}
1075-
propagationTags.fillTagMap(baggageItemsWithPropagationTags);
1076-
} else {
1077-
baggageItemsWithPropagationTags = propagationTags.createTagMap();
1078-
}
1079-
1080-
consumer.accept(
1081-
new Metadata(
1082-
threadId,
1083-
threadName,
1084-
unsafeTags,
1085-
baggageItemsWithPropagationTags,
1086-
samplingPriority != PrioritySampling.UNSET ? samplingPriority : getSamplingPriority(),
1087-
measured,
1088-
topLevel,
1089-
httpStatusCode == 0 ? null : HTTP_STATUSES.get(httpStatusCode),
1090-
// Get origin from rootSpan.context
1091-
getOrigin(),
1092-
longRunningVersion,
1093-
ProcessTags.getTagsForSerialization()));
1077+
unsafeTags.withLock(
1078+
() -> {
1079+
// Tags
1080+
TagsPostProcessorFactory.lazyProcessor().processTags(unsafeTags, this, restrictedSpan);
1081+
1082+
String linksTag = DDSpanLink.toTag(restrictedSpan.getLinks());
1083+
if (linksTag != null) {
1084+
unsafeTags.put(SPAN_LINKS, linksTag);
1085+
}
1086+
// Baggage
1087+
Map<String, String> baggageItemsWithPropagationTags;
1088+
if (injectBaggageAsTags) {
1089+
baggageItemsWithPropagationTags = new HashMap<>(baggageItems);
1090+
if (w3cBaggage != null) {
1091+
injectW3CBaggageTags(baggageItemsWithPropagationTags);
1092+
}
1093+
propagationTags.fillTagMap(baggageItemsWithPropagationTags);
1094+
} else {
1095+
baggageItemsWithPropagationTags = propagationTags.createTagMap();
1096+
}
1097+
1098+
consumer.accept(
1099+
new Metadata(
1100+
threadId,
1101+
threadName,
1102+
unsafeTags,
1103+
baggageItemsWithPropagationTags,
1104+
samplingPriority != PrioritySampling.UNSET
1105+
? samplingPriority
1106+
: getSamplingPriority(),
1107+
measured,
1108+
topLevel,
1109+
httpStatusCode == 0 ? null : HTTP_STATUSES.get(httpStatusCode),
1110+
// Get origin from rootSpan.context
1111+
getOrigin(),
1112+
longRunningVersion,
1113+
ProcessTags.getTagsForSerialization()));
1114+
});
10941115
}
10951116

10961117
void injectW3CBaggageTags(Map<String, String> baggageItemsWithPropagationTags) {

dd-trace-core/src/test/java/datadog/trace/core/DDSpanContextConcurrencyTest.java

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,4 +613,145 @@ void concurrentSetMetric() throws Exception {
613613
"Thread " + t + " last metric missing");
614614
}
615615
}
616+
617+
/**
618+
* Verifies compound atomicity of setSpanSamplingPriority: the three sampling tags must always
619+
* appear together (all or none) when observed via getTags().
620+
*/
621+
@RepeatedTest(10)
622+
@DisplayName("setSpanSamplingPriority atomicity: 3 tags always consistent in getTags()")
623+
void spanSamplingPriorityAtomicity() throws Exception {
624+
AgentSpan span = TRACER.startSpan("test", "samplingAtomicity");
625+
DDSpanContext ctx = ((DDSpan) span).context();
626+
// Transition to shared so the compound lock is in effect
627+
ctx.transitionToShared();
628+
629+
int iterations = 2000;
630+
AtomicBoolean running = new AtomicBoolean(true);
631+
CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
632+
633+
// Writer: repeatedly sets span sampling priority
634+
Thread writer =
635+
new Thread(
636+
() -> {
637+
try {
638+
for (int i = 0; i < iterations; i++) {
639+
ctx.setSpanSamplingPriority(0.5, 100);
640+
}
641+
} catch (Throwable e) {
642+
errors.add(e);
643+
} finally {
644+
running.set(false);
645+
}
646+
});
647+
648+
// Reader: checks that the 3 sampling tags are consistent
649+
Thread reader =
650+
new Thread(
651+
() -> {
652+
try {
653+
while (running.get()) {
654+
datadog.trace.api.TagMap tags = ctx.getTags();
655+
Object mechanism = tags.getObject("_dd.span_sampling.mechanism");
656+
Object ruleRate = tags.getObject("_dd.span_sampling.rule_rate");
657+
Object maxPerSecond = tags.getObject("_dd.span_sampling.max_per_second");
658+
659+
// Either all three are present or none
660+
if (mechanism != null || ruleRate != null || maxPerSecond != null) {
661+
if (mechanism == null || ruleRate == null || maxPerSecond == null) {
662+
errors.add(
663+
new AssertionError(
664+
"Partial sampling tags: mechanism="
665+
+ mechanism
666+
+ " ruleRate="
667+
+ ruleRate
668+
+ " maxPerSecond="
669+
+ maxPerSecond));
670+
}
671+
}
672+
}
673+
} catch (Throwable e) {
674+
errors.add(e);
675+
}
676+
});
677+
678+
writer.start();
679+
reader.start();
680+
writer.join(30_000);
681+
reader.join(5_000);
682+
span.finish();
683+
684+
assertEquals(0, errors.size(), "Atomicity violated: " + errors);
685+
}
686+
687+
/**
688+
* Verifies that getTags() returns a consistent snapshot: THREAD_ID and THREAD_NAME are always
689+
* present and consistent with the span's owning thread.
690+
*/
691+
@Test
692+
@DisplayName("getTags() snapshot consistency: virtual fields always present")
693+
void getTagsSnapshotConsistency() throws Exception {
694+
AgentSpan span = TRACER.startSpan("test", "getTagsConsistency");
695+
696+
span.setTag("user_tag", "hello");
697+
span.finish();
698+
699+
datadog.trace.api.TagMap tags = ((DDSpan) span).context().getTags();
700+
assertNotNull(tags.getObject("thread.id"), "THREAD_ID missing from getTags()");
701+
assertNotNull(tags.getObject("thread.name"), "THREAD_NAME missing from getTags()");
702+
assertEquals("hello", tags.getObject("user_tag"));
703+
}
704+
705+
/**
706+
* Exercises the transition window: owner tags, finish (triggers transitionToShared), then
707+
* concurrent readers verify tags are visible post-transition.
708+
*/
709+
@RepeatedTest(5)
710+
@DisplayName("Transition + concurrent read: tags written before finish visible after")
711+
void transitionAndConcurrentRead() throws Exception {
712+
int numReaders = 4;
713+
int tagsToWrite = 200;
714+
AgentSpan span = TRACER.startSpan("test", "transitionRead");
715+
716+
// Owner thread writes tags
717+
for (int i = 0; i < tagsToWrite; i++) {
718+
span.setTag("pre_" + i, "val_" + i);
719+
}
720+
721+
// Finish triggers transitionToShared
722+
span.finish();
723+
724+
// Multiple reader threads verify all tags are visible
725+
CyclicBarrier barrier = new CyclicBarrier(numReaders);
726+
ExecutorService executor = Executors.newFixedThreadPool(numReaders);
727+
CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
728+
729+
List<Future<?>> futures = new ArrayList<>();
730+
for (int r = 0; r < numReaders; r++) {
731+
futures.add(
732+
executor.submit(
733+
() -> {
734+
try {
735+
barrier.await(5, TimeUnit.SECONDS);
736+
for (int i = 0; i < tagsToWrite; i++) {
737+
Object val = span.getTag("pre_" + i);
738+
if (!"val_".concat(String.valueOf(i)).equals(val)) {
739+
errors.add(
740+
new AssertionError(
741+
"Tag pre_" + i + " expected val_" + i + " got " + val));
742+
}
743+
}
744+
} catch (Throwable e) {
745+
errors.add(e);
746+
}
747+
}));
748+
}
749+
750+
for (Future<?> f : futures) {
751+
f.get(10, TimeUnit.SECONDS);
752+
}
753+
executor.shutdown();
754+
755+
assertEquals(0, errors.size(), "Tags not visible after transition: " + errors);
756+
}
616757
}

0 commit comments

Comments
 (0)