diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index e3cddcd6eb45..76298d99acf8 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -11,7 +11,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isProtected; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -35,9 +34,7 @@ class ConsumerImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return namedOneOf( - "org.apache.pulsar.client.impl.ConsumerImpl", - "org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"); + return named("org.apache.pulsar.client.impl.ConsumerImpl"); } @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 2c34ede8c5ca..69a433bcd21a 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -12,24 +12,33 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.TopicName; import org.junit.jupiter.api.Test; class PulsarClientTest extends AbstractPulsarClientTest { @@ -650,6 +659,81 @@ void testConsumeMultiTopics() throws Exception { processAttributes(topic2, msgId2.toString(), false)))); } + @Test + void testReceiveMultiTopics() throws Exception { + String topicNamePrefix = "persistent://public/default/testReceiveMulti_"; + String topic1 = topicNamePrefix + "1"; + String topic2 = topicNamePrefix + "2"; + producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create(); + producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create(); + + MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1")); + MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2")); + + consumer = + client + .newConsumer(Schema.STRING) + .topic(topic2, topic1) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message received1 = consumer.receive(1, MINUTES); + Message received2 = consumer.receive(1, MINUTES); + consumer.acknowledge(received1); + consumer.acknowledge(received2); + + assertThat(asList(received1.getMessageId().toString(), received2.getMessageId().toString())) + .containsExactlyInAnyOrder(msgId1.toString(), msgId2.toString()); + + AtomicReference producerSpan = new AtomicReference<>(); + AtomicReference producerSpan2 = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic1 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic1, msgId1.toString(), false))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic1 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic1, msgId1.toString(), false))), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic2 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic2, msgId2.toString(), false))); + + producerSpan2.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic2 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic2, msgId2.toString(), false)))); + } + @SuppressWarnings("deprecation") // using deprecated semconv @Test void testConsumePartitionedTopicUsingBatchReceive() throws Exception { @@ -663,35 +747,77 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); - producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + producer = + client + .newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .messageRouter( + new MessageRouter() { + @Override + public int choosePartition(Message message) { + return Integer.parseInt(message.getKey()); + } + + @Override + public int choosePartition(Message message, TopicMetadata metadata) { + return choosePartition(message); + } + }) + .create(); String msg = "test"; - for (int i = 0; i < 10; i++) { - producer.send(msg); + for (int i = 0; i < 4; i++) { + producer.newMessage().key(String.valueOf(i)).value(msg).send(); } Messages receivedMsg = consumer.batchReceive(); consumer.acknowledge(receivedMsg); + assertThat(receivedMsg).hasSize(4); - assertThat(testing.metrics()) - .satisfiesOnlyOnce( - metric -> - assertThat(metric) - .hasName("messaging.receive.messages") - .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") - .hasLongSumSatisfying( - sum -> - sum.containsPointsSatisfying( - point -> - point - .hasValueSatisfying(v -> v.isEqualTo(receivedMsg.size())) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(MESSAGING_OPERATION, "receive"), - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost))))); + Map receivedMessagesByTopic = new HashMap<>(); + for (Message message : receivedMsg) { + receivedMessagesByTopic.merge(message.getTopicName(), 1L, Long::sum); + } + + testing.waitAndAssertMetrics( + "io.opentelemetry.pulsar-2.8", + "messaging.receive.messages", + metrics -> + metrics.satisfiesExactly( + metric -> + assertThat(metric) + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .satisfies( + data -> + assertThat(data.getLongSumData().getPoints()) + .hasSize(receivedMessagesByTopic.size()) + .allSatisfy( + point -> { + String destination = + requireNonNull( + point + .getAttributes() + .get(MESSAGING_DESTINATION_NAME)); + assertThat(point.getValue()) + .isEqualTo(receivedMessagesByTopic.get(destination)); + assertThat( + point + .getAttributes() + .get(MESSAGING_DESTINATION_PARTITION_ID)) + .isEqualTo( + String.valueOf( + TopicName.getPartitionIndex(destination))); + assertThat(point.getAttributes().get(MESSAGING_OPERATION)) + .isEqualTo("receive"); + assertThat(point.getAttributes().get(MESSAGING_SYSTEM)) + .isEqualTo("pulsar"); + assertThat(point.getAttributes().get(SERVER_PORT)) + .isEqualTo((long) brokerPort); + assertThat(point.getAttributes().get(SERVER_ADDRESS)) + .isEqualTo(brokerHost); + })))); } @Test