Skip to content

Commit d0e39d9

Browse files
committed
feat(virtual-thread): Use context swapping instead of scope activation
Replaces the `ConcurrentState` approach (which activated/closed individual scopes) with `VirtualThreadState` that swaps the entire scope stack via `Context#swap()`. This correctly handles child spans created during virtual thread execution and avoids out-of-order scope closing. The new approach mirrors the ZIO FiberContext and Kotlin coroutines instrumentation patterns.
1 parent fdb3c9f commit d0e39d9

File tree

6 files changed

+180
-62
lines changed

6 files changed

+180
-62
lines changed
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package datadog.trace.bootstrap.instrumentation.java.lang;
22

3+
/** This class is a helper for the java-lang-21.0 {@code VirtualThreadInstrumentation}. */
34
public final class VirtualThreadHelper {
45
public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread";
56

67
/**
7-
* {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal.
8-
* This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call.
8+
* {@link VirtualThreadState} class name as string literal. This is mandatory for {@link
9+
* datadog.trace.bootstrap.ContextStore} API call.
910
*/
10-
public static final String AGENT_SCOPE_CLASS_NAME =
11-
"datadog.trace.bootstrap.instrumentation.api.AgentScope";
11+
public static final String VIRTUAL_THREAD_STATE_CLASS_NAME =
12+
"datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState";
1213
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadog.trace.bootstrap.instrumentation.java.lang;
2+
3+
import datadog.context.Context;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
5+
6+
/**
7+
* This class holds the saved context and scope continuation for a virtual thread.
8+
*
9+
* <p>Used by java-lang-21.0 {@code VirtualThreadInstrumentation} to swap the entire scope stack on
10+
* mount/unmount.
11+
*/
12+
public final class VirtualThreadState {
13+
/** The virtual thread's saved context (scope stack snapshot). */
14+
private Context context;
15+
16+
/** Prevents the enclosing context scope from completing before the virtual thread finishes. */
17+
private final Continuation continuation;
18+
19+
/** The carrier thread's saved context, set between mount and unmount. */
20+
private Context previousContext;
21+
22+
public VirtualThreadState(Context context, Continuation continuation) {
23+
this.context = context;
24+
this.continuation = continuation;
25+
}
26+
27+
/** Called on mount: swaps the virtual thread's context into the carrier thread. */
28+
public void onMount() {
29+
this.previousContext = this.context.swap();
30+
}
31+
32+
/** Called on unmount: restores the carrier thread's original context. */
33+
public void onUnmount() {
34+
if (this.previousContext != null) {
35+
this.context = this.previousContext.swap();
36+
this.previousContext = null;
37+
}
38+
}
39+
40+
/** Called on termination: releases the trace continuation. */
41+
public void onTerminate() {
42+
if (this.continuation != null) {
43+
this.continuation.cancel();
44+
}
45+
}
46+
}

dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@
4242
*/
4343
@ExtendWith(TestClassShadowingExtension.class)
4444
public abstract class AbstractInstrumentationTest {
45+
static {
46+
// Allow re-registration of ContextManagers so each test can use a fresh tracer.
47+
// This mirrors DDSpecification.allowContextTesting() for the JUnit 5 test framework.
48+
datadog.context.ContextManager.allowTesting();
49+
datadog.context.ContextBinder.allowTesting();
50+
}
51+
4552
static final Instrumentation INSTRUMENTATION = ByteBuddyAgent.getInstrumentation();
4653

4754
static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(20);
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,62 @@
11
package datadog.trace.instrumentation.java.lang.jdk21;
22

3+
import static datadog.context.Context.current;
4+
import static datadog.context.Context.root;
35
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
5-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.activateAndContinueContinuation;
6-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.captureContinuation;
7-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.closeScope;
6+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureActiveSpan;
87
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
9-
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
108
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
9+
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_STATE_CLASS_NAME;
1110
import static java.util.Collections.singletonList;
1211
import static java.util.Collections.singletonMap;
1312
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
1413
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1514
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1615

1716
import com.google.auto.service.AutoService;
17+
import datadog.context.Context;
1818
import datadog.environment.JavaVirtualMachine;
1919
import datadog.trace.agent.tooling.ExcludeFilterProvider;
2020
import datadog.trace.agent.tooling.Instrumenter;
2121
import datadog.trace.agent.tooling.InstrumenterModule;
2222
import datadog.trace.bootstrap.ContextStore;
2323
import datadog.trace.bootstrap.InstrumentationContext;
24-
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
25-
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
24+
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
2625
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
26+
import datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState;
2727
import java.util.Collection;
28-
import java.util.HashMap;
2928
import java.util.Map;
3029
import net.bytebuddy.asm.Advice;
3130
import net.bytebuddy.asm.Advice.OnMethodEnter;
3231
import net.bytebuddy.asm.Advice.OnMethodExit;
3332

3433
/**
35-
* Instruments {@code VirtualThread} to capture active state at creation, activate it on mount,
36-
* close the scope on unmount, and cancel the continuation on thread termination.
34+
* Instruments {@code VirtualThread} to propagate the context across mount/unmount cycles using
35+
* {@link Context#swap()}, and {@link Continuation} to prevent context scope to complete before the
36+
* thread finishes.
3737
*
3838
* <p>The lifecycle is as follows:
3939
*
4040
* <ol>
41-
* <li>{@code init()}: captures and holds a continuation from the active context (span due to
42-
* legacy API).
43-
* <li>{@code mount()}: activates the held continuation, restoring the context on the current
44-
* carrier thread.
45-
* <li>{@code unmount()}: closes the scope. The continuation survives as still hold.
41+
* <li>{@code init()}: captures the current {@link Context} and an {@link Continuation} to prevent
42+
* the enclosing context scope from completing early.
43+
* <li>{@code mount()}: swaps the virtual thread's saved context into the carrier thread, saving
44+
* the carrier thread's context.
45+
* <li>{@code unmount()}: swaps the carrier thread's original context back, saving the virtual
46+
* thread's (possibly modified) context for the next mount.
4647
* <li>Steps 2-3 repeat on each park/unpark cycle, potentially on different carrier threads.
47-
* <li>{@code afterDone}: cancels the held continuation to let the context scope to be closed.
48+
* <li>{@code afterDone()}: cancels the help continuation, releasing the context scope to be
49+
* closed.
4850
* </ol>
4951
*
50-
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
51-
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link ConcurrentState} to
52-
* restore later. It additionally stores the {@link AgentScope} to be able to close it later as
53-
* activation / close is not done around the same method (so passing the scope from {@link
54-
* OnMethodEnter} / {@link OnMethodExit} using advice return value is not possible).
55-
*
56-
* <p>{@link ConcurrentState} is used instead of {@code State} because virtual threads can mount and
57-
* unmount multiple times across different carrier threads. The held continuation in {@link
58-
* ConcurrentState} survives multiple activate/close cycles without being consumed, and is
59-
* explicitly canceled on thread termination.
60-
*
6152
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
6253
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
6354
* the carrier thread (ie a platform thread) would store the active context into ThreadLocal using
6455
* the platform thread as key, making the tracer unable to retrieve the stored context from the
6556
* current virtual thread (ThreadLocal will not return the value associated to the underlying
6657
* platform thread as they are considered to be different).
58+
*
59+
* @see VirtualThreadState
6760
*/
6861
@SuppressWarnings("unused")
6962
@AutoService(InstrumenterModule.class)
@@ -95,10 +88,7 @@ public boolean isEnabled() {
9588

9689
@Override
9790
public Map<String, String> contextStore() {
98-
Map<String, String> contextStore = new HashMap<>();
99-
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
100-
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
101-
return contextStore;
91+
return singletonMap(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
10292
}
10393

10494
@Override
@@ -113,47 +103,51 @@ public void methodAdvice(MethodTransformer transformer) {
113103

114104
public static final class Construct {
115105
@OnMethodExit(suppress = Throwable.class)
116-
public static void capture(@Advice.This Object virtualThread) {
117-
captureContinuation(
118-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
119-
(Runnable) virtualThread,
120-
activeSpan());
106+
public static void afterInit(@Advice.This Object virtualThread) {
107+
Context context = current();
108+
if (context == root()) {
109+
return; // no active context to propagate
110+
}
111+
VirtualThreadState state = new VirtualThreadState(context, captureActiveSpan());
112+
ContextStore<Object, Object> store =
113+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
114+
store.put(virtualThread, state);
121115
}
122116
}
123117

124118
public static final class Mount {
125119
@OnMethodExit(suppress = Throwable.class)
126-
public static void activate(@Advice.This Object virtualThread) {
127-
AgentScope scope =
128-
activateAndContinueContinuation(
129-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
130-
(Runnable) virtualThread);
131-
ContextStore<Object, AgentScope> scopeStore =
132-
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
133-
scopeStore.put(virtualThread, scope);
120+
public static void onMount(@Advice.This Object virtualThread) {
121+
ContextStore<Object, VirtualThreadState> store =
122+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
123+
VirtualThreadState state = store.get(virtualThread);
124+
if (state != null) {
125+
state.onMount();
126+
}
134127
}
135128
}
136129

137130
public static final class Unmount {
138131
@OnMethodEnter(suppress = Throwable.class)
139-
public static void close(@Advice.This Object virtualThread) {
140-
ContextStore<Object, AgentScope> scopeStore =
141-
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
142-
AgentScope scope = scopeStore.remove(virtualThread);
143-
closeScope(
144-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
145-
(Runnable) virtualThread,
146-
scope,
147-
null);
132+
public static void onUnmount(@Advice.This Object virtualThread) {
133+
ContextStore<Object, VirtualThreadState> store =
134+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
135+
VirtualThreadState state = store.get(virtualThread);
136+
if (state != null) {
137+
state.onUnmount();
138+
}
148139
}
149140
}
150141

151142
public static final class AfterDone {
152143
@OnMethodEnter(suppress = Throwable.class)
153-
public static void clear(@Advice.This Object virtualThread) {
154-
ConcurrentState.cancelAndClearContinuation(
155-
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
156-
(Runnable) virtualThread);
144+
public static void onTerminate(@Advice.This Object virtualThread) {
145+
ContextStore<Object, VirtualThreadState> store =
146+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
147+
VirtualThreadState state = store.remove(virtualThread);
148+
if (state != null) {
149+
state.onTerminate();
150+
}
157151
}
158152
}
159153
}

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/test/java/testdog/trace/instrumentation/java/lang/jdk21/VirtualThreadApiInstrumentationTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
/** Test the {@code VirtualThread} and {@code Thread.Builder} API. */
1515
public class VirtualThreadApiInstrumentationTest extends AbstractInstrumentationTest {
16-
1716
@DisplayName("test Thread.Builder.OfVirtual.start()")
1817
@Test
1918
void testBuilderOfVirtualStart() throws InterruptedException, TimeoutException {
@@ -145,4 +144,16 @@ void assertConnectedTrace() {
145144
span().root().operationName("parent"),
146145
span().childOfPrevious().operationName("asyncChild")));
147146
}
147+
148+
private static void tryUnmount() {
149+
try {
150+
// Multiple sleeps to attempt triggering repeated park/unpark cycles.
151+
// This is not guaranteed to work, but there is no API to force mount/unmount.
152+
for (int i = 0; i < 5; i++) {
153+
Thread.sleep(10);
154+
}
155+
} catch (InterruptedException e) {
156+
throw new RuntimeException(e);
157+
}
158+
}
148159
}

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/test/java/testdog/trace/instrumentation/java/lang/jdk21/VirtualThreadLifeCycleTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package testdog.trace.instrumentation.java.lang.jdk21;
22

33
import static datadog.trace.agent.test.assertions.SpanMatcher.span;
4+
import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME;
45
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
56
import static org.junit.jupiter.api.Assertions.assertEquals;
7+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
68

79
import datadog.trace.agent.test.AbstractInstrumentationTest;
810
import datadog.trace.api.CorrelationIdentifier;
@@ -160,6 +162,63 @@ void testNoContextVirtualThreadRemount() throws InterruptedException {
160162
assertEquals("0", spanIdAfterRemount.get(), "there should be no active context after remount");
161163
}
162164

165+
@DisplayName("test context ordering with child span across unmount/remount")
166+
@Test
167+
void testContextOrderingWithChildSpanAcrossRemount() throws InterruptedException {
168+
String[] parentSpanId = new String[1];
169+
String[] beforeChild = new String[1];
170+
String[] insideChildBeforeUnmount = new String[1];
171+
String[] insideChildAfterRemount = new String[1];
172+
String[] afterChild = new String[1];
173+
174+
new Runnable() {
175+
@Override
176+
@Trace(operationName = "parent")
177+
public void run() {
178+
parentSpanId[0] = GlobalTracer.get().getSpanId();
179+
180+
Thread thread =
181+
Thread.startVirtualThread(
182+
() -> {
183+
beforeChild[0] = GlobalTracer.get().getSpanId();
184+
childWork(insideChildBeforeUnmount, insideChildAfterRemount);
185+
afterChild[0] = GlobalTracer.get().getSpanId();
186+
});
187+
try {
188+
thread.join();
189+
} catch (InterruptedException e) {
190+
throw new RuntimeException(e);
191+
}
192+
blockUntilChildSpansFinished(1);
193+
}
194+
}.run();
195+
196+
// Verify context ordering at each checkpoint
197+
assertEquals(parentSpanId[0], beforeChild[0], "parent should be active before child span");
198+
assertNotEquals("0", insideChildBeforeUnmount[0], "child should be active before unmount");
199+
assertNotEquals(
200+
parentSpanId[0], insideChildBeforeUnmount[0], "active span should be child, not parent");
201+
assertEquals(
202+
insideChildBeforeUnmount[0],
203+
insideChildAfterRemount[0],
204+
"child should still be active after remount (no out-of-order scope close)");
205+
assertEquals(parentSpanId[0], afterChild[0], "parent should be active after child span closes");
206+
207+
// Verify trace structure
208+
assertTraces(
209+
trace(
210+
SORT_BY_START_TIME,
211+
span().root().operationName("parent"),
212+
span().childOfPrevious().operationName("child")));
213+
}
214+
215+
@Trace(operationName = "child")
216+
private static void childWork(String[] beforeUnmount, String[] afterRemount) {
217+
beforeUnmount[0] = GlobalTracer.get().getSpanId();
218+
tryUnmount();
219+
afterRemount[0] = GlobalTracer.get().getSpanId();
220+
}
221+
163222
private static void tryUnmount() {
164223
try {
165224
// Multiple sleeps to expect triggering repeated park/unpark cycles.

0 commit comments

Comments
 (0)