Skip to content

Commit b5306f1

Browse files
committed
feat(profiler): add LockSupport.park/unpark instrumentation for causal DAG edges
1 parent 9333784 commit b5306f1

4 files changed

Lines changed: 281 additions & 0 deletions

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
apply from: "$rootDir/gradle/java.gradle"
2+
3+
muzzle {
4+
pass {
5+
coreJdk()
6+
}
7+
}
8+
9+
dependencies {
10+
testImplementation libs.bundles.junit5
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package datadog.trace.instrumentation.locksupport;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
7+
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.Instrumenter;
11+
import datadog.trace.agent.tooling.InstrumenterModule;
12+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
14+
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext;
15+
import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import net.bytebuddy.asm.Advice;
18+
import net.bytebuddy.matcher.ElementMatchers;
19+
20+
/**
21+
* Instruments {@link java.util.concurrent.locks.LockSupport#park} variants to emit {@code
22+
* datadog.TaskBlock} JFR events. These events record the span, root-span, and duration of every
23+
* blocking interval, enabling critical-path analysis across async handoffs.
24+
*
25+
* <p>Also instruments {@link java.util.concurrent.locks.LockSupport#unpark} to capture the span ID
26+
* of the unblocking thread, which is then recorded in the TaskBlock event.
27+
*
28+
* <p>Only fires when a Datadog span is active on the calling thread, so there is no overhead on
29+
* threads that are not part of a traced request.
30+
*/
31+
@AutoService(InstrumenterModule.class)
32+
public class LockSupportProfilingInstrumentation extends InstrumenterModule.Profiling
33+
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
34+
35+
public LockSupportProfilingInstrumentation() {
36+
super("lock-support-profiling");
37+
}
38+
39+
@Override
40+
public String[] knownMatchingTypes() {
41+
return new String[] {"java.util.concurrent.locks.LockSupport"};
42+
}
43+
44+
@Override
45+
public void methodAdvice(MethodTransformer transformer) {
46+
transformer.applyAdvice(
47+
isMethod()
48+
.and(isStatic())
49+
.and(nameStartsWith("park"))
50+
.and(isDeclaredBy(named("java.util.concurrent.locks.LockSupport"))),
51+
getClass().getName() + "$ParkAdvice");
52+
transformer.applyAdvice(
53+
isMethod()
54+
.and(isStatic())
55+
.and(ElementMatchers.named("unpark"))
56+
.and(isDeclaredBy(named("java.util.concurrent.locks.LockSupport"))),
57+
getClass().getName() + "$UnparkAdvice");
58+
}
59+
60+
/** Holds shared state accessible from both {@link ParkAdvice} and {@link UnparkAdvice}. */
61+
public static final class State {
62+
/** Maps target thread to the span ID of the thread that called {@code unpark()} on it. */
63+
public static final ConcurrentHashMap<Thread, Long> UNPARKING_SPAN = new ConcurrentHashMap<>();
64+
}
65+
66+
public static final class ParkAdvice {
67+
68+
@Advice.OnMethodEnter(suppress = Throwable.class)
69+
public static long[] before(@Advice.Argument(value = 0, optional = true) Object blocker) {
70+
AgentSpan span = AgentTracer.activeSpan();
71+
if (!(span instanceof ProfilerContext)) {
72+
return null;
73+
}
74+
ProfilerContext ctx = (ProfilerContext) span;
75+
ProfilingContextIntegration profiling = AgentTracer.get().getProfilingContext();
76+
long startTicks = profiling.getCurrentTicks();
77+
if (startTicks == 0L) {
78+
// profiler not active
79+
return null;
80+
}
81+
long blockerHash = blocker != null ? System.identityHashCode(blocker) : 0L;
82+
return new long[] {startTicks, ctx.getSpanId(), ctx.getRootSpanId(), blockerHash};
83+
}
84+
85+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
86+
public static void after(@Advice.Enter long[] state) {
87+
if (state == null) {
88+
return;
89+
}
90+
Long unblockingSpanId = State.UNPARKING_SPAN.remove(Thread.currentThread());
91+
AgentTracer.get()
92+
.getProfilingContext()
93+
.recordTaskBlock(
94+
state[0],
95+
state[1],
96+
state[2],
97+
state[3],
98+
unblockingSpanId != null ? unblockingSpanId : 0L);
99+
}
100+
}
101+
102+
public static final class UnparkAdvice {
103+
104+
@Advice.OnMethodEnter(suppress = Throwable.class)
105+
public static void before(@Advice.Argument(0) Thread thread) {
106+
if (thread == null) {
107+
return;
108+
}
109+
AgentSpan span = AgentTracer.activeSpan();
110+
if (!(span instanceof ProfilerContext)) {
111+
return;
112+
}
113+
State.UNPARKING_SPAN.put(thread, ((ProfilerContext) span).getSpanId());
114+
}
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package datadog.trace.instrumentation.locksupport;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertNull;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
import datadog.trace.instrumentation.locksupport.LockSupportProfilingInstrumentation.State;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
import org.junit.jupiter.api.AfterEach;
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
17+
/**
18+
* Unit tests for {@link LockSupportProfilingInstrumentation}.
19+
*
20+
* <p>These tests exercise the {@link State} map directly, verifying the mechanism used to
21+
* communicate the unblocking span ID from {@code UnparkAdvice} to {@code ParkAdvice}.
22+
*/
23+
class LockSupportProfilingInstrumentationTest {
24+
25+
@BeforeEach
26+
void clearState() {
27+
State.UNPARKING_SPAN.clear();
28+
}
29+
30+
@AfterEach
31+
void cleanupState() {
32+
State.UNPARKING_SPAN.clear();
33+
}
34+
35+
// -------------------------------------------------------------------------
36+
// State map — basic contract
37+
// -------------------------------------------------------------------------
38+
39+
@Test
40+
void state_put_and_remove() {
41+
Thread t = Thread.currentThread();
42+
long spanId = 12345L;
43+
44+
State.UNPARKING_SPAN.put(t, spanId);
45+
Long retrieved = State.UNPARKING_SPAN.remove(t);
46+
47+
assertNotNull(retrieved);
48+
assertEquals(spanId, (long) retrieved);
49+
// After removal the entry should be gone
50+
assertNull(State.UNPARKING_SPAN.get(t));
51+
}
52+
53+
@Test
54+
void state_remove_returns_null_when_absent() {
55+
Thread t = new Thread(() -> {});
56+
assertNull(State.UNPARKING_SPAN.remove(t));
57+
}
58+
59+
@Test
60+
void state_is_initially_empty() {
61+
assertTrue(State.UNPARKING_SPAN.isEmpty());
62+
}
63+
64+
// -------------------------------------------------------------------------
65+
// Multithreaded: unpark thread populates map, parked thread reads it
66+
// -------------------------------------------------------------------------
67+
68+
/**
69+
* Simulates the UnparkAdvice → ParkAdvice handoff:
70+
*
71+
* <ol>
72+
* <li>Thread A (the "parked" thread) blocks on a latch.
73+
* <li>Thread B (the "unparking" thread) places its span ID in {@code State.UNPARKING_SPAN} for
74+
* Thread A and then releases the latch.
75+
* <li>Thread A wakes up, reads and removes the span ID from the map.
76+
* </ol>
77+
*/
78+
@Test
79+
void unparking_spanId_is_visible_to_parked_thread() throws InterruptedException {
80+
long unparkingSpanId = 99887766L;
81+
82+
CountDownLatch ready = new CountDownLatch(1);
83+
CountDownLatch go = new CountDownLatch(1);
84+
AtomicLong capturedSpanId = new AtomicLong(-1L);
85+
AtomicReference<Thread> parkedThreadRef = new AtomicReference<>();
86+
87+
Thread parkedThread =
88+
new Thread(
89+
() -> {
90+
parkedThreadRef.set(Thread.currentThread());
91+
ready.countDown();
92+
try {
93+
go.await();
94+
} catch (InterruptedException e) {
95+
Thread.currentThread().interrupt();
96+
}
97+
98+
// Simulate what ParkAdvice.after does: read and remove unblocking span id
99+
Long unblockingId = State.UNPARKING_SPAN.remove(Thread.currentThread());
100+
capturedSpanId.set(unblockingId != null ? unblockingId : 0L);
101+
});
102+
103+
parkedThread.start();
104+
ready.await(); // wait for parked thread to register itself
105+
106+
// Simulate what UnparkAdvice.before does: record unparking span id
107+
State.UNPARKING_SPAN.put(parkedThread, unparkingSpanId);
108+
go.countDown(); // unblock parked thread
109+
110+
parkedThread.join(2_000);
111+
assertFalse(parkedThread.isAlive(), "Test thread did not finish in time");
112+
assertEquals(
113+
unparkingSpanId,
114+
capturedSpanId.get(),
115+
"Parked thread should have read the unblocking span id placed by unparking thread");
116+
}
117+
118+
/**
119+
* Verifies that if no entry exists for the parked thread (i.e. the thread was unblocked by a
120+
* non-traced thread), the {@code remove} returns {@code null} and the code falls back to 0.
121+
*/
122+
@Test
123+
void no_unparking_entry_yields_zero() throws InterruptedException {
124+
AtomicLong capturedSpanId = new AtomicLong(-1L);
125+
126+
Thread parkedThread =
127+
new Thread(
128+
() -> {
129+
Long unblockingId = State.UNPARKING_SPAN.remove(Thread.currentThread());
130+
capturedSpanId.set(unblockingId != null ? unblockingId : 0L);
131+
});
132+
parkedThread.start();
133+
parkedThread.join(2_000);
134+
135+
assertEquals(
136+
0L, capturedSpanId.get(), "Should fall back to 0 when no unparking span id is recorded");
137+
}
138+
139+
// -------------------------------------------------------------------------
140+
// ParkAdvice.after — null state is a no-op
141+
// -------------------------------------------------------------------------
142+
143+
/**
144+
* When {@code ParkAdvice.before} returns {@code null} (profiler not active or no active span),
145+
* {@code ParkAdvice.after} must be a no-op and must not throw.
146+
*/
147+
@Test
148+
void parkAdvice_after_null_state_isNoOp() {
149+
// Should not throw and should not touch State.UNPARKING_SPAN
150+
LockSupportProfilingInstrumentation.ParkAdvice.after(null);
151+
assertTrue(State.UNPARKING_SPAN.isEmpty());
152+
}
153+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ include(
325325
":dd-java-agent:instrumentation:datadog:dynamic-instrumentation:span-origin",
326326
":dd-java-agent:instrumentation:datadog:profiling:enable-wallclock-profiling",
327327
":dd-java-agent:instrumentation:datadog:profiling:exception-profiling",
328+
":dd-java-agent:instrumentation:datadog:profiling:lock-support-profiling",
328329
":dd-java-agent:instrumentation:datadog:tracing:trace-annotation",
329330
":dd-java-agent:instrumentation:datanucleus-4.0.5",
330331
":dd-java-agent:instrumentation:datastax-cassandra:datastax-cassandra-3.0",

0 commit comments

Comments
 (0)