Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public final class ContextPropagationOperator {

private static final Object VALUE = new Object();

private static final String SCHEDULERS_HOOK_KEY = RunnableWrapper.class.getName();

@Nullable
private static final MethodHandle MONO_CONTEXT_WRITE_METHOD = getContextWriteMethod(Mono.class);

Expand All @@ -60,6 +62,9 @@ public final class ContextPropagationOperator {

@Nullable private static final MethodHandle SCHEDULERS_HOOK_METHOD = getSchedulersHookMethod();

@Nullable
private static final MethodHandle SCHEDULERS_RESET_HOOK_METHOD = getSchedulersResetHookMethod();

@Nullable
private static MethodHandle getContextWriteMethod(Class<?> type) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
Expand Down Expand Up @@ -88,6 +93,18 @@ private static MethodHandle getSchedulersHookMethod() {
return null;
}

@Nullable
private static MethodHandle getSchedulersResetHookMethod() {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
try {
return lookup.findStatic(
Schedulers.class, "resetOnScheduleHook", methodType(void.class, String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// ignore
}
return null;
}

public static ContextPropagationOperator create() {
return builder().build();
}
Expand Down Expand Up @@ -171,7 +188,7 @@ public void registerOnEachOperator() {
Hooks.onEachOperator(
TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
registerScheduleHook(RunnableWrapper.class.getName(), RunnableWrapper::new);
registerScheduleHook(SCHEDULERS_HOOK_KEY, RunnableWrapper::new);
enabled = true;
}
}
Expand All @@ -187,6 +204,17 @@ private static void registerScheduleHook(String key, Function<Runnable, Runnable
}
}

private static void resetScheduleHook(String key) {
if (SCHEDULERS_RESET_HOOK_METHOD == null) {
return;
}
try {
SCHEDULERS_RESET_HOOK_METHOD.invoke(key);
} catch (Throwable t) {
logger.log(WARNING, "Failed to remove scheduler hook", t);
}
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
synchronized (lock) {
Expand All @@ -195,6 +223,7 @@ public void resetOnEachOperator() {
}
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
resetScheduleHook(SCHEDULERS_HOOK_KEY);
enabled = false;
}
}
Expand Down Expand Up @@ -359,7 +388,7 @@ public Object scanUnsafe(Scannable.Attr attr) {
}
}

private static class RunnableWrapper implements Runnable {
static class RunnableWrapper implements Runnable {
private final Runnable delegate;
private final Context context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,49 @@
package io.opentelemetry.instrumentation.reactor.v3_1;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import static org.junit.jupiter.api.Assumptions.assumeTrue;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class HooksTest {

private static final Span PARENT_SPAN =
Span.wrap(
SpanContext.create(
"11111111111111111111111111111111",
"1111111111111111",
TraceFlags.getSampled(),
TraceState.getDefault()));

@AfterEach
void resetHooks() throws ReflectiveOperationException {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
if (schedulerHooksSupported()) {
Schedulers.class
.getMethod("resetOnScheduleHook", String.class)
.invoke(null, ContextPropagationOperator.RunnableWrapper.class.getName());
}
}

@Test
void canResetOurHooks() {
ContextPropagationOperator operator = ContextPropagationOperator.create();
Expand All @@ -35,17 +66,19 @@ void canResetOurHooks() {
assertThat(subscriber.get()).extracting("actual").isNotInstanceOf(TracingSubscriber.class);
}

private static class CapturingMono extends Mono<Integer> {
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;
@Test
void canResetSchedulerHook() throws InterruptedException {
assumeTrue(schedulerHooksSupported());

private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
this.subscriber = subscriber;
}
ContextPropagationOperator operator = ContextPropagationOperator.create();

@Override
public void subscribe(CoreSubscriber<? super Integer> actual) {
subscriber.set(actual);
}
assertThat(schedulerPropagatesContext()).isFalse();

operator.registerOnEachOperator();
assertThat(schedulerPropagatesContext()).isTrue();

operator.resetOnEachOperator();
assertThat(schedulerPropagatesContext()).isFalse();
}

@Test
Expand All @@ -71,4 +104,42 @@ void testInvalidBlockUsage() throws InterruptedException {
disposable.dispose();
operator.resetOnEachOperator();
}

private static boolean schedulerHooksSupported() {
try {
Schedulers.class.getMethod("onScheduleHook", String.class, Function.class);
Schedulers.class.getMethod("resetOnScheduleHook", String.class);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

private static boolean schedulerPropagatesContext() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean currentSpanValid = new AtomicBoolean(false);
try (Scope ignored = Context.root().with(PARENT_SPAN).makeCurrent()) {
Schedulers.single()
.schedule(
() -> {
currentSpanValid.set(Span.current().getSpanContext().isValid());
latch.countDown();
});
}
assertThat(latch.await(5, SECONDS)).isTrue();
return currentSpanValid.get();
}

private static class CapturingMono extends Mono<Integer> {
private final AtomicReference<CoreSubscriber<? super Integer>> subscriber;

private CapturingMono(AtomicReference<CoreSubscriber<? super Integer>> subscriber) {
this.subscriber = subscriber;
}

@Override
public void subscribe(CoreSubscriber<? super Integer> actual) {
subscriber.set(actual);
}
}
}
Loading