Skip to content

Commit c0ce9c5

Browse files
fix(virtual-thread): Fix support for multiple mounts / unmounts (#10931)
fix(virtual-thread): Fix support for multiple mounts / unmounts Co-authored-by: bruce.bujon <bruce.bujon@datadoghq.com>
1 parent 9eb410d commit c0ce9c5

File tree

3 files changed

+235
-22
lines changed

3 files changed

+235
-22
lines changed

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/main/java/datadog/trace/instrumentation/java/lang/jdk21/VirtualThreadInstrumentation.java

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package datadog.trace.instrumentation.java.lang.jdk21;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
5-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
6-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
5+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
6+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.activateAndContinueContinuation;
7+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.captureContinuation;
8+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.closeScope;
79
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
810
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
911
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
@@ -16,22 +18,40 @@
1618
import datadog.trace.bootstrap.ContextStore;
1719
import datadog.trace.bootstrap.InstrumentationContext;
1820
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
19-
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
21+
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
2022
import java.util.HashMap;
2123
import java.util.Map;
2224
import net.bytebuddy.asm.Advice;
2325
import net.bytebuddy.asm.Advice.OnMethodEnter;
2426
import net.bytebuddy.asm.Advice.OnMethodExit;
2527

2628
/**
27-
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
28-
* continuation mount, and close the scope from activation on continuation unmount.
29+
* Instruments {@code VirtualThread} to capture active state at creation, activate it on mount,
30+
* close the scope on unmount, and cancel the continuation on thread termination.
31+
*
32+
* <p>The lifecycle is as follows:
33+
*
34+
* <ol>
35+
* <li>{@code init()}: captures and holds a continuation from the active context (span due to
36+
* legacy API).
37+
* <li>{@code mount()}: activates the held continuation, restoring the context on the current
38+
* carrier thread.
39+
* <li>{@code unmount()}: closes the scope. The continuation survives as still hold.
40+
* <li>Steps 2-3 repeat on each park/unpark cycle, potentially on different carrier threads.
41+
* <li>{@code afterTerminate()} (for early versions of JDK 21 and 22 before GA), {@code afterDone}
42+
* (for JDK 21 GA above): cancels the held continuation to let the context scope to be closed.
43+
* </ol>
2944
*
3045
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
31-
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
32-
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
33-
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
34-
* {@link OnMethodExit} using advice return value is not possible).
46+
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link ConcurrentState} to
47+
* restore later. It additionally stores the {@link AgentScope} to be able to close it later as
48+
* activation / close is not done around the same method (so passing the scope from {@link
49+
* OnMethodEnter} / {@link OnMethodExit} using advice return value is not possible).
50+
*
51+
* <p>{@link ConcurrentState} is used instead of {@code State} because virtual threads can mount and
52+
* unmount multiple times across different carrier threads. The held continuation in {@link
53+
* ConcurrentState} survives multiple activate/close cycles without being consumed, and is
54+
* explicitly canceled on thread termination.
3555
*
3656
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
3757
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
@@ -62,7 +82,7 @@ public boolean isEnabled() {
6282
@Override
6383
public Map<String, String> contextStore() {
6484
Map<String, String> contextStore = new HashMap<>();
65-
contextStore.put(Runnable.class.getName(), State.class.getName());
85+
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
6686
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
6787
return contextStore;
6888
}
@@ -72,36 +92,54 @@ public void methodAdvice(MethodTransformer transformer) {
7292
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
7393
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
7494
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
95+
transformer.applyAdvice(
96+
isMethod().and(namedOneOf("afterTerminate", "afterDone")),
97+
getClass().getName() + "$Terminate");
7598
}
7699

77100
public static final class Construct {
78101
@OnMethodExit(suppress = Throwable.class)
79102
public static void captureScope(@Advice.This Object virtualThread) {
80-
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
103+
captureContinuation(
104+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
105+
(Runnable) virtualThread,
106+
activeSpan());
81107
}
82108
}
83109

84110
public static final class Activate {
85111
@OnMethodExit(suppress = Throwable.class)
86112
public static void activate(@Advice.This Object virtualThread) {
87-
ContextStore<Runnable, State> stateStore =
88-
InstrumentationContext.get(Runnable.class, State.class);
89-
ContextStore<Object, Object> scopeStore =
113+
AgentScope scope =
114+
activateAndContinueContinuation(
115+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
116+
(Runnable) virtualThread);
117+
ContextStore<Object, AgentScope> scopeStore =
90118
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
91-
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
92-
scopeStore.put(virtualThread, agentScope);
119+
scopeStore.put(virtualThread, scope);
93120
}
94121
}
95122

96123
public static final class Close {
97124
@OnMethodEnter(suppress = Throwable.class)
98125
public static void close(@Advice.This Object virtualThread) {
99-
ContextStore<Object, Object> scopeStore =
126+
ContextStore<Object, AgentScope> scopeStore =
100127
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
101-
Object agentScope = scopeStore.get(virtualThread);
102-
if (agentScope instanceof AgentScope) {
103-
endTaskScope((AgentScope) agentScope);
104-
}
128+
AgentScope scope = scopeStore.remove(virtualThread);
129+
closeScope(
130+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
131+
(Runnable) virtualThread,
132+
scope,
133+
null);
134+
}
135+
}
136+
137+
public static final class Terminate {
138+
@OnMethodEnter(suppress = Throwable.class)
139+
public static void terminate(@Advice.This Object virtualThread) {
140+
ConcurrentState.cancelAndClearContinuation(
141+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
142+
(Runnable) virtualThread);
105143
}
106144
}
107145
}

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/test/java/testdog/trace/instrumentation/java/lang/jdk21/VirtualThreadApiInstrumentationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.junit.jupiter.api.DisplayName;
1212
import org.junit.jupiter.api.Test;
1313

14+
/** Test the {@code VirtualThread} and {@code Thread.Builder} API. */
1415
public class VirtualThreadApiInstrumentationTest extends AbstractInstrumentationTest {
1516

1617
@DisplayName("test Thread.Builder.OfVirtual.start()")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package testdog.trace.instrumentation.java.lang.jdk21;
2+
3+
import static datadog.trace.agent.test.assertions.SpanMatcher.span;
4+
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import datadog.trace.agent.test.AbstractInstrumentationTest;
8+
import datadog.trace.api.CorrelationIdentifier;
9+
import datadog.trace.api.GlobalTracer;
10+
import datadog.trace.api.Trace;
11+
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.atomic.AtomicReference;
15+
import org.junit.jupiter.api.DisplayName;
16+
import org.junit.jupiter.api.Test;
17+
18+
/** Test context tracking through {@code VirtualThread} lifecycle - park/unpark (remount) cycles. */
19+
public class VirtualThreadLifeCycleTest extends AbstractInstrumentationTest {
20+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
21+
22+
@DisplayName("test context restored after virtual thread remounts")
23+
@Test
24+
void testContextRestoredAfterVirtualThreadRemount() {
25+
int remountCount = 5;
26+
String[] spanId = new String[1];
27+
String[] spanIdBeforeUnmount = new String[1];
28+
String[] spanIdsAfterRemount = new String[remountCount];
29+
30+
new Runnable() {
31+
@Override
32+
@Trace(operationName = "parent")
33+
public void run() {
34+
spanId[0] = GlobalTracer.get().getSpanId();
35+
36+
Thread thread =
37+
Thread.startVirtualThread(
38+
() -> {
39+
spanIdBeforeUnmount[0] = GlobalTracer.get().getSpanId();
40+
for (int remount = 0; remount < remountCount; remount++) {
41+
tryUnmount();
42+
spanIdsAfterRemount[remount] = GlobalTracer.get().getSpanId();
43+
}
44+
});
45+
try {
46+
thread.join(TIMEOUT);
47+
} catch (InterruptedException e) {
48+
throw new RuntimeException(e);
49+
}
50+
}
51+
}.run();
52+
53+
assertEquals(
54+
spanId[0],
55+
spanIdBeforeUnmount[0],
56+
"context should be inherited from the parent execution unit");
57+
for (int i = 0; i < remountCount; i++) {
58+
assertEquals(
59+
spanId[0],
60+
spanIdsAfterRemount[i],
61+
"context should be restored after virtual thread remounts");
62+
}
63+
64+
assertTraces(trace(span().root().operationName("parent")));
65+
}
66+
67+
@DisplayName("test context restored as implicit parent span after remount")
68+
@Test
69+
void testContextRestoredAsImplicitParentSpanAfterRemount() {
70+
new Runnable() {
71+
@Override
72+
@Trace(operationName = "parent")
73+
public void run() {
74+
Thread thread =
75+
Thread.startVirtualThread(
76+
() -> {
77+
tryUnmount();
78+
// Runnable to create child span, not async related
79+
new Runnable() {
80+
@Override
81+
@Trace(operationName = "child")
82+
public void run() {}
83+
}.run();
84+
});
85+
try {
86+
thread.join(TIMEOUT);
87+
} catch (InterruptedException e) {
88+
throw new RuntimeException(e);
89+
}
90+
blockUntilChildSpansFinished(1);
91+
}
92+
}.run();
93+
94+
assertTraces(
95+
trace(
96+
span().root().operationName("parent"),
97+
span().childOfPrevious().operationName("child")));
98+
}
99+
100+
@DisplayName("test concurrent virtual threads with remount")
101+
@Test
102+
void testConcurrentVirtualThreadsWithRemount() {
103+
int threadCount = 5;
104+
String[] spanId = new String[1];
105+
String[] spanIdsAfterRemount = new String[threadCount];
106+
107+
new Runnable() {
108+
@Override
109+
@Trace(operationName = "parent")
110+
public void run() {
111+
spanId[0] = CorrelationIdentifier.getSpanId();
112+
113+
List<Thread> threads = new ArrayList<>();
114+
for (int i = 0; i < threadCount; i++) {
115+
int index = i;
116+
threads.add(
117+
Thread.startVirtualThread(
118+
() -> {
119+
tryUnmount();
120+
spanIdsAfterRemount[index] = CorrelationIdentifier.getSpanId();
121+
}));
122+
}
123+
124+
for (Thread thread : threads) {
125+
try {
126+
thread.join(TIMEOUT);
127+
} catch (InterruptedException e) {
128+
throw new RuntimeException(e);
129+
}
130+
}
131+
}
132+
}.run();
133+
134+
for (int i = 0; i < threadCount; i++) {
135+
assertEquals(
136+
spanId[0],
137+
spanIdsAfterRemount[i],
138+
"context should be restored after virtual thread #" + i + "remounts");
139+
}
140+
141+
assertTraces(trace(span().root().operationName("parent")));
142+
}
143+
144+
@DisplayName("test no context virtual thread remount")
145+
@Test
146+
void testNoContextVirtualThreadRemount() throws InterruptedException {
147+
AtomicReference<String> spanIdBeforeUnmount = new AtomicReference<>();
148+
AtomicReference<String> spanIdAfterRemount = new AtomicReference<>();
149+
150+
Thread.startVirtualThread(
151+
() -> {
152+
spanIdBeforeUnmount.set(CorrelationIdentifier.getSpanId());
153+
tryUnmount();
154+
spanIdAfterRemount.set(CorrelationIdentifier.getSpanId());
155+
})
156+
.join(TIMEOUT);
157+
158+
assertEquals(
159+
"0", spanIdBeforeUnmount.get(), "there should be no active context before unmount");
160+
assertEquals("0", spanIdAfterRemount.get(), "there should be no active context after remount");
161+
}
162+
163+
private static void tryUnmount() {
164+
try {
165+
// Multiple sleeps to expect triggering repeated park/unpark cycles.
166+
// This is not guaranteed to work, but there is no API to force mount/unmount.
167+
for (int i = 0; i < 5; i++) {
168+
Thread.sleep(10);
169+
}
170+
} catch (InterruptedException e) {
171+
throw new RuntimeException(e);
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)