Skip to content

Commit 64fd980

Browse files
traskotelbot[bot]
andauthored
Reactor 3.1: reset schedulers hook in resetOnEachOperator() (#18258)
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
1 parent 43fbbea commit 64fd980

2 files changed

Lines changed: 112 additions & 12 deletions

File tree

instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public final class ContextPropagationOperator {
5252

5353
private static final Object VALUE = new Object();
5454

55+
private static final String SCHEDULERS_HOOK_KEY = RunnableWrapper.class.getName();
56+
5557
@Nullable
5658
private static final MethodHandle MONO_CONTEXT_WRITE_METHOD = getContextWriteMethod(Mono.class);
5759

@@ -60,6 +62,9 @@ public final class ContextPropagationOperator {
6062

6163
@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
6264

65+
@Nullable
66+
private static final MethodHandle SCHEDULERS_RESET_HOOK_METHOD = getSchedulersResetHookMethod();
67+
6368
@Nullable
6469
private static MethodHandle getContextWriteMethod(Class<?> type) {
6570
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
@@ -88,6 +93,18 @@ private static MethodHandle getSchedulersHookMethod() {
8893
return null;
8994
}
9095

96+
@Nullable
97+
private static MethodHandle getSchedulersResetHookMethod() {
98+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
99+
try {
100+
return lookup.findStatic(
101+
Schedulers.class, "resetOnScheduleHook", methodType(void.class, String.class));
102+
} catch (NoSuchMethodException | IllegalAccessException e) {
103+
// ignore
104+
}
105+
return null;
106+
}
107+
91108
public static ContextPropagationOperator create() {
92109
return builder().build();
93110
}
@@ -171,7 +188,7 @@ public void registerOnEachOperator() {
171188
Hooks.onEachOperator(
172189
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
173190
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
174-
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
191+
registerScheduleHook(SCHEDULERS_HOOK_KEY, RunnableWrapper::new);
175192
enabled = true;
176193
}
177194
}
@@ -187,6 +204,17 @@ private static void registerScheduleHook(String key, Function<Runnable, Runnable
187204
}
188205
}
189206

207+
private static void resetScheduleHook(String key) {
208+
if (SCHEDULERS_RESET_HOOK_METHOD == null) {
209+
return;
210+
}
211+
try {
212+
SCHEDULERS_RESET_HOOK_METHOD.invoke(key);
213+
} catch (Throwable t) {
214+
logger.log(WARNING, "Failed to remove scheduler hook", t);
215+
}
216+
}
217+
190218
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
191219
public void resetOnEachOperator() {
192220
synchronized (lock) {
@@ -195,6 +223,7 @@ public void resetOnEachOperator() {
195223
}
196224
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
197225
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
226+
resetScheduleHook(SCHEDULERS_HOOK_KEY);
198227
enabled = false;
199228
}
200229
}
@@ -359,7 +388,7 @@ public Object scanUnsafe(Scannable.Attr attr) {
359388
}
360389
}
361390

362-
private static class RunnableWrapper implements Runnable {
391+
static class RunnableWrapper implements Runnable {
363392
private final Runnable delegate;
364393
private final Context context;
365394

instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,49 @@
66
package io.opentelemetry.instrumentation.reactor.v3_1;
77

88
import static java.util.concurrent.TimeUnit.MILLISECONDS;
9+
import static java.util.concurrent.TimeUnit.SECONDS;
910
import static org.assertj.core.api.Assertions.assertThat;
10-
11+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
12+
13+
import io.opentelemetry.api.trace.Span;
14+
import io.opentelemetry.api.trace.SpanContext;
15+
import io.opentelemetry.api.trace.TraceFlags;
16+
import io.opentelemetry.api.trace.TraceState;
17+
import io.opentelemetry.context.Context;
18+
import io.opentelemetry.context.Scope;
1119
import java.util.concurrent.Callable;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1222
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.function.Function;
24+
import org.junit.jupiter.api.AfterEach;
1325
import org.junit.jupiter.api.Test;
1426
import reactor.core.CoreSubscriber;
1527
import reactor.core.Disposable;
28+
import reactor.core.publisher.Hooks;
1629
import reactor.core.publisher.Mono;
1730
import reactor.core.scheduler.Schedulers;
1831

1932
class HooksTest {
2033

34+
private static final Span PARENT_SPAN =
35+
Span.wrap(
36+
SpanContext.create(
37+
"11111111111111111111111111111111",
38+
"1111111111111111",
39+
TraceFlags.getSampled(),
40+
TraceState.getDefault()));
41+
42+
@AfterEach
43+
void resetHooks() throws ReflectiveOperationException {
44+
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
45+
if (schedulerHooksSupported()) {
46+
Schedulers.class
47+
.getMethod("resetOnScheduleHook", String.class)
48+
.invoke(null, ContextPropagationOperator.RunnableWrapper.class.getName());
49+
}
50+
}
51+
2152
@Test
2253
void canResetOurHooks() {
2354
ContextPropagationOperator operator = ContextPropagationOperator.create();
@@ -35,17 +66,19 @@ void canResetOurHooks() {
3566
assertThat(subscriber.get()).extracting("actual").isNotInstanceOf(TracingSubscriber.class);
3667
}
3768

38-
private static class CapturingMono extends Mono<Integer> {
39-
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
69+
@Test
70+
void canResetSchedulerHook() throws InterruptedException {
71+
assumeTrue(schedulerHooksSupported());
4072

41-
private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
42-
this.subscriber = subscriber;
43-
}
73+
ContextPropagationOperator operator = ContextPropagationOperator.create();
4474

45-
@Override
46-
public void subscribe(CoreSubscriber<? super Integer> actual) {
47-
subscriber.set(actual);
48-
}
75+
assertThat(schedulerPropagatesContext()).isFalse();
76+
77+
operator.registerOnEachOperator();
78+
assertThat(schedulerPropagatesContext()).isTrue();
79+
80+
operator.resetOnEachOperator();
81+
assertThat(schedulerPropagatesContext()).isFalse();
4982
}
5083

5184
@Test
@@ -71,4 +104,42 @@ void testInvalidBlockUsage() throws InterruptedException {
71104
disposable.dispose();
72105
operator.resetOnEachOperator();
73106
}
107+
108+
private static boolean schedulerHooksSupported() {
109+
try {
110+
Schedulers.class.getMethod("onScheduleHook", String.class, Function.class);
111+
Schedulers.class.getMethod("resetOnScheduleHook", String.class);
112+
return true;
113+
} catch (NoSuchMethodException e) {
114+
return false;
115+
}
116+
}
117+
118+
private static boolean schedulerPropagatesContext() throws InterruptedException {
119+
CountDownLatch latch = new CountDownLatch(1);
120+
AtomicBoolean currentSpanValid = new AtomicBoolean(false);
121+
try (Scope ignored = Context.root().with(PARENT_SPAN).makeCurrent()) {
122+
Schedulers.single()
123+
.schedule(
124+
() -> {
125+
currentSpanValid.set(Span.current().getSpanContext().isValid());
126+
latch.countDown();
127+
});
128+
}
129+
assertThat(latch.await(5, SECONDS)).isTrue();
130+
return currentSpanValid.get();
131+
}
132+
133+
private static class CapturingMono extends Mono<Integer> {
134+
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
135+
136+
private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
137+
this.subscriber = subscriber;
138+
}
139+
140+
@Override
141+
public void subscribe(CoreSubscriber<? super Integer> actual) {
142+
subscriber.set(actual);
143+
}
144+
}
74145
}

0 commit comments

Comments
 (0)