Skip to content

Commit a1f4ec5

Browse files
Improve VirtualThread instrumentation (#11009)
fix(virtual-thread): Prevent duplicate instrumentation conflict between VirtualThread and Runnable fix(virtual-thread): Fix duplicate instrumentation of afterDone feat(virtual-thread): Use context swapping instead of scope activation Replaces the `ConcurrentState` approach (which activated/closed individual scopes) with `VirtualThreadState` that swaps the entire scope stack via `Context#swap()`. This correctly handles child spans created during virtual thread execution and avoids out-of-order scope closing. The new approach mirrors the ZIO FiberContext and Kotlin coroutines instrumentation patterns. feat(virtual-thread): Use context swapping instead of scope activation Replaces the `ConcurrentState` approach (which activated/closed individual scopes) with `VirtualThreadState` that swaps the entire scope stack via `Context#swap()`. This correctly handles child spans created during virtual thread execution and avoids out-of-order scope closing. The new approach mirrors the ZIO FiberContext and Kotlin coroutines instrumentation patterns. Co-authored-by: bruce.bujon <bruce.bujon@datadoghq.com>
1 parent 3123b19 commit a1f4ec5

File tree

6 files changed

+212
-72
lines changed

6 files changed

+212
-72
lines changed
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package datadog.trace.bootstrap.instrumentation.java.lang;
22

3+
/** This class is a helper for the java-lang-21.0 {@code VirtualThreadInstrumentation}. */
34
public final class VirtualThreadHelper {
45
public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread";
56

67
/**
7-
* {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal.
8-
* This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call.
8+
* {@link VirtualThreadState} class name as string literal. This is mandatory for {@link
9+
* datadog.trace.bootstrap.ContextStore} API call.
910
*/
10-
public static final String AGENT_SCOPE_CLASS_NAME =
11-
"datadog.trace.bootstrap.instrumentation.api.AgentScope";
11+
public static final String VIRTUAL_THREAD_STATE_CLASS_NAME =
12+
"datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState";
1213
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadog.trace.bootstrap.instrumentation.java.lang;
2+
3+
import datadog.context.Context;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
5+
6+
/**
7+
* This class holds the saved context and scope continuation for a virtual thread.
8+
*
9+
* <p>Used by java-lang-21.0 {@code VirtualThreadInstrumentation} to swap the entire scope stack on
10+
* mount/unmount.
11+
*/
12+
public final class VirtualThreadState {
13+
/** The virtual thread's saved context (scope stack snapshot). */
14+
private Context context;
15+
16+
/** Prevents the enclosing context scope from completing before the virtual thread finishes. */
17+
private final Continuation continuation;
18+
19+
/** The carrier thread's saved context, set between mount and unmount. */
20+
private Context previousContext;
21+
22+
public VirtualThreadState(Context context, Continuation continuation) {
23+
this.context = context;
24+
this.continuation = continuation;
25+
}
26+
27+
/** Called on mount: swaps the virtual thread's context into the carrier thread. */
28+
public void onMount() {
29+
this.previousContext = this.context.swap();
30+
}
31+
32+
/** Called on unmount: restores the carrier thread's original context. */
33+
public void onUnmount() {
34+
if (this.previousContext != null) {
35+
this.context = this.previousContext.swap();
36+
this.previousContext = null;
37+
}
38+
}
39+
40+
/** Called on termination: releases the trace continuation. */
41+
public void onTerminate() {
42+
if (this.continuation != null) {
43+
this.continuation.cancel();
44+
}
45+
}
46+
}

dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import datadog.trace.core.DDSpan;
2121
import datadog.trace.core.PendingTrace;
2222
import datadog.trace.core.TraceCollector;
23+
import de.thetaphi.forbiddenapis.SuppressForbidden;
2324
import java.lang.instrument.ClassFileTransformer;
2425
import java.lang.instrument.Instrumentation;
2526
import java.util.List;
@@ -30,6 +31,7 @@
3031
import java.util.function.Predicate;
3132
import net.bytebuddy.agent.ByteBuddyAgent;
3233
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeAll;
3335
import org.junit.jupiter.api.BeforeEach;
3436
import org.junit.jupiter.api.extension.ExtendWith;
3537
import org.opentest4j.AssertionFailedError;
@@ -53,6 +55,19 @@ public abstract class AbstractInstrumentationTest {
5355
protected ClassFileTransformer activeTransformer;
5456
protected ClassFileTransformerListener transformerLister;
5557

58+
@SuppressForbidden // Class.forName() used to dynamically configure context if present
59+
@BeforeAll
60+
static void allowContextTesting() {
61+
// Allow re-registration of context managers so each test can use a fresh tracer.
62+
// This mirrors DDSpecification.allowContextTesting() for the Spock test framework.
63+
try {
64+
Class.forName("datadog.context.ContextManager").getMethod("allowTesting").invoke(null);
65+
Class.forName("datadog.context.ContextBinder").getMethod("allowTesting").invoke(null);
66+
} catch (Throwable ignore) {
67+
// don't block testing if context types aren't available
68+
}
69+
}
70+
5671
@BeforeEach
5772
public void init() {
5873
// If this fails, it's likely the result of another test loading Config before it can be
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,70 @@
11
package datadog.trace.instrumentation.java.lang.jdk21;
22

3+
import static datadog.context.Context.current;
4+
import static datadog.context.Context.root;
35
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
5-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
6-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.activateAndContinueContinuation;
7-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.captureContinuation;
8-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.closeScope;
9-
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
6+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureActiveSpan;
7+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
108
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
9+
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_STATE_CLASS_NAME;
10+
import static java.util.Collections.singletonList;
11+
import static java.util.Collections.singletonMap;
1112
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
1213
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
14+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1315

1416
import com.google.auto.service.AutoService;
17+
import datadog.context.Context;
1518
import datadog.environment.JavaVirtualMachine;
19+
import datadog.trace.agent.tooling.ExcludeFilterProvider;
1620
import datadog.trace.agent.tooling.Instrumenter;
1721
import datadog.trace.agent.tooling.InstrumenterModule;
1822
import datadog.trace.bootstrap.ContextStore;
1923
import datadog.trace.bootstrap.InstrumentationContext;
20-
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
21-
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
22-
import java.util.HashMap;
24+
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
25+
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
26+
import datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState;
27+
import java.util.Collection;
2328
import java.util.Map;
2429
import net.bytebuddy.asm.Advice;
2530
import net.bytebuddy.asm.Advice.OnMethodEnter;
2631
import net.bytebuddy.asm.Advice.OnMethodExit;
2732

2833
/**
29-
* Instruments {@code VirtualThread} to capture active state at creation, activate it on mount,
30-
* close the scope on unmount, and cancel the continuation on thread termination.
34+
* Instruments {@code VirtualThread} to propagate the context across mount/unmount cycles using
35+
* {@link Context#swap()}, and {@link Continuation} to prevent context scope to complete before the
36+
* thread finishes.
3137
*
3238
* <p>The lifecycle is as follows:
3339
*
3440
* <ol>
35-
* <li>{@code init()}: captures and holds a continuation from the active context (span due to
36-
* legacy API).
37-
* <li>{@code mount()}: activates the held continuation, restoring the context on the current
38-
* carrier thread.
39-
* <li>{@code unmount()}: closes the scope. The continuation survives as still hold.
41+
* <li>{@code init()}: captures the current {@link Context} and an {@link Continuation} to prevent
42+
* the enclosing context scope from completing early.
43+
* <li>{@code mount()}: swaps the virtual thread's saved context into the carrier thread, saving
44+
* the carrier thread's context.
45+
* <li>{@code unmount()}: swaps the carrier thread's original context back, saving the virtual
46+
* thread's (possibly modified) context for the next mount.
4047
* <li>Steps 2-3 repeat on each park/unpark cycle, potentially on different carrier threads.
41-
* <li>{@code afterTerminate()} (for early versions of JDK 21 and 22 before GA), {@code afterDone}
42-
* (for JDK 21 GA above): cancels the held continuation to let the context scope to be closed.
48+
* <li>{@code afterDone()}: cancels the help continuation, releasing the context scope to be
49+
* closed.
4350
* </ol>
4451
*
45-
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
46-
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link ConcurrentState} to
47-
* restore later. It additionally stores the {@link AgentScope} to be able to close it later as
48-
* activation / close is not done around the same method (so passing the scope from {@link
49-
* OnMethodEnter} / {@link OnMethodExit} using advice return value is not possible).
50-
*
51-
* <p>{@link ConcurrentState} is used instead of {@code State} because virtual threads can mount and
52-
* unmount multiple times across different carrier threads. The held continuation in {@link
53-
* ConcurrentState} survives multiple activate/close cycles without being consumed, and is
54-
* explicitly canceled on thread termination.
55-
*
5652
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
5753
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
5854
* the carrier thread (ie a platform thread) would store the active context into ThreadLocal using
5955
* the platform thread as key, making the tracer unable to retrieve the stored context from the
6056
* current virtual thread (ThreadLocal will not return the value associated to the underlying
6157
* platform thread as they are considered to be different).
58+
*
59+
* @see VirtualThreadState
6260
*/
6361
@SuppressWarnings("unused")
6462
@AutoService(InstrumenterModule.class)
6563
public final class VirtualThreadInstrumentation extends InstrumenterModule.ContextTracking
66-
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
64+
implements Instrumenter.ForBootstrap,
65+
Instrumenter.ForSingleType,
66+
Instrumenter.HasMethodAdvice,
67+
ExcludeFilterProvider {
6768

6869
public VirtualThreadInstrumentation() {
6970
super("java-lang", "java-lang-21", "virtual-thread");
@@ -79,67 +80,74 @@ public boolean isEnabled() {
7980
return JavaVirtualMachine.isJavaVersionAtLeast(21) && super.isEnabled();
8081
}
8182

83+
@Override
84+
public Map<ExcludeFilter.ExcludeType, ? extends Collection<String>> excludedClasses() {
85+
// VirtualThread context is activated on mount/unmount, not on Runnable.run().
86+
return singletonMap(RUNNABLE, singletonList(VIRTUAL_THREAD_CLASS_NAME));
87+
}
88+
8289
@Override
8390
public Map<String, String> contextStore() {
84-
Map<String, String> contextStore = new HashMap<>();
85-
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
86-
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
87-
return contextStore;
91+
return singletonMap(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
8892
}
8993

9094
@Override
9195
public void methodAdvice(MethodTransformer transformer) {
9296
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
93-
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
94-
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
97+
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Mount");
98+
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Unmount");
9599
transformer.applyAdvice(
96-
isMethod().and(namedOneOf("afterTerminate", "afterDone")),
97-
getClass().getName() + "$Terminate");
100+
isMethod().and(named("afterDone")).and(takesArguments(boolean.class)),
101+
getClass().getName() + "$AfterDone");
98102
}
99103

100104
public static final class Construct {
101105
@OnMethodExit(suppress = Throwable.class)
102-
public static void captureScope(@Advice.This Object virtualThread) {
103-
captureContinuation(
104-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
105-
(Runnable) virtualThread,
106-
activeSpan());
106+
public static void afterInit(@Advice.This Object virtualThread) {
107+
Context context = current();
108+
if (context == root()) {
109+
return; // No active context to propagate, avoid creating state
110+
}
111+
VirtualThreadState state = new VirtualThreadState(context, captureActiveSpan());
112+
ContextStore<Object, Object> store =
113+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
114+
store.put(virtualThread, state);
107115
}
108116
}
109117

110-
public static final class Activate {
118+
public static final class Mount {
111119
@OnMethodExit(suppress = Throwable.class)
112-
public static void activate(@Advice.This Object virtualThread) {
113-
AgentScope scope =
114-
activateAndContinueContinuation(
115-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
116-
(Runnable) virtualThread);
117-
ContextStore<Object, AgentScope> scopeStore =
118-
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
119-
scopeStore.put(virtualThread, scope);
120+
public static void onMount(@Advice.This Object virtualThread) {
121+
ContextStore<Object, VirtualThreadState> store =
122+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
123+
VirtualThreadState state = store.get(virtualThread);
124+
if (state != null) {
125+
state.onMount();
126+
}
120127
}
121128
}
122129

123-
public static final class Close {
130+
public static final class Unmount {
124131
@OnMethodEnter(suppress = Throwable.class)
125-
public static void close(@Advice.This Object virtualThread) {
126-
ContextStore<Object, AgentScope> scopeStore =
127-
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
128-
AgentScope scope = scopeStore.remove(virtualThread);
129-
closeScope(
130-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
131-
(Runnable) virtualThread,
132-
scope,
133-
null);
132+
public static void onUnmount(@Advice.This Object virtualThread) {
133+
ContextStore<Object, VirtualThreadState> store =
134+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
135+
VirtualThreadState state = store.get(virtualThread);
136+
if (state != null) {
137+
state.onUnmount();
138+
}
134139
}
135140
}
136141

137-
public static final class Terminate {
142+
public static final class AfterDone {
138143
@OnMethodEnter(suppress = Throwable.class)
139-
public static void terminate(@Advice.This Object virtualThread) {
140-
ConcurrentState.cancelAndClearContinuation(
141-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
142-
(Runnable) virtualThread);
144+
public static void onTerminate(@Advice.This Object virtualThread) {
145+
ContextStore<Object, VirtualThreadState> store =
146+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
147+
VirtualThreadState state = store.remove(virtualThread);
148+
if (state != null) {
149+
state.onTerminate();
150+
}
143151
}
144152
}
145153
}

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/test/java/testdog/trace/instrumentation/java/lang/jdk21/VirtualThreadApiInstrumentationTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
/** Test the {@code VirtualThread} and {@code Thread.Builder} API. */
1515
public class VirtualThreadApiInstrumentationTest extends AbstractInstrumentationTest {
16-
1716
@DisplayName("test Thread.Builder.OfVirtual.start()")
1817
@Test
1918
void testBuilderOfVirtualStart() throws InterruptedException, TimeoutException {
@@ -145,4 +144,16 @@ void assertConnectedTrace() {
145144
span().root().operationName("parent"),
146145
span().childOfPrevious().operationName("asyncChild")));
147146
}
147+
148+
private static void tryUnmount() {
149+
try {
150+
// Multiple sleeps to attempt triggering repeated park/unpark cycles.
151+
// This is not guaranteed to work, but there is no API to force mount/unmount.
152+
for (int i = 0; i < 5; i++) {
153+
Thread.sleep(10);
154+
}
155+
} catch (InterruptedException e) {
156+
throw new RuntimeException(e);
157+
}
158+
}
148159
}

0 commit comments

Comments
 (0)