Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

Expand All @@ -35,9 +34,7 @@ class ConsumerImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.pulsar.client.impl.ConsumerImpl",
"org.apache.pulsar.client.impl.MultiTopicsConsumerImpl");
return named("org.apache.pulsar.client.impl.ConsumerImpl");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,33 @@
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.jupiter.api.Test;

class PulsarClientTest extends AbstractPulsarClientTest {
Expand Down Expand Up @@ -650,6 +659,81 @@ void testConsumeMultiTopics() throws Exception {
processAttributes(topic2, msgId2.toString(), false))));
}

@Test
void testReceiveMultiTopics() throws Exception {
String topicNamePrefix = "persistent://public/default/testReceiveMulti_";
String topic1 = topicNamePrefix + "1";
String topic2 = topicNamePrefix + "2";
producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create();
producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create();

MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1"));
MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2"));

consumer =
client
.newConsumer(Schema.STRING)
.topic(topic2, topic1)
.subscriptionName("test_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<String> received1 = consumer.receive(1, MINUTES);
Message<String> received2 = consumer.receive(1, MINUTES);
consumer.acknowledge(received1);
consumer.acknowledge(received2);

assertThat(asList(received1.getMessageId().toString(), received2.getMessageId().toString()))
.containsExactlyInAnyOrder(msgId1.toString(), msgId2.toString());

AtomicReference<SpanData> producerSpan = new AtomicReference<>();
AtomicReference<SpanData> producerSpan2 = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic1 + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic1, msgId1.toString(), false)));

producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(topic1 + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic1, msgId1.toString(), false))),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic2 + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic2, msgId2.toString(), false)));

producerSpan2.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(topic2 + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasLinks(LinkData.create(producerSpan2.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic2, msgId2.toString(), false))));
}

@SuppressWarnings("deprecation") // using deprecated semconv
@Test
void testConsumePartitionedTopicUsingBatchReceive() throws Exception {
Expand All @@ -663,35 +747,77 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception {
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
producer =
client
.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.messageRouter(
new MessageRouter() {
@Override
public int choosePartition(Message<?> message) {
return Integer.parseInt(message.getKey());
}

@Override
public int choosePartition(Message<?> message, TopicMetadata metadata) {
return choosePartition(message);
}
})
.create();

String msg = "test";
for (int i = 0; i < 10; i++) {
producer.send(msg);
for (int i = 0; i < 4; i++) {
producer.newMessage().key(String.valueOf(i)).value(msg).send();
}

Messages<String> receivedMsg = consumer.batchReceive();
consumer.acknowledge(receivedMsg);
assertThat(receivedMsg).hasSize(4);

assertThat(testing.metrics())
.satisfiesOnlyOnce(
metric ->
assertThat(metric)
.hasName("messaging.receive.messages")
.hasUnit("{message}")
.hasDescription("Measures the number of received messages.")
.hasLongSumSatisfying(
sum ->
sum.containsPointsSatisfying(
point ->
point
.hasValueSatisfying(v -> v.isEqualTo(receivedMsg.size()))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "receive"),
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost)))));
Map<String, Long> receivedMessagesByTopic = new HashMap<>();
for (Message<String> message : receivedMsg) {
receivedMessagesByTopic.merge(message.getTopicName(), 1L, Long::sum);
}

testing.waitAndAssertMetrics(
"io.opentelemetry.pulsar-2.8",
"messaging.receive.messages",
metrics ->
metrics.satisfiesExactly(
metric ->
assertThat(metric)
.hasUnit("{message}")
.hasDescription("Measures the number of received messages.")
.satisfies(
data ->
assertThat(data.getLongSumData().getPoints())
.hasSize(receivedMessagesByTopic.size())
.allSatisfy(
point -> {
String destination =
requireNonNull(
point
.getAttributes()
.get(MESSAGING_DESTINATION_NAME));
assertThat(point.getValue())
.isEqualTo(receivedMessagesByTopic.get(destination));
assertThat(
point
.getAttributes()
.get(MESSAGING_DESTINATION_PARTITION_ID))
.isEqualTo(
String.valueOf(
TopicName.getPartitionIndex(destination)));
assertThat(point.getAttributes().get(MESSAGING_OPERATION))
.isEqualTo("receive");
assertThat(point.getAttributes().get(MESSAGING_SYSTEM))
.isEqualTo("pulsar");
assertThat(point.getAttributes().get(SERVER_PORT))
.isEqualTo((long) brokerPort);
assertThat(point.getAttributes().get(SERVER_ADDRESS))
.isEqualTo(brokerHost);
}))));
}

@Test
Expand Down
Loading