66package io .opentelemetry .instrumentation .reactor .v3_1 ;
77
88import static java .util .concurrent .TimeUnit .MILLISECONDS ;
9+ import static java .util .concurrent .TimeUnit .SECONDS ;
910import static org .assertj .core .api .Assertions .assertThat ;
10-
11+ import static org .junit .jupiter .api .Assumptions .assumeTrue ;
12+
13+ import io .opentelemetry .api .trace .Span ;
14+ import io .opentelemetry .api .trace .SpanContext ;
15+ import io .opentelemetry .api .trace .TraceFlags ;
16+ import io .opentelemetry .api .trace .TraceState ;
17+ import io .opentelemetry .context .Context ;
18+ import io .opentelemetry .context .Scope ;
1119import java .util .concurrent .Callable ;
20+ import java .util .concurrent .CountDownLatch ;
21+ import java .util .concurrent .atomic .AtomicBoolean ;
1222import java .util .concurrent .atomic .AtomicReference ;
23+ import java .util .function .Function ;
24+ import org .junit .jupiter .api .AfterEach ;
1325import org .junit .jupiter .api .Test ;
1426import reactor .core .CoreSubscriber ;
1527import reactor .core .Disposable ;
28+ import reactor .core .publisher .Hooks ;
1629import reactor .core .publisher .Mono ;
1730import reactor .core .scheduler .Schedulers ;
1831
1932class HooksTest {
2033
34+ private static final Span PARENT_SPAN =
35+ Span .wrap (
36+ SpanContext .create (
37+ "11111111111111111111111111111111" ,
38+ "1111111111111111" ,
39+ TraceFlags .getSampled (),
40+ TraceState .getDefault ()));
41+
42+ @ AfterEach
43+ void resetHooks () {
44+ Hooks .resetOnEachOperator (TracingSubscriber .class .getName ());
45+ if (schedulerHooksSupported ()) {
46+ Schedulers .resetOnScheduleHook (RunnableWrapper .class .getName ());
47+ }
48+ }
49+
2150 @ Test
2251 void canResetOurHooks () {
2352 ContextPropagationOperator operator = ContextPropagationOperator .create ();
@@ -35,17 +64,19 @@ void canResetOurHooks() {
3564 assertThat (subscriber .get ()).extracting ("actual" ).isNotInstanceOf (TracingSubscriber .class );
3665 }
3766
38- private static class CapturingMono extends Mono <Integer > {
39- private final AtomicReference <CoreSubscriber <? super Integer >> subscriber ;
67+ @ Test
68+ void canResetSchedulerHook () throws InterruptedException {
69+ assumeTrue (schedulerHooksSupported ());
4070
41- private CapturingMono (AtomicReference <CoreSubscriber <? super Integer >> subscriber ) {
42- this .subscriber = subscriber ;
43- }
71+ ContextPropagationOperator operator = ContextPropagationOperator .create ();
4472
45- @ Override
46- public void subscribe (CoreSubscriber <? super Integer > actual ) {
47- subscriber .set (actual );
48- }
73+ assertThat (scheduledSpanIsCurrent ()).isFalse ();
74+
75+ operator .registerOnEachOperator ();
76+ assertThat (scheduledSpanIsCurrent ()).isTrue ();
77+
78+ operator .resetOnEachOperator ();
79+ assertThat (scheduledSpanIsCurrent ()).isFalse ();
4980 }
5081
5182 @ Test
@@ -71,4 +102,45 @@ void testInvalidBlockUsage() throws InterruptedException {
71102 disposable .dispose ();
72103 operator .resetOnEachOperator ();
73104 }
105+
106+ private static boolean schedulerHooksSupported () {
107+ try {
108+ Schedulers .class .getMethod ("onScheduleHook" , String .class , Function .class );
109+ Schedulers .class .getMethod ("resetOnScheduleHook" , String .class );
110+ return true ;
111+ } catch (NoSuchMethodException e ) {
112+ return false ;
113+ }
114+ }
115+
116+ private static boolean scheduledSpanIsCurrent () throws InterruptedException {
117+ CountDownLatch latch = new CountDownLatch (1 );
118+ AtomicBoolean currentSpanValid = new AtomicBoolean (false );
119+ // The scope is closed before the scheduled task runs; that is the point of this test:
120+ // the schedulers hook must capture the current context at schedule time, not at execution
121+ // time.
122+ try (Scope ignored = Context .root ().with (PARENT_SPAN ).makeCurrent ()) {
123+ Schedulers .single ()
124+ .schedule (
125+ () -> {
126+ currentSpanValid .set (Span .current ().getSpanContext ().isValid ());
127+ latch .countDown ();
128+ });
129+ }
130+ assertThat (latch .await (5 , SECONDS )).isTrue ();
131+ return currentSpanValid .get ();
132+ }
133+
134+ private static class CapturingMono extends Mono <Integer > {
135+ private final AtomicReference <CoreSubscriber <? super Integer >> subscriber ;
136+
137+ private CapturingMono (AtomicReference <CoreSubscriber <? super Integer >> subscriber ) {
138+ this .subscriber = subscriber ;
139+ }
140+
141+ @ Override
142+ public void subscribe (CoreSubscriber <? super Integer > actual ) {
143+ subscriber .set (actual );
144+ }
145+ }
74146}
0 commit comments