1212import static io .opentelemetry .semconv .ServerAttributes .SERVER_ADDRESS ;
1313import static io .opentelemetry .semconv .ServerAttributes .SERVER_PORT ;
1414import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_DESTINATION_NAME ;
15+ import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_DESTINATION_PARTITION_ID ;
1516import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_OPERATION ;
1617import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_SYSTEM ;
18+ import static java .util .Arrays .asList ;
19+ import static java .util .Objects .requireNonNull ;
1720import static java .util .concurrent .TimeUnit .MINUTES ;
1821import static java .util .concurrent .TimeUnit .SECONDS ;
22+ import static org .assertj .core .api .Assertions .assertThat ;
1923
2024import io .opentelemetry .api .trace .SpanKind ;
2125import io .opentelemetry .sdk .trace .data .LinkData ;
2226import io .opentelemetry .sdk .trace .data .SpanData ;
27+ import java .util .HashMap ;
28+ import java .util .Map ;
2329import java .util .concurrent .CompletableFuture ;
2430import java .util .concurrent .CountDownLatch ;
2531import java .util .concurrent .atomic .AtomicReference ;
2632import org .apache .pulsar .client .api .Message ;
2733import org .apache .pulsar .client .api .MessageId ;
2834import org .apache .pulsar .client .api .MessageListener ;
35+ import org .apache .pulsar .client .api .MessageRouter ;
2936import org .apache .pulsar .client .api .Messages ;
3037import org .apache .pulsar .client .api .Schema ;
3138import org .apache .pulsar .client .api .SubscriptionInitialPosition ;
39+ import org .apache .pulsar .client .api .TopicMetadata ;
3240import org .apache .pulsar .client .api .transaction .Transaction ;
41+ import org .apache .pulsar .common .naming .TopicName ;
3342import org .junit .jupiter .api .Test ;
3443
3544class PulsarClientTest extends AbstractPulsarClientTest {
@@ -650,6 +659,81 @@ void testConsumeMultiTopics() throws Exception {
650659 processAttributes (topic2 , msgId2 .toString (), false ))));
651660 }
652661
662+ @ Test
663+ void testReceiveMultiTopics () throws Exception {
664+ String topicNamePrefix = "persistent://public/default/testReceiveMulti_" ;
665+ String topic1 = topicNamePrefix + "1" ;
666+ String topic2 = topicNamePrefix + "2" ;
667+ producer = client .newProducer (Schema .STRING ).topic (topic1 ).enableBatching (false ).create ();
668+ producer2 = client .newProducer (Schema .STRING ).topic (topic2 ).enableBatching (false ).create ();
669+
670+ MessageId msgId1 = testing .runWithSpan ("parent1" , () -> producer .send ("test1" ));
671+ MessageId msgId2 = testing .runWithSpan ("parent2" , () -> producer2 .send ("test2" ));
672+
673+ consumer =
674+ client
675+ .newConsumer (Schema .STRING )
676+ .topic (topic2 , topic1 )
677+ .subscriptionName ("test_sub" )
678+ .subscriptionInitialPosition (SubscriptionInitialPosition .Earliest )
679+ .subscribe ();
680+
681+ Message <String > received1 = consumer .receive (1 , MINUTES );
682+ Message <String > received2 = consumer .receive (1 , MINUTES );
683+ consumer .acknowledge (received1 );
684+ consumer .acknowledge (received2 );
685+
686+ assertThat (asList (received1 .getMessageId ().toString (), received2 .getMessageId ().toString ()))
687+ .containsExactlyInAnyOrder (msgId1 .toString (), msgId2 .toString ());
688+
689+ AtomicReference <SpanData > producerSpan = new AtomicReference <>();
690+ AtomicReference <SpanData > producerSpan2 = new AtomicReference <>();
691+ testing .waitAndAssertSortedTraces (
692+ orderByRootSpanName ("parent1" , topic1 + " receive" , "parent2" , topic2 + " receive" ),
693+ trace -> {
694+ trace .hasSpansSatisfyingExactly (
695+ span -> span .hasName ("parent1" ).hasKind (SpanKind .INTERNAL ).hasNoParent (),
696+ span ->
697+ span .hasName (topic1 + " publish" )
698+ .hasKind (SpanKind .PRODUCER )
699+ .hasParent (trace .getSpan (0 ))
700+ .hasAttributesSatisfyingExactly (
701+ sendAttributes (topic1 , msgId1 .toString (), false )));
702+
703+ producerSpan .set (trace .getSpan (1 ));
704+ },
705+ trace ->
706+ trace .hasSpansSatisfyingExactly (
707+ span ->
708+ span .hasName (topic1 + " receive" )
709+ .hasKind (SpanKind .CONSUMER )
710+ .hasNoParent ()
711+ .hasLinks (LinkData .create (producerSpan .get ().getSpanContext ()))
712+ .hasAttributesSatisfyingExactly (
713+ receiveAttributes (topic1 , msgId1 .toString (), false ))),
714+ trace -> {
715+ trace .hasSpansSatisfyingExactly (
716+ span -> span .hasName ("parent2" ).hasKind (SpanKind .INTERNAL ).hasNoParent (),
717+ span ->
718+ span .hasName (topic2 + " publish" )
719+ .hasKind (SpanKind .PRODUCER )
720+ .hasParent (trace .getSpan (0 ))
721+ .hasAttributesSatisfyingExactly (
722+ sendAttributes (topic2 , msgId2 .toString (), false )));
723+
724+ producerSpan2 .set (trace .getSpan (1 ));
725+ },
726+ trace ->
727+ trace .hasSpansSatisfyingExactly (
728+ span ->
729+ span .hasName (topic2 + " receive" )
730+ .hasKind (SpanKind .CONSUMER )
731+ .hasNoParent ()
732+ .hasLinks (LinkData .create (producerSpan2 .get ().getSpanContext ()))
733+ .hasAttributesSatisfyingExactly (
734+ receiveAttributes (topic2 , msgId2 .toString (), false ))));
735+ }
736+
653737 @ SuppressWarnings ("deprecation" ) // using deprecated semconv
654738 @ Test
655739 void testConsumePartitionedTopicUsingBatchReceive () throws Exception {
@@ -663,35 +747,77 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception {
663747 .subscriptionInitialPosition (SubscriptionInitialPosition .Earliest )
664748 .subscribe ();
665749
666- producer = client .newProducer (Schema .STRING ).topic (topic ).enableBatching (false ).create ();
750+ producer =
751+ client
752+ .newProducer (Schema .STRING )
753+ .topic (topic )
754+ .enableBatching (false )
755+ .messageRouter (
756+ new MessageRouter () {
757+ @ Override
758+ public int choosePartition (Message <?> message ) {
759+ return Integer .parseInt (message .getKey ());
760+ }
761+
762+ @ Override
763+ public int choosePartition (Message <?> message , TopicMetadata metadata ) {
764+ return choosePartition (message );
765+ }
766+ })
767+ .create ();
667768
668769 String msg = "test" ;
669- for (int i = 0 ; i < 10 ; i ++) {
670- producer .send ( msg );
770+ for (int i = 0 ; i < 4 ; i ++) {
771+ producer .newMessage (). key ( String . valueOf ( i )). value ( msg ). send ( );
671772 }
672773
673774 Messages <String > receivedMsg = consumer .batchReceive ();
674775 consumer .acknowledge (receivedMsg );
776+ assertThat (receivedMsg ).hasSize (4 );
675777
676- assertThat (testing .metrics ())
677- .satisfiesOnlyOnce (
678- metric ->
679- assertThat (metric )
680- .hasName ("messaging.receive.messages" )
681- .hasUnit ("{message}" )
682- .hasDescription ("Measures the number of received messages." )
683- .hasLongSumSatisfying (
684- sum ->
685- sum .containsPointsSatisfying (
686- point ->
687- point
688- .hasValueSatisfying (v -> v .isEqualTo (receivedMsg .size ()))
689- .hasAttributesSatisfyingExactly (
690- equalTo (MESSAGING_DESTINATION_NAME , topic ),
691- equalTo (MESSAGING_OPERATION , "receive" ),
692- equalTo (MESSAGING_SYSTEM , "pulsar" ),
693- equalTo (SERVER_PORT , brokerPort ),
694- equalTo (SERVER_ADDRESS , brokerHost )))));
778+ Map <String , Long > receivedMessagesByTopic = new HashMap <>();
779+ for (Message <String > message : receivedMsg ) {
780+ receivedMessagesByTopic .merge (message .getTopicName (), 1L , Long ::sum );
781+ }
782+
783+ testing .waitAndAssertMetrics (
784+ "io.opentelemetry.pulsar-2.8" ,
785+ "messaging.receive.messages" ,
786+ metrics ->
787+ metrics .satisfiesExactly (
788+ metric ->
789+ assertThat (metric )
790+ .hasUnit ("{message}" )
791+ .hasDescription ("Measures the number of received messages." )
792+ .satisfies (
793+ data ->
794+ assertThat (data .getLongSumData ().getPoints ())
795+ .hasSize (receivedMessagesByTopic .size ())
796+ .allSatisfy (
797+ point -> {
798+ String destination =
799+ requireNonNull (
800+ point
801+ .getAttributes ()
802+ .get (MESSAGING_DESTINATION_NAME ));
803+ assertThat (point .getValue ())
804+ .isEqualTo (receivedMessagesByTopic .get (destination ));
805+ assertThat (
806+ point
807+ .getAttributes ()
808+ .get (MESSAGING_DESTINATION_PARTITION_ID ))
809+ .isEqualTo (
810+ String .valueOf (
811+ TopicName .getPartitionIndex (destination )));
812+ assertThat (point .getAttributes ().get (MESSAGING_OPERATION ))
813+ .isEqualTo ("receive" );
814+ assertThat (point .getAttributes ().get (MESSAGING_SYSTEM ))
815+ .isEqualTo ("pulsar" );
816+ assertThat (point .getAttributes ().get (SERVER_PORT ))
817+ .isEqualTo ((long ) brokerPort );
818+ assertThat (point .getAttributes ().get (SERVER_ADDRESS ))
819+ .isEqualTo (brokerHost );
820+ }))));
695821 }
696822
697823 @ Test
0 commit comments