Skip to content

Commit 4530e05

Browse files
otelbot[bot]trask
authored andcommitted
Review fixes for reactor-3.1:library
Automated code review of instrumentation/reactor/reactor-3.1/library.
1 parent 83ad467 commit 4530e05

2 files changed

Lines changed: 91 additions & 1 deletion

File tree

instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,13 @@ public final class ContextPropagationOperator {
5858
@Nullable
5959
private static final MethodHandle FLUX_CONTEXT_WRITE_METHOD = getContextWriteMethod(Flux.class);
6060

61+
private static final String SCHEDULERS_HOOK_KEY = RunnableWrapper.class.getName();
62+
6163
@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
6264

65+
@Nullable
66+
private static final MethodHandle SCHEDULERS_RESET_HOOK_METHOD = getSchedulersResetHookMethod();
67+
6368
@Nullable
6469
private static MethodHandle getContextWriteMethod(Class<?> type) {
6570
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
@@ -88,6 +93,18 @@ private static MethodHandle getSchedulersHookMethod() {
8893
return null;
8994
}
9095

96+
@Nullable
97+
private static MethodHandle getSchedulersResetHookMethod() {
98+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
99+
try {
100+
return lookup.findStatic(
101+
Schedulers.class, "resetOnScheduleHook", methodType(void.class, String.class));
102+
} catch (NoSuchMethodException | IllegalAccessException e) {
103+
// ignore
104+
}
105+
return null;
106+
}
107+
91108
public static ContextPropagationOperator create() {
92109
return builder().build();
93110
}
@@ -171,7 +188,7 @@ public void registerOnEachOperator() {
171188
Hooks.onEachOperator(
172189
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
173190
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
174-
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
191+
registerScheduleHook(SCHEDULERS_HOOK_KEY, RunnableWrapper::new);
175192
enabled = true;
176193
}
177194
}
@@ -187,6 +204,17 @@ private static void registerScheduleHook(String key, Function<Runnable, Runnable
187204
}
188205
}
189206

207+
private static void resetScheduleHook(String key) {
208+
if (SCHEDULERS_RESET_HOOK_METHOD == null) {
209+
return;
210+
}
211+
try {
212+
SCHEDULERS_RESET_HOOK_METHOD.invoke(key);
213+
} catch (Throwable t) {
214+
logger.log(WARNING, "Failed to remove scheduler hook", t);
215+
}
216+
}
217+
190218
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
191219
public void resetOnEachOperator() {
192220
synchronized (lock) {
@@ -195,6 +223,7 @@ public void resetOnEachOperator() {
195223
}
196224
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
197225
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
226+
resetScheduleHook(SCHEDULERS_HOOK_KEY);
198227
enabled = false;
199228
}
200229
}

instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,19 @@
66
package io.opentelemetry.instrumentation.reactor.v3_1;
77

88
import static java.util.concurrent.TimeUnit.MILLISECONDS;
9+
import static java.util.concurrent.TimeUnit.SECONDS;
910
import static org.assertj.core.api.Assertions.assertThat;
1011

12+
import io.opentelemetry.api.trace.Span;
13+
import io.opentelemetry.api.trace.SpanContext;
14+
import io.opentelemetry.api.trace.TraceFlags;
15+
import io.opentelemetry.api.trace.TraceState;
16+
import io.opentelemetry.context.Context;
17+
import io.opentelemetry.context.Scope;
1118
import java.util.concurrent.Callable;
19+
import java.util.concurrent.CountDownLatch;
1220
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Function;
1322
import org.junit.jupiter.api.Test;
1423
import reactor.core.CoreSubscriber;
1524
import reactor.core.Disposable;
@@ -18,6 +27,14 @@
1827

1928
class HooksTest {
2029

30+
private static final Span PARENT_SPAN =
31+
Span.wrap(
32+
SpanContext.create(
33+
"11111111111111111111111111111111",
34+
"1111111111111111",
35+
TraceFlags.getSampled(),
36+
TraceState.getDefault()));
37+
2138
@Test
2239
void canResetOurHooks() {
2340
ContextPropagationOperator operator = ContextPropagationOperator.create();
@@ -35,6 +52,25 @@ void canResetOurHooks() {
3552
assertThat(subscriber.get()).extracting("actual").isNotInstanceOf(TracingSubscriber.class);
3653
}
3754

55+
@Test
56+
void canResetSchedulerHook() throws InterruptedException {
57+
if (!schedulerHooksSupported()) {
58+
return;
59+
}
60+
61+
ContextPropagationOperator operator = ContextPropagationOperator.create();
62+
63+
assertThat(scheduledSpanIsCurrent()).isFalse();
64+
65+
operator.registerOnEachOperator();
66+
try {
67+
assertThat(scheduledSpanIsCurrent()).isTrue();
68+
} finally {
69+
operator.resetOnEachOperator();
70+
}
71+
assertThat(scheduledSpanIsCurrent()).isFalse();
72+
}
73+
3874
private static class CapturingMono extends Mono<Integer> {
3975
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
4076

@@ -71,4 +107,29 @@ void testInvalidBlockUsage() throws InterruptedException {
71107
disposable.dispose();
72108
operator.resetOnEachOperator();
73109
}
110+
111+
private static boolean schedulerHooksSupported() {
112+
try {
113+
Schedulers.class.getMethod("onScheduleHook", String.class, Function.class);
114+
Schedulers.class.getMethod("resetOnScheduleHook", String.class);
115+
return true;
116+
} catch (NoSuchMethodException e) {
117+
return false;
118+
}
119+
}
120+
121+
private static boolean scheduledSpanIsCurrent() throws InterruptedException {
122+
CountDownLatch latch = new CountDownLatch(1);
123+
AtomicReference<Boolean> currentSpanValid = new AtomicReference<>(false);
124+
try (Scope ignored = Context.root().with(PARENT_SPAN).makeCurrent()) {
125+
Schedulers.single()
126+
.schedule(
127+
() -> {
128+
currentSpanValid.set(Span.current().getSpanContext().isValid());
129+
latch.countDown();
130+
});
131+
}
132+
assertThat(latch.await(5, SECONDS)).isTrue();
133+
return currentSpanValid.get();
134+
}
74135
}

0 commit comments

Comments
 (0)