Skip to content

Commit 8069c80

Browse files
committed
replace State with ConcurrentState in virtual thread instrumentation
1 parent a41383a commit 8069c80

File tree

1 file changed

+40
-17
lines changed

1 file changed

+40
-17
lines changed

dd-java-agent/instrumentation/java/java-lang/java-lang-21.0/src/main/java/datadog/trace/instrumentation/java/lang/jdk21/VirtualThreadInstrumentation.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package datadog.trace.instrumentation.java.lang.jdk21;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
5-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
6-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
74
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
85
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
96
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
107
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
119

1210
import com.google.auto.service.AutoService;
1311
import datadog.environment.JavaVirtualMachine;
@@ -16,7 +14,8 @@
1614
import datadog.trace.bootstrap.ContextStore;
1715
import datadog.trace.bootstrap.InstrumentationContext;
1816
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
19-
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
17+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
18+
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
2019
import java.util.HashMap;
2120
import java.util.Map;
2221
import net.bytebuddy.asm.Advice;
@@ -25,13 +24,13 @@
2524

2625
/**
2726
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
28-
* continuation mount, and close the scope from activation on continuation unmount.
27+
* continuation mount, close the scope on continuation unmount, and release the continuation when
28+
* the virtual thread terminates.
2929
*
3030
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
31-
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
32-
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
33-
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
34-
* {@link OnMethodExit} using advice return value is not possible).
31+
* VirtualThread} inherits from {@link Runnable}) stores a held {@link ConcurrentState} so the
32+
* parent context can be re-activated on each mount. It additionally stores the {@link AgentScope}
33+
* to be able to close it later as activation / close is not done around the same method.
3534
*
3635
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
3736
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
@@ -62,7 +61,7 @@ public boolean isEnabled() {
6261
@Override
6362
public Map<String, String> contextStore() {
6463
Map<String, String> contextStore = new HashMap<>();
65-
contextStore.put(Runnable.class.getName(), State.class.getName());
64+
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
6665
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
6766
return contextStore;
6867
}
@@ -72,36 +71,60 @@ public void methodAdvice(MethodTransformer transformer) {
7271
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
7372
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
7473
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
74+
transformer.applyAdvice(
75+
isMethod().and(named("afterTerminate")).and(takesArguments(2)),
76+
getClass().getName() + "$Terminate");
7577
}
7678

7779
public static final class Construct {
7880
@OnMethodExit(suppress = Throwable.class)
7981
public static void captureScope(@Advice.This Object virtualThread) {
80-
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
82+
ContextStore<Runnable, ConcurrentState> stateStore =
83+
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
84+
ConcurrentState.captureContinuation(
85+
stateStore, (Runnable) virtualThread, AgentTracer.activeSpan());
8186
}
8287
}
8388

8489
public static final class Activate {
8590
@OnMethodExit(suppress = Throwable.class)
8691
public static void activate(@Advice.This Object virtualThread) {
87-
ContextStore<Runnable, State> stateStore =
88-
InstrumentationContext.get(Runnable.class, State.class);
92+
ContextStore<Runnable, ConcurrentState> stateStore =
93+
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
8994
ContextStore<Object, Object> scopeStore =
9095
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
91-
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
92-
scopeStore.put(virtualThread, agentScope);
96+
AgentScope agentScope =
97+
ConcurrentState.activateAndContinueContinuation(stateStore, (Runnable) virtualThread);
98+
if (agentScope != null) {
99+
scopeStore.put(virtualThread, agentScope);
100+
}
93101
}
94102
}
95103

96104
public static final class Close {
97105
@OnMethodEnter(suppress = Throwable.class)
98106
public static void close(@Advice.This Object virtualThread) {
107+
ContextStore<Runnable, ConcurrentState> stateStore =
108+
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
99109
ContextStore<Object, Object> scopeStore =
100110
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
101-
Object agentScope = scopeStore.get(virtualThread);
111+
Object agentScope = scopeStore.remove(virtualThread);
102112
if (agentScope instanceof AgentScope) {
103-
endTaskScope((AgentScope) agentScope);
113+
ConcurrentState.closeScope(
114+
stateStore, (Runnable) virtualThread, (AgentScope) agentScope, null);
104115
}
105116
}
106117
}
118+
119+
public static final class Terminate {
120+
@OnMethodExit(suppress = Throwable.class)
121+
public static void cleanup(@Advice.This Object virtualThread) {
122+
ContextStore<Runnable, ConcurrentState> stateStore =
123+
InstrumentationContext.get(Runnable.class, ConcurrentState.class);
124+
ConcurrentState.cancelAndClearContinuation(stateStore, (Runnable) virtualThread);
125+
ContextStore<Object, Object> scopeStore =
126+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
127+
scopeStore.remove(virtualThread);
128+
}
129+
}
107130
}

0 commit comments

Comments
 (0)