Skip to content

Commit 03f4969

Browse files
committed
fix(virtual-thread): Fix support for multiple mounts / unmounts
1 parent c00f676 commit 03f4969

File tree

3 files changed

+237
-22
lines changed

3 files changed

+237
-22
lines changed

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/main/java/datadog/trace/instrumentation/java/lang/jdk21/VirtualThreadInstrumentation.java

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package datadog.trace.instrumentation.java.lang.jdk21;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
5-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
6-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
5+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
6+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.activateAndContinueContinuation;
7+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.captureContinuation;
8+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.closeScope;
79
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
810
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
911
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
@@ -16,22 +18,40 @@
1618
import datadog.trace.bootstrap.ContextStore;
1719
import datadog.trace.bootstrap.InstrumentationContext;
1820
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
19-
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
21+
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
2022
import java.util.HashMap;
2123
import java.util.Map;
2224
import net.bytebuddy.asm.Advice;
2325
import net.bytebuddy.asm.Advice.OnMethodEnter;
2426
import net.bytebuddy.asm.Advice.OnMethodExit;
2527

2628
/**
27-
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
28-
* continuation mount, and close the scope from activation on continuation unmount.
29+
* Instruments {@code VirtualThread} to capture active state at creation, activate it on mount,
30+
* close the scope on unmount, and cancel the continuation on thread termination.
31+
*
32+
* <p>The lifecycle is as follows:
33+
*
34+
* <ol>
35+
* <li>{@code init()}: captures and holds a continuation from the active context (span due to
36+
* legacy API).
37+
* <li>{@code mount()}: activates the held continuation, restoring the context on the current
38+
* carrier thread.
39+
* <li>{@code unmount()}: closes the scope. The continuation survives as it is still held.
40+
* <li>Steps 2-3 repeat on each park/unpark cycle, potentially on different carrier threads.
41+
* <li>{@code afterTerminate()} (for early versions of JDK 21 and 22 before GA), {@code afterDone}
42+
* (for JDK 21 GA and above): cancels the held continuation so that the context scope can be closed.
43+
* </ol>
2944
*
3045
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
31-
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
32-
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
33-
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
34-
* {@link OnMethodExit} using advice return value is not possible).
46+
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link ConcurrentState} to
47+
* restore later. It additionally stores the {@link AgentScope} to be able to close it later as
48+
* activation / close is not done around the same method (so passing the scope from {@link
49+
* OnMethodEnter} / {@link OnMethodExit} using advice return value is not possible).
50+
*
51+
* <p>{@link ConcurrentState} is used instead of {@code State} because virtual threads can mount and
52+
* unmount multiple times across different carrier threads. The held continuation in {@link
53+
* ConcurrentState} survives multiple activate/close cycles without being consumed, and is
54+
* explicitly canceled on thread termination.
3555
*
3656
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
3757
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
@@ -62,7 +82,7 @@ public boolean isEnabled() {
6282
@Override
6383
public Map<String, String> contextStore() {
6484
Map<String, String> contextStore = new HashMap<>();
65-
contextStore.put(Runnable.class.getName(), State.class.getName());
85+
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
6686
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
6787
return contextStore;
6888
}
@@ -72,36 +92,54 @@ public void methodAdvice(MethodTransformer transformer) {
7292
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
7393
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
7494
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
95+
transformer.applyAdvice(
96+
isMethod().and(namedOneOf("afterTerminate", "afterDone")),
97+
getClass().getName() + "$Terminate");
7598
}
7699

77100
public static final class Construct {
78101
@OnMethodExit(suppress = Throwable.class)
79102
public static void captureScope(@Advice.This Object virtualThread) {
80-
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
103+
captureContinuation(
104+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
105+
(Runnable) virtualThread,
106+
activeSpan());
81107
}
82108
}
83109

84110
public static final class Activate {
85111
@OnMethodExit(suppress = Throwable.class)
86112
public static void activate(@Advice.This Object virtualThread) {
87-
ContextStore<Runnable, State> stateStore =
88-
InstrumentationContext.get(Runnable.class, State.class);
89-
ContextStore<Object, Object> scopeStore =
113+
AgentScope scope =
114+
activateAndContinueContinuation(
115+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
116+
(Runnable) virtualThread);
117+
ContextStore<Object, AgentScope> scopeStore =
90118
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
91-
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
92-
scopeStore.put(virtualThread, agentScope);
119+
scopeStore.put(virtualThread, scope);
93120
}
94121
}
95122

96123
public static final class Close {
97124
@OnMethodEnter(suppress = Throwable.class)
98125
public static void close(@Advice.This Object virtualThread) {
99-
ContextStore<Object, Object> scopeStore =
126+
ContextStore<Object, AgentScope> scopeStore =
100127
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
101-
Object agentScope = scopeStore.get(virtualThread);
102-
if (agentScope instanceof AgentScope) {
103-
endTaskScope((AgentScope) agentScope);
104-
}
128+
AgentScope scope = scopeStore.remove(virtualThread);
129+
closeScope(
130+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
131+
(Runnable) virtualThread,
132+
scope,
133+
null);
134+
}
135+
}
136+
137+
public static final class Terminate {
138+
@OnMethodEnter(suppress = Throwable.class)
139+
public static void terminate(@Advice.This Object virtualThread) {
140+
ConcurrentState.cancelAndClearContinuation(
141+
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
142+
(Runnable) virtualThread);
105143
}
106144
}
107145
}

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/test/java/testdog/trace/instrumentation/java/lang/jdk21/VirtualThreadApiInstrumentationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.junit.jupiter.api.DisplayName;
1212
import org.junit.jupiter.api.Test;
1313

14+
/** Test the {@code VirtualThread} and {@code Thread.Builder} API. */
1415
public class VirtualThreadApiInstrumentationTest extends AbstractInstrumentationTest {
1516

1617
@DisplayName("test Thread.Builder.OfVirtual.start()")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package testdog.trace.instrumentation.java.lang.jdk21;
2+
3+
import static datadog.trace.agent.test.assertions.SpanMatcher.span;
4+
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import datadog.trace.agent.test.AbstractInstrumentationTest;
8+
import datadog.trace.api.CorrelationIdentifier;
9+
import datadog.trace.api.GlobalTracer;
10+
import datadog.trace.api.Trace;
11+
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.TimeoutException;
15+
import java.util.concurrent.atomic.AtomicReference;
16+
import org.junit.jupiter.api.DisplayName;
17+
import org.junit.jupiter.api.Test;
18+
19+
/** Test context tracking through {@code VirtualThread} lifecycle - park/unpark (remount) cycles. */
20+
public class VirtualThreadLifeCycleTest extends AbstractInstrumentationTest {
21+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
22+
23+
@DisplayName("test context restored after virtual thread remounts")
24+
@Test
25+
void testContextRestoredAfterVirtualThreadRemount() throws InterruptedException {
26+
int remountCount = 5;
27+
String[] spanId = new String[1];
28+
String[] spanIdBeforeUnmount = new String[1];
29+
String[] spanIdsAfterRemount = new String[remountCount];
30+
31+
new Runnable() {
32+
@Override
33+
@Trace(operationName = "parent")
34+
public void run() {
35+
spanId[0] = GlobalTracer.get().getSpanId();
36+
37+
Thread thread =
38+
Thread.startVirtualThread(
39+
() -> {
40+
spanIdBeforeUnmount[0] = GlobalTracer.get().getSpanId();
41+
for (int remount = 0; remount < remountCount; remount++) {
42+
tryUnmount();
43+
spanIdsAfterRemount[remount] = GlobalTracer.get().getSpanId();
44+
}
45+
});
46+
try {
47+
thread.join(TIMEOUT);
48+
} catch (InterruptedException e) {
49+
throw new RuntimeException(e);
50+
}
51+
}
52+
}.run();
53+
54+
assertEquals(
55+
spanId[0],
56+
spanIdBeforeUnmount[0],
57+
"context should be inherited from the parent execution unit");
58+
for (int i = 0; i < remountCount; i++) {
59+
assertEquals(
60+
spanId[0],
61+
spanIdsAfterRemount[i],
62+
"context should be restored after virtual thread remounts");
63+
}
64+
65+
assertTraces(trace(span().root().operationName("parent")));
66+
}
67+
68+
@DisplayName("test context restored as implicit parent span after remount")
69+
@Test
70+
void testContextRestoredAsImplicitParentSpanAfterRemount()
71+
throws InterruptedException, TimeoutException {
72+
new Runnable() {
73+
@Override
74+
@Trace(operationName = "parent")
75+
public void run() {
76+
Thread thread =
77+
Thread.startVirtualThread(
78+
() -> {
79+
tryUnmount();
80+
// Runnable to create child span, not async related
81+
new Runnable() {
82+
@Override
83+
@Trace(operationName = "child")
84+
public void run() {}
85+
}.run();
86+
});
87+
try {
88+
thread.join(TIMEOUT);
89+
} catch (InterruptedException e) {
90+
throw new RuntimeException(e);
91+
}
92+
blockUntilChildSpansFinished(1);
93+
}
94+
}.run();
95+
96+
assertTraces(
97+
trace(
98+
span().root().operationName("parent"),
99+
span().childOfPrevious().operationName("child")));
100+
}
101+
102+
@DisplayName("test concurrent virtual threads with remount")
103+
@Test
104+
void testConcurrentVirtualThreadsWithRemount() throws InterruptedException {
105+
int threadCount = 5;
106+
String[] spanId = new String[1];
107+
String[] spanIdsAfterRemount = new String[threadCount];
108+
109+
new Runnable() {
110+
@Override
111+
@Trace(operationName = "parent")
112+
public void run() {
113+
spanId[0] = CorrelationIdentifier.getSpanId();
114+
115+
List<Thread> threads = new ArrayList<>();
116+
for (int i = 0; i < threadCount; i++) {
117+
int index = i;
118+
threads.add(
119+
Thread.startVirtualThread(
120+
() -> {
121+
tryUnmount();
122+
spanIdsAfterRemount[index] = CorrelationIdentifier.getSpanId();
123+
}));
124+
}
125+
126+
for (Thread thread : threads) {
127+
try {
128+
thread.join(TIMEOUT);
129+
} catch (InterruptedException e) {
130+
throw new RuntimeException(e);
131+
}
132+
}
133+
}
134+
}.run();
135+
136+
for (int i = 0; i < threadCount; i++) {
137+
assertEquals(
138+
spanId[0],
139+
spanIdsAfterRemount[i],
140+
"context should be restored after virtual thread #" + i + "remounts");
141+
}
142+
143+
assertTraces(trace(span().root().operationName("parent")));
144+
}
145+
146+
@DisplayName("test no context virtual thread remount")
147+
@Test
148+
void testNoContextVirtualThreadRemount() throws InterruptedException {
149+
AtomicReference<String> spanIdBeforeUnmount = new AtomicReference<>();
150+
AtomicReference<String> spanIdAfterRemount = new AtomicReference<>();
151+
152+
Thread.startVirtualThread(
153+
() -> {
154+
spanIdBeforeUnmount.set(CorrelationIdentifier.getSpanId());
155+
tryUnmount();
156+
spanIdAfterRemount.set(CorrelationIdentifier.getSpanId());
157+
})
158+
.join(TIMEOUT);
159+
160+
assertEquals(
161+
"0", spanIdBeforeUnmount.get(), "there should be no active context before unmount");
162+
assertEquals("0", spanIdAfterRemount.get(), "there should be no active context after remount");
163+
}
164+
165+
private static void tryUnmount() {
166+
try {
167+
// Multiple sleeps to expect triggering repeated park/unpark cycles.
168+
// This is not guaranteed to work, but there is no API to force mount/unmount.
169+
for (int i = 0; i < 5; i++) {
170+
Thread.sleep(10);
171+
}
172+
} catch (InterruptedException e) {
173+
throw new RuntimeException(e);
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)