Skip to content

Commit 1669bc4

Browse files
otelbot[bot]trask
authored andcommitted
Review fixes for reactor-3.1:library
Automated code review of instrumentation/reactor/reactor-3.1/library.
1 parent 83ad467 commit 1669bc4

2 files changed

Lines changed: 109 additions & 11 deletions

File tree

instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/v3_1/ContextPropagationOperator.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public final class ContextPropagationOperator {
5252

5353
private static final Object VALUE = new Object();
5454

55+
private static final String SCHEDULERS_HOOK_KEY = RunnableWrapper.class.getName();
56+
5557
@Nullable
5658
private static final MethodHandle MONO_CONTEXT_WRITE_METHOD = getContextWriteMethod(Mono.class);
5759

@@ -60,6 +62,9 @@ public final class ContextPropagationOperator {
6062

6163
@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();
6264

65+
@Nullable
66+
private static final MethodHandle SCHEDULERS_RESET_HOOK_METHOD = getSchedulersResetHookMethod();
67+
6368
@Nullable
6469
private static MethodHandle getContextWriteMethod(Class<?> type) {
6570
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
@@ -88,6 +93,18 @@ private static MethodHandle getSchedulersHookMethod() {
8893
return null;
8994
}
9095

96+
@Nullable
97+
private static MethodHandle getSchedulersResetHookMethod() {
98+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
99+
try {
100+
return lookup.findStatic(
101+
Schedulers.class, "resetOnScheduleHook", methodType(void.class, String.class));
102+
} catch (NoSuchMethodException | IllegalAccessException e) {
103+
// ignore
104+
}
105+
return null;
106+
}
107+
91108
public static ContextPropagationOperator create() {
92109
return builder().build();
93110
}
@@ -171,7 +188,7 @@ public void registerOnEachOperator() {
171188
Hooks.onEachOperator(
172189
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
173190
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
174-
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
191+
registerScheduleHook(SCHEDULERS_HOOK_KEY, RunnableWrapper::new);
175192
enabled = true;
176193
}
177194
}
@@ -187,6 +204,17 @@ private static void registerScheduleHook(String key, Function<Runnable, Runnable
187204
}
188205
}
189206

207+
private static void resetScheduleHook(String key) {
208+
if (SCHEDULERS_RESET_HOOK_METHOD == null) {
209+
return;
210+
}
211+
try {
212+
SCHEDULERS_RESET_HOOK_METHOD.invoke(key);
213+
} catch (Throwable t) {
214+
logger.log(WARNING, "Failed to remove scheduler hook", t);
215+
}
216+
}
217+
190218
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
191219
public void resetOnEachOperator() {
192220
synchronized (lock) {
@@ -195,6 +223,7 @@ public void resetOnEachOperator() {
195223
}
196224
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
197225
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
226+
resetScheduleHook(SCHEDULERS_HOOK_KEY);
198227
enabled = false;
199228
}
200229
}

instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/v3_1/HooksTest.java

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,47 @@
66
package io.opentelemetry.instrumentation.reactor.v3_1;
77

88
import static java.util.concurrent.TimeUnit.MILLISECONDS;
9+
import static java.util.concurrent.TimeUnit.SECONDS;
910
import static org.assertj.core.api.Assertions.assertThat;
10-
11+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
12+
13+
import io.opentelemetry.api.trace.Span;
14+
import io.opentelemetry.api.trace.SpanContext;
15+
import io.opentelemetry.api.trace.TraceFlags;
16+
import io.opentelemetry.api.trace.TraceState;
17+
import io.opentelemetry.context.Context;
18+
import io.opentelemetry.context.Scope;
1119
import java.util.concurrent.Callable;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1222
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.function.Function;
24+
import org.junit.jupiter.api.AfterEach;
1325
import org.junit.jupiter.api.Test;
1426
import reactor.core.CoreSubscriber;
1527
import reactor.core.Disposable;
28+
import reactor.core.publisher.Hooks;
1629
import reactor.core.publisher.Mono;
1730
import reactor.core.scheduler.Schedulers;
1831

1932
class HooksTest {
2033

34+
private static final Span PARENT_SPAN =
35+
Span.wrap(
36+
SpanContext.create(
37+
"11111111111111111111111111111111",
38+
"1111111111111111",
39+
TraceFlags.getSampled(),
40+
TraceState.getDefault()));
41+
42+
@AfterEach
43+
void resetHooks() {
44+
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
45+
if (schedulerHooksSupported()) {
46+
Schedulers.resetOnScheduleHook(RunnableWrapper.class.getName());
47+
}
48+
}
49+
2150
@Test
2251
void canResetOurHooks() {
2352
ContextPropagationOperator operator = ContextPropagationOperator.create();
@@ -35,17 +64,19 @@ void canResetOurHooks() {
3564
assertThat(subscriber.get()).extracting("actual").isNotInstanceOf(TracingSubscriber.class);
3665
}
3766

38-
private static class CapturingMono extends Mono<Integer> {
39-
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
67+
@Test
68+
void canResetSchedulerHook() throws InterruptedException {
69+
assumeTrue(schedulerHooksSupported());
4070

41-
private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
42-
this.subscriber = subscriber;
43-
}
71+
ContextPropagationOperator operator = ContextPropagationOperator.create();
4472

45-
@Override
46-
public void subscribe(CoreSubscriber<? super Integer> actual) {
47-
subscriber.set(actual);
48-
}
73+
assertThat(schedulerPropagatesContext()).isFalse();
74+
75+
operator.registerOnEachOperator();
76+
assertThat(schedulerPropagatesContext()).isTrue();
77+
78+
operator.resetOnEachOperator();
79+
assertThat(schedulerPropagatesContext()).isFalse();
4980
}
5081

5182
@Test
@@ -71,4 +102,42 @@ void testInvalidBlockUsage() throws InterruptedException {
71102
disposable.dispose();
72103
operator.resetOnEachOperator();
73104
}
105+
106+
private static boolean schedulerHooksSupported() {
107+
try {
108+
Schedulers.class.getMethod("onScheduleHook", String.class, Function.class);
109+
Schedulers.class.getMethod("resetOnScheduleHook", String.class);
110+
return true;
111+
} catch (NoSuchMethodException e) {
112+
return false;
113+
}
114+
}
115+
116+
private static boolean schedulerPropagatesContext() throws InterruptedException {
117+
CountDownLatch latch = new CountDownLatch(1);
118+
AtomicBoolean currentSpanValid = new AtomicBoolean(false);
119+
try (Scope ignored = Context.root().with(PARENT_SPAN).makeCurrent()) {
120+
Schedulers.single()
121+
.schedule(
122+
() -> {
123+
currentSpanValid.set(Span.current().getSpanContext().isValid());
124+
latch.countDown();
125+
});
126+
}
127+
assertThat(latch.await(5, SECONDS)).isTrue();
128+
return currentSpanValid.get();
129+
}
130+
131+
private static class CapturingMono extends Mono<Integer> {
132+
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
133+
134+
private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
135+
this.subscriber = subscriber;
136+
}
137+
138+
@Override
139+
public void subscribe(CoreSubscriber<? super Integer> actual) {
140+
subscriber.set(actual);
141+
}
142+
}
74143
}

0 commit comments

Comments
 (0)