66package io .opentelemetry .instrumentation .reactor .v3_1 ;
77
88import static java .util .concurrent .TimeUnit .MILLISECONDS ;
9+ import static java .util .concurrent .TimeUnit .SECONDS ;
910import static org .assertj .core .api .Assertions .assertThat ;
10-
11+ import static org .junit .jupiter .api .Assumptions .assumeTrue ;
12+
13+ import io .opentelemetry .api .trace .Span ;
14+ import io .opentelemetry .api .trace .SpanContext ;
15+ import io .opentelemetry .api .trace .TraceFlags ;
16+ import io .opentelemetry .api .trace .TraceState ;
17+ import io .opentelemetry .context .Context ;
18+ import io .opentelemetry .context .Scope ;
1119import java .util .concurrent .Callable ;
20+ import java .util .concurrent .CountDownLatch ;
21+ import java .util .concurrent .atomic .AtomicBoolean ;
1222import java .util .concurrent .atomic .AtomicReference ;
23+ import java .util .function .Function ;
24+ import org .junit .jupiter .api .AfterEach ;
1325import org .junit .jupiter .api .Test ;
1426import reactor .core .CoreSubscriber ;
1527import reactor .core .Disposable ;
28+ import reactor .core .publisher .Hooks ;
1629import reactor .core .publisher .Mono ;
1730import reactor .core .scheduler .Schedulers ;
1831
1932class HooksTest {
2033
34+ private static final Span PARENT_SPAN =
35+ Span .wrap (
36+ SpanContext .create (
37+ "11111111111111111111111111111111" ,
38+ "1111111111111111" ,
39+ TraceFlags .getSampled (),
40+ TraceState .getDefault ()));
41+
42+ @ AfterEach
43+ void resetHooks () {
44+ Hooks .resetOnEachOperator (TracingSubscriber .class .getName ());
45+ if (schedulerHooksSupported ()) {
46+ Schedulers .resetOnScheduleHook (RunnableWrapper .class .getName ());
47+ }
48+ }
49+
2150 @ Test
2251 void canResetOurHooks () {
2352 ContextPropagationOperator operator = ContextPropagationOperator .create ();
@@ -35,6 +64,23 @@ void canResetOurHooks() {
3564 assertThat (subscriber .get ()).extracting ("actual" ).isNotInstanceOf (TracingSubscriber .class );
3665 }
3766
67+ @ Test
68+ void canResetSchedulerHook () throws InterruptedException {
69+ assumeTrue (schedulerHooksSupported ());
70+
71+ ContextPropagationOperator operator = ContextPropagationOperator .create ();
72+
73+ assertThat (scheduledSpanIsCurrent ()).isFalse ();
74+
75+ operator .registerOnEachOperator ();
76+ try {
77+ assertThat (scheduledSpanIsCurrent ()).isTrue ();
78+ } finally {
79+ operator .resetOnEachOperator ();
80+ }
81+ assertThat (scheduledSpanIsCurrent ()).isFalse ();
82+ }
83+
3884 private static class CapturingMono extends Mono <Integer > {
3985 private final AtomicReference <CoreSubscriber <? super Integer >> subscriber ;
4086
@@ -71,4 +117,32 @@ void testInvalidBlockUsage() throws InterruptedException {
71117 disposable .dispose ();
72118 operator .resetOnEachOperator ();
73119 }
120+
121+ private static boolean schedulerHooksSupported () {
122+ try {
123+ Schedulers .class .getMethod ("onScheduleHook" , String .class , Function .class );
124+ Schedulers .class .getMethod ("resetOnScheduleHook" , String .class );
125+ return true ;
126+ } catch (NoSuchMethodException e ) {
127+ return false ;
128+ }
129+ }
130+
131+ private static boolean scheduledSpanIsCurrent () throws InterruptedException {
132+ CountDownLatch latch = new CountDownLatch (1 );
133+ AtomicBoolean currentSpanValid = new AtomicBoolean (false );
134+ // The scope is closed before the scheduled task runs; that is the point of this test:
135+ // the schedulers hook must capture the current context at schedule time, not at execution
136+ // time.
137+ try (Scope ignored = Context .root ().with (PARENT_SPAN ).makeCurrent ()) {
138+ Schedulers .single ()
139+ .schedule (
140+ () -> {
141+ currentSpanValid .set (Span .current ().getSpanContext ().isValid ());
142+ latch .countDown ();
143+ });
144+ }
145+ assertThat (latch .await (5 , SECONDS )).isTrue ();
146+ return currentSpanValid .get ();
147+ }
74148}
0 commit comments