From 6314a4b98be1c474f09bb5c42714d2d4b7662bbf Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 17 May 2026 16:09:55 -0700 Subject: [PATCH 1/3] Fix duplicate Pulsar multi-topic receive spans --- .../v2_8/ConsumerImplInstrumentation.java | 5 +- .../pulsar/v2_8/PulsarClientTest.java | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index e3cddcd6eb45..76298d99acf8 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -11,7 +11,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isProtected; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -35,9 +34,7 @@ class ConsumerImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return namedOneOf( - "org.apache.pulsar.client.impl.ConsumerImpl", - "org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"); + return named("org.apache.pulsar.client.impl.ConsumerImpl"); } @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 2c34ede8c5ca..66ed054c9e21 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -14,6 +14,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -650,6 +651,81 @@ void testConsumeMultiTopics() throws Exception { processAttributes(topic2, msgId2.toString(), false)))); } + @Test + void testReceiveMultiTopics() throws Exception { + String topicNamePrefix = "persistent://public/default/testReceiveMulti_"; + String topic1 = topicNamePrefix + "1"; + String topic2 = topicNamePrefix + "2"; + producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create(); + producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create(); + + MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1")); + MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2")); + + consumer = + client + .newConsumer(Schema.STRING) + .topic(topic2, topic1) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message received1 = consumer.receive(1, MINUTES); + Message received2 = consumer.receive(1, MINUTES); + consumer.acknowledge(received1); + consumer.acknowledge(received2); + + assertThat(asList(received1.getMessageId().toString(), received2.getMessageId().toString())) + .containsExactlyInAnyOrder(msgId1.toString(), msgId2.toString()); + + AtomicReference producerSpan = new AtomicReference<>(); + AtomicReference producerSpan2 = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic1 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic1, msgId1.toString(), false))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic1 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic1, msgId1.toString(), false))), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic2 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic2, msgId2.toString(), false))); + + producerSpan2.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic2 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic2, msgId2.toString(), false)))); + } + @SuppressWarnings("deprecation") // using deprecated semconv @Test void testConsumePartitionedTopicUsingBatchReceive() throws Exception { From 6a8bec87753da8d5f746ada1259f1f61158aaf13 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 22 May 2026 12:44:04 -0700 Subject: [PATCH 2/3] Fix Pulsar partitioned batch receive metric test The partitioned batch receive test expected a single base-topic receive metric point and read metrics without waiting. Partitioned consumers report receive message counts per partition, so the assertion failed once partition attributes were emitted. Update the test to wait for the receive metric and validate the per-partition point counts and attributes from the received messages. Validation: .\gradlew.bat :instrumentation:pulsar:pulsar-2.8:javaagent:spotlessCheck :instrumentation:pulsar:pulsar-2.8:javaagent:testClasses --- .../pulsar/v2_8/PulsarClientTest.java | 68 +++++++++++++------ 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 66ed054c9e21..85e21a8fe2a1 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -12,15 +12,20 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +36,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.TopicName; import org.junit.jupiter.api.Test; class PulsarClientTest extends AbstractPulsarClientTest { @@ -749,25 +755,49 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { Messages receivedMsg = consumer.batchReceive(); consumer.acknowledge(receivedMsg); - assertThat(testing.metrics()) - .satisfiesOnlyOnce( - metric -> - assertThat(metric) - .hasName("messaging.receive.messages") - .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") - .hasLongSumSatisfying( - sum -> - sum.containsPointsSatisfying( - point -> - point - .hasValueSatisfying(v -> v.isEqualTo(receivedMsg.size())) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(MESSAGING_OPERATION, "receive"), - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost))))); + Map receivedMessagesByTopic = new HashMap<>(); + for (Message message : receivedMsg) { + receivedMessagesByTopic.merge(message.getTopicName(), 1L, Long::sum); + } + + testing.waitAndAssertMetrics( + "io.opentelemetry.pulsar-2.8", + "messaging.receive.messages", + metrics -> + metrics.satisfiesExactly( + metric -> + assertThat(metric) + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .satisfies( + data -> + assertThat(data.getLongSumData().getPoints()) + .hasSize(receivedMessagesByTopic.size()) + .allSatisfy( + point -> { + String destination = + requireNonNull( + point + .getAttributes() + .get(MESSAGING_DESTINATION_NAME)); + assertThat(point.getValue()) + .isEqualTo(receivedMessagesByTopic.get(destination)); + assertThat( + point + .getAttributes() + .get(MESSAGING_DESTINATION_PARTITION_ID)) + .isEqualTo( + String.valueOf( + TopicName.getPartitionIndex(destination))); + assertThat(point.getAttributes().get(MESSAGING_OPERATION)) + .isEqualTo("receive"); + assertThat(point.getAttributes().get(MESSAGING_SYSTEM)) + .isEqualTo("pulsar"); + assertThat(point.getAttributes().get(SERVER_PORT)) + .isEqualTo((long) brokerPort); + assertThat(point.getAttributes().get(SERVER_ADDRESS)) + .isEqualTo(brokerHost); + })))); } @Test From 0f5786a20730fe2503182a09dd56bf9d60205d5d Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 26 May 2026 11:38:46 -0700 Subject: [PATCH 3/3] Make Pulsar batch receive metric test deterministic The partitioned batch receive test sent ten unkeyed messages, so Pulsar could route multiple messages to each partition and the delta receive metric polling could observe partial per-partition counts. Route one keyed message to each partition and assert the batch size before validating the per-partition metric attributes and counts. Validation: .\gradlew.bat :instrumentation:pulsar:pulsar-2.8:javaagent:spotlessCheck :instrumentation:pulsar:pulsar-2.8:javaagent:testClasses --- .../pulsar/v2_8/PulsarClientTest.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 85e21a8fe2a1..69a433bcd21a 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -32,9 +32,11 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.TopicName; import org.junit.jupiter.api.Test; @@ -745,15 +747,33 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); - producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + producer = + client + .newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .messageRouter( + new MessageRouter() { + @Override + public int choosePartition(Message message) { + return Integer.parseInt(message.getKey()); + } + + @Override + public int choosePartition(Message message, TopicMetadata metadata) { + return choosePartition(message); + } + }) + .create(); String msg = "test"; - for (int i = 0; i < 10; i++) { - producer.send(msg); + for (int i = 0; i < 4; i++) { + producer.newMessage().key(String.valueOf(i)).value(msg).send(); } Messages receivedMsg = consumer.batchReceive(); consumer.acknowledge(receivedMsg); + assertThat(receivedMsg).hasSize(4); Map receivedMessagesByTopic = new HashMap<>(); for (Message message : receivedMsg) {