Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
Expand All @@ -34,7 +36,9 @@ class ConsumerImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.ConsumerImpl");
return namedOneOf(
"org.apache.pulsar.client.impl.ConsumerImpl",
"org.apache.pulsar.client.impl.MultiTopicsConsumerImpl");
}

@Override
Expand Down Expand Up @@ -63,6 +67,11 @@ public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isProtected().and(named("internalBatchReceiveAsync")).and(takesArguments(0)),
getClass().getName() + "$ConsumerBatchAsyncReceiveAdvice");

// only in MultiTopicsConsumerImpl
transformer.applyAdviceToMethod(
named("receiveMessageFromConsumer"),
getClass().getName() + "$SuppressInstrumentationAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -157,4 +166,20 @@ public static CompletableFuture<Messages<?>> after(
return wrapBatch(future, timer, consumer);
}
}

@SuppressWarnings("unused")
public static class SuppressInstrumentationAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static void before() {
// MultiTopicsConsumerImpl#receiveMessageFromConsumer is called from a background thread, we
// don't want to create a span for it.
PulsarSingletons.startSuppressingReceive();
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
public static void after() {
PulsarSingletons.endSuppressingReceive();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class PulsarSingletons {
private static final Instrumenter<PulsarRequest, Void> producerInstrumenter =
createProducerInstrumenter();

private static final ThreadLocal<Boolean> suppressReceive = new ThreadLocal<>();

public static Instrumenter<PulsarRequest, Void> consumerProcessInstrumenter() {
return consumerProcessInstrumenter;
}
Expand Down Expand Up @@ -252,6 +254,10 @@ public static CompletableFuture<Void> wrap(CompletableFuture<Void> future) {

public static CompletableFuture<Message<?>> wrap(
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
if (isSuppressingReceive()) {
return future;
}

boolean listenerContextActive = MessageListenerContext.isProcessing();
Context parent = Context.current();
CompletableFuture<Message<?>> result = new CompletableFuture<>();
Expand Down Expand Up @@ -279,6 +285,10 @@ public static CompletableFuture<Message<?>> wrap(

public static CompletableFuture<Messages<?>> wrapBatch(
CompletableFuture<Messages<?>> future, Timer timer, Consumer<?> consumer) {
if (isSuppressingReceive()) {
return future;
}

Context parent = Context.current();
CompletableFuture<Messages<?>> result = new CompletableFuture<>();
future.whenComplete(
Expand Down Expand Up @@ -309,5 +319,17 @@ private static void runWithContext(@Nullable Context context, Runnable runnable)
}
}

public static void startSuppressingReceive() {
suppressReceive.set(true);
}

public static void endSuppressingReceive() {
suppressReceive.remove();
}

private static boolean isSuppressingReceive() {
return Boolean.TRUE.equals(suppressReceive.get());
}

private PulsarSingletons() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,15 @@
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -38,7 +33,6 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.jupiter.api.Test;

class PulsarClientTest extends AbstractPulsarClientTest {
Expand Down Expand Up @@ -771,15 +765,12 @@ public int choosePartition(Message<?> message, TopicMetadata metadata) {
producer.newMessage().key(String.valueOf(i)).value(msg).send();
}

Thread.sleep(1_000); // wait so that messages would be received as one batch

Messages<String> receivedMsg = consumer.batchReceive();
consumer.acknowledge(receivedMsg);
assertThat(receivedMsg).hasSize(4);

Map<String, Long> receivedMessagesByTopic = new HashMap<>();
for (Message<String> message : receivedMsg) {
receivedMessagesByTopic.merge(message.getTopicName(), 1L, Long::sum);
}

testing.waitAndAssertMetrics(
"io.opentelemetry.pulsar-2.8",
"messaging.receive.messages",
Expand All @@ -792,23 +783,15 @@ public int choosePartition(Message<?> message, TopicMetadata metadata) {
.satisfies(
data ->
assertThat(data.getLongSumData().getPoints())
.hasSize(receivedMessagesByTopic.size())
.hasSize(1)
.allSatisfy(
point -> {
String destination =
requireNonNull(
point
.getAttributes()
.get(MESSAGING_DESTINATION_NAME));
assertThat(point.getValue())
.isEqualTo(receivedMessagesByTopic.get(destination));
assertThat(point.getValue()).isEqualTo(4);
assertThat(
point
.getAttributes()
.get(MESSAGING_DESTINATION_PARTITION_ID))
.isEqualTo(
String.valueOf(
TopicName.getPartitionIndex(destination)));
.get(MESSAGING_DESTINATION_NAME))
.isEqualTo(topic);
assertThat(point.getAttributes().get(MESSAGING_OPERATION))
.isEqualTo("receive");
assertThat(point.getAttributes().get(MESSAGING_SYSTEM))
Expand Down
Loading