From 96740ee7885738e87341fb064036378b64210653 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 29 Jun 2026 16:56:26 +0300 Subject: [PATCH] Fix flaky pulsar test --- .../v2_8/ConsumerImplInstrumentation.java | 27 ++++++++++++++++- .../v2_8/telemetry/PulsarSingletons.java | 22 ++++++++++++++ .../pulsar/v2_8/PulsarClientTest.java | 29 ++++--------------- 3 files changed, 54 insertions(+), 24 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index 76298d99acf8..e3bffb87a8f6 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isProtected; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -18,6 +19,7 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; @@ -34,7 +36,9 @@ class ConsumerImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("org.apache.pulsar.client.impl.ConsumerImpl"); + return namedOneOf( + "org.apache.pulsar.client.impl.ConsumerImpl", + "org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"); } @Override @@ -63,6 +67,11 @@ public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( isProtected().and(named("internalBatchReceiveAsync")).and(takesArguments(0)), getClass().getName() + "$ConsumerBatchAsyncReceiveAdvice"); + + // only in MultiTopicsConsumerImpl + transformer.applyAdviceToMethod( + named("receiveMessageFromConsumer"), + getClass().getName() + "$SuppressInstrumentationAdvice"); } @SuppressWarnings("unused") @@ -157,4 +166,20 @@ public static CompletableFuture> after( return wrapBatch(future, timer, consumer); } } + + @SuppressWarnings("unused") + public static class SuppressInstrumentationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + public static void before() { + // MultiTopicsConsumerImpl#receiveMessageFromConsumer is called from a background thread, we + // don't want to create a span for it. + PulsarSingletons.startSuppressingReceive(); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) + public static void after() { + PulsarSingletons.endSuppressingReceive(); + } + } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index e3a9b50dc7c2..4cd04c0a4a32 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -60,6 +60,8 @@ public class PulsarSingletons { private static final Instrumenter producerInstrumenter = createProducerInstrumenter(); + private static final ThreadLocal suppressReceive = new ThreadLocal<>(); + public static Instrumenter consumerProcessInstrumenter() { return consumerProcessInstrumenter; } @@ -252,6 +254,10 @@ public static CompletableFuture wrap(CompletableFuture future) { public static CompletableFuture> wrap( CompletableFuture> future, Timer timer, Consumer consumer) { + if (isSuppressingReceive()) { + return future; + } + boolean listenerContextActive = MessageListenerContext.isProcessing(); Context parent = Context.current(); CompletableFuture> result = new CompletableFuture<>(); @@ -279,6 +285,10 @@ public static CompletableFuture> wrap( public static CompletableFuture> wrapBatch( CompletableFuture> future, Timer timer, Consumer consumer) { + if (isSuppressingReceive()) { + return future; + } + Context parent = Context.current(); CompletableFuture> result = new CompletableFuture<>(); future.whenComplete( @@ -309,5 +319,17 @@ private static void runWithContext(@Nullable Context context, Runnable runnable) } } + public static void startSuppressingReceive() { + suppressReceive.set(true); + } + + public static void endSuppressingReceive() { + suppressReceive.remove(); + } + + private static boolean isSuppressingReceive() { + return Boolean.TRUE.equals(suppressReceive.get()); + } + private PulsarSingletons() {} } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 69a433bcd21a..9d3603afa177 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -12,20 +12,15 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; -import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.Arrays.asList; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -38,7 +33,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.common.naming.TopicName; import org.junit.jupiter.api.Test; class PulsarClientTest extends AbstractPulsarClientTest { @@ -771,15 +765,12 @@ public int choosePartition(Message message, TopicMetadata metadata) { producer.newMessage().key(String.valueOf(i)).value(msg).send(); } + Thread.sleep(1_000); // wait so that messages would be received as one batch + Messages receivedMsg = consumer.batchReceive(); consumer.acknowledge(receivedMsg); assertThat(receivedMsg).hasSize(4); - Map receivedMessagesByTopic = new HashMap<>(); - for (Message message : receivedMsg) { - receivedMessagesByTopic.merge(message.getTopicName(), 1L, Long::sum); - } - testing.waitAndAssertMetrics( "io.opentelemetry.pulsar-2.8", "messaging.receive.messages", @@ -792,23 +783,15 @@ public int choosePartition(Message message, TopicMetadata metadata) { .satisfies( data -> assertThat(data.getLongSumData().getPoints()) - .hasSize(receivedMessagesByTopic.size()) + .hasSize(1) .allSatisfy( point -> { - String destination = - requireNonNull( - point - .getAttributes() - .get(MESSAGING_DESTINATION_NAME)); - assertThat(point.getValue()) - .isEqualTo(receivedMessagesByTopic.get(destination)); + assertThat(point.getValue()).isEqualTo(4); assertThat( point .getAttributes() - .get(MESSAGING_DESTINATION_PARTITION_ID)) - .isEqualTo( - String.valueOf( - TopicName.getPartitionIndex(destination))); + .get(MESSAGING_DESTINATION_NAME)) + .isEqualTo(topic); assertThat(point.getAttributes().get(MESSAGING_OPERATION)) .isEqualTo("receive"); assertThat(point.getAttributes().get(MESSAGING_SYSTEM))