Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package datadog.trace.bootstrap.instrumentation.java.lang;

/** This class is a helper for the java-lang-21.0 {@code VirtualThreadInstrumentation}. */
public final class VirtualThreadHelper {
public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread";

/**
* {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal.
* This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call.
* {@link VirtualThreadState} class name as string literal. This is mandatory for {@link
* datadog.trace.bootstrap.ContextStore} API call.
*/
public static final String AGENT_SCOPE_CLASS_NAME =
"datadog.trace.bootstrap.instrumentation.api.AgentScope";
public static final String VIRTUAL_THREAD_STATE_CLASS_NAME =
"datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datadog.trace.bootstrap.instrumentation.java.lang;

import datadog.context.Context;
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;

/**
* This class holds the saved context and scope continuation for a virtual thread.
*
* <p>Used by java-lang-21.0 {@code VirtualThreadInstrumentation} to swap the entire scope stack on
* mount/unmount.
*/
public final class VirtualThreadState {
/** The virtual thread's saved context (scope stack snapshot). */
private Context context;

/** Prevents the enclosing context scope from completing before the virtual thread finishes. */
private final Continuation continuation;

/** The carrier thread's saved context, set between mount and unmount. */
private Context previousContext;

public VirtualThreadState(Context context, Continuation continuation) {
this.context = context;
this.continuation = continuation;
}

/** Called on mount: swaps the virtual thread's context into the carrier thread. */
public void onMount() {
this.previousContext = this.context.swap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that's the kind of API that I was imagining

}

/** Called on unmount: restores the carrier thread's original context. */
public void onUnmount() {
if (this.previousContext != null) {
this.context = this.previousContext.swap();
this.previousContext = null;
}
}

/** Called on termination: releases the trace continuation. */
public void onTerminate() {
if (this.continuation != null) {
this.continuation.cancel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import datadog.trace.core.DDSpan;
import datadog.trace.core.PendingTrace;
import datadog.trace.core.TraceCollector;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.util.List;
Expand All @@ -30,6 +31,7 @@
import java.util.function.Predicate;
import net.bytebuddy.agent.ByteBuddyAgent;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opentest4j.AssertionFailedError;
Expand All @@ -53,6 +55,19 @@ public abstract class AbstractInstrumentationTest {
protected ClassFileTransformer activeTransformer;
protected ClassFileTransformerListener transformerLister;

@SuppressForbidden // Class.forName() used to dynamically configure context if present
@BeforeAll
static void allowContextTesting() {
// Allow re-registration of context managers so each test can use a fresh tracer.
// This mirrors DDSpecification.allowContextTesting() for the Spock test framework.
try {
Class.forName("datadog.context.ContextManager").getMethod("allowTesting").invoke(null);
Class.forName("datadog.context.ContextBinder").getMethod("allowTesting").invoke(null);
} catch (Throwable ignore) {
// don't block testing if context types aren't available
}
}

@BeforeEach
public void init() {
// If this fails, it's likely the result of another test loading Config before it can be
Expand Down
Original file line number Diff line number Diff line change
@@ -1,69 +1,70 @@
package datadog.trace.instrumentation.java.lang.jdk21;

import static datadog.context.Context.current;
import static datadog.context.Context.root;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.activateAndContinueContinuation;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.captureContinuation;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState.closeScope;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureActiveSpan;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_STATE_CLASS_NAME;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.environment.JavaVirtualMachine;
import datadog.trace.agent.tooling.ExcludeFilterProvider;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState;
import java.util.HashMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
import datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadState;
import java.util.Collection;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.OnMethodEnter;
import net.bytebuddy.asm.Advice.OnMethodExit;

/**
* Instruments {@code VirtualThread} to capture active state at creation, activate it on mount,
* close the scope on unmount, and cancel the continuation on thread termination.
* Instruments {@code VirtualThread} to propagate the context across mount/unmount cycles using
* {@link Context#swap()}, and {@link Continuation} to prevent context scope to complete before the
* thread finishes.
*
* <p>The lifecycle is as follows:
*
* <ol>
* <li>{@code init()}: captures and holds a continuation from the active context (span due to
* legacy API).
* <li>{@code mount()}: activates the held continuation, restoring the context on the current
* carrier thread.
* <li>{@code unmount()}: closes the scope. The continuation survives as still hold.
* <li>{@code init()}: captures the current {@link Context} and an {@link Continuation} to prevent
* the enclosing context scope from completing early.
* <li>{@code mount()}: swaps the virtual thread's saved context into the carrier thread, saving
* the carrier thread's context.
* <li>{@code unmount()}: swaps the carrier thread's original context back, saving the virtual
* thread's (possibly modified) context for the next mount.
* <li>Steps 2-3 repeat on each park/unpark cycle, potentially on different carrier threads.
* <li>{@code afterTerminate()} (for early versions of JDK 21 and 22 before GA), {@code afterDone}
* (for JDK 21 GA above): cancels the held continuation to let the context scope to be closed.
* <li>{@code afterDone()}: cancels the help continuation, releasing the context scope to be
* closed.
* </ol>
*
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link ConcurrentState} to
* restore later. It additionally stores the {@link AgentScope} to be able to close it later as
* activation / close is not done around the same method (so passing the scope from {@link
* OnMethodEnter} / {@link OnMethodExit} using advice return value is not possible).
*
* <p>{@link ConcurrentState} is used instead of {@code State} because virtual threads can mount and
* unmount multiple times across different carrier threads. The held continuation in {@link
* ConcurrentState} survives multiple activate/close cycles without being consumed, and is
* explicitly canceled on thread termination.
*
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
* the carrier thread (ie a platform thread) would store the active context into ThreadLocal using
* the platform thread as key, making the tracer unable to retrieve the stored context from the
* current virtual thread (ThreadLocal will not return the value associated to the underlying
* platform thread as they are considered to be different).
*
* @see VirtualThreadState
*/
@SuppressWarnings("unused")
@AutoService(InstrumenterModule.class)
public final class VirtualThreadInstrumentation extends InstrumenterModule.ContextTracking
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
implements Instrumenter.ForBootstrap,
Instrumenter.ForSingleType,
Instrumenter.HasMethodAdvice,
ExcludeFilterProvider {

public VirtualThreadInstrumentation() {
super("java-lang", "java-lang-21", "virtual-thread");
Expand All @@ -79,67 +80,74 @@ public boolean isEnabled() {
return JavaVirtualMachine.isJavaVersionAtLeast(21) && super.isEnabled();
}

@Override
public Map<ExcludeFilter.ExcludeType, ? extends Collection<String>> excludedClasses() {
// VirtualThread context is activated on mount/unmount, not on Runnable.run().
return singletonMap(RUNNABLE, singletonList(VIRTUAL_THREAD_CLASS_NAME));
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStore = new HashMap<>();
contextStore.put(Runnable.class.getName(), ConcurrentState.class.getName());
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
return contextStore;
return singletonMap(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Mount");
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Unmount");
transformer.applyAdvice(
isMethod().and(namedOneOf("afterTerminate", "afterDone")),
getClass().getName() + "$Terminate");
isMethod().and(named("afterDone")).and(takesArguments(boolean.class)),
getClass().getName() + "$AfterDone");
}

public static final class Construct {
@OnMethodExit(suppress = Throwable.class)
public static void captureScope(@Advice.This Object virtualThread) {
captureContinuation(
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
(Runnable) virtualThread,
activeSpan());
public static void afterInit(@Advice.This Object virtualThread) {
Context context = current();
if (context == root()) {
return; // No active context to propagate, avoid creating state
}
VirtualThreadState state = new VirtualThreadState(context, captureActiveSpan());
ContextStore<Object, Object> store =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
store.put(virtualThread, state);
}
}

public static final class Activate {
public static final class Mount {
@OnMethodExit(suppress = Throwable.class)
public static void activate(@Advice.This Object virtualThread) {
AgentScope scope =
activateAndContinueContinuation(
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
(Runnable) virtualThread);
ContextStore<Object, AgentScope> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
scopeStore.put(virtualThread, scope);
public static void onMount(@Advice.This Object virtualThread) {
ContextStore<Object, VirtualThreadState> store =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
VirtualThreadState state = store.get(virtualThread);
if (state != null) {
state.onMount();
}
}
}

public static final class Close {
public static final class Unmount {
@OnMethodEnter(suppress = Throwable.class)
public static void close(@Advice.This Object virtualThread) {
ContextStore<Object, AgentScope> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
AgentScope scope = scopeStore.remove(virtualThread);
closeScope(
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
(Runnable) virtualThread,
scope,
null);
public static void onUnmount(@Advice.This Object virtualThread) {
ContextStore<Object, VirtualThreadState> store =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
VirtualThreadState state = store.get(virtualThread);
if (state != null) {
state.onUnmount();
}
}
}

public static final class Terminate {
public static final class AfterDone {
@OnMethodEnter(suppress = Throwable.class)
public static void terminate(@Advice.This Object virtualThread) {
ConcurrentState.cancelAndClearContinuation(
InstrumentationContext.get(Runnable.class, ConcurrentState.class),
(Runnable) virtualThread);
public static void onTerminate(@Advice.This Object virtualThread) {
ContextStore<Object, VirtualThreadState> store =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, VIRTUAL_THREAD_STATE_CLASS_NAME);
VirtualThreadState state = store.remove(virtualThread);
if (state != null) {
state.onTerminate();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

/** Test the {@code VirtualThread} and {@code Thread.Builder} API. */
public class VirtualThreadApiInstrumentationTest extends AbstractInstrumentationTest {

@DisplayName("test Thread.Builder.OfVirtual.start()")
@Test
void testBuilderOfVirtualStart() throws InterruptedException, TimeoutException {
Expand Down Expand Up @@ -145,4 +144,16 @@ void assertConnectedTrace() {
span().root().operationName("parent"),
span().childOfPrevious().operationName("asyncChild")));
}

private static void tryUnmount() {
try {
// Multiple sleeps to attempt triggering repeated park/unpark cycles.
// This is not guaranteed to work, but there is no API to force mount/unmount.
for (int i = 0; i < 5; i++) {
Thread.sleep(10);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Loading
Loading