diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java index f4f4295a503f..2a8604d840da 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java @@ -66,7 +66,7 @@ public final class TracingExecutionInterceptor implements ExecutionInterceptor { new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".AwsSdkRequest"); static final ExecutionAttribute SDK_HTTP_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkHttpRequest"); - static final ExecutionAttribute SDK_REQUEST_ATTRIBUTE = + public static final ExecutionAttribute SDK_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkRequest"); private static final ExecutionAttribute REQUEST_FINISHER_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher"); diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java index 35c30606d029..afbd429a5207 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java @@ -35,7 +35,7 @@ public static class ExecuteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable public static Scope methodEnter(@Advice.Argument(0) Collection> messages) { - return SpringAwsUtil.handleBatch(messages); + return SpringAwsUtil.restoreBatchContext(messages); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java index 870e4deb38d2..bf8fea81dffa 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java @@ -10,6 +10,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.Collection; import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; @@ -28,7 +29,9 @@ public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( named("onMessage").and(takesArgument(0, named("org.springframework.messaging.Message"))), getClass().getName() + "$OnMessageAdvice"); - // TODO: onMessage(Collection> messages) not instrumented + transformer.applyAdviceToMethod( + named("onMessage").and(takesArgument(0, named("java.util.Collection"))), + getClass().getName() + "$OnMessagesAdvice"); } @SuppressWarnings("unused") @@ -48,4 +51,23 @@ public static void methodExit( } } } + + @SuppressWarnings("unused") + public static class OnMessagesAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + @Nullable + public static SpringAwsUtil.BatchMessageScope methodEnter( + @Advice.Argument(0) Collection> messages) { + return SpringAwsUtil.handleBatch(messages); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) + public static void methodExit( + @Advice.Enter @Nullable SpringAwsUtil.BatchMessageScope scope, + @Advice.Thrown @Nullable Throwable throwable) { + if (scope != null) { + scope.close(throwable); + } + } + } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index fe02d827b412..e3e5fb41c669 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -5,9 +5,18 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; +import static java.util.Collections.emptyList; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.Response; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.SqsMessage; @@ -17,15 +26,132 @@ import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingExecutionInterceptor; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingList; import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.springframework.messaging.Message; +import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; public class SpringAwsUtil { private static final ThreadLocal context = new ThreadLocal<>(); private static final VirtualField, TracingContext> tracingContextField = VirtualField.find(Message.class, TracingContext.class); + private static final MessagingAttributesGetter>, Void> + BATCH_ATTRIBUTES_GETTER = + new MessagingAttributesGetter>, Void>() { + @Override + public String getSystem(Collection> messages) { + return "aws_sqs"; + } + + @Nullable + @Override + public String getDestination(Collection> messages) { + if (!messages.isEmpty()) { + Message message = messages.iterator().next(); + TracingContext tracingContext = tracingContextField.get(message); + if (tracingContext != null) { + SdkRequest sdkRequest = + tracingContext.request.getAttribute( + TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); + if (sdkRequest instanceof ReceiveMessageRequest receiveMessageRequest) { + String queueUrl = receiveMessageRequest.queueUrl(); + if (queueUrl != null) { + int i = queueUrl.lastIndexOf('/'); + if (i > 0) { + return queueUrl.substring(i + 1); + } + } + } + } + } + return null; + } + + @Nullable + @Override + public String getDestinationTemplate(Collection> messages) { + return null; + } + + @Override + public boolean isTemporaryDestination(Collection> messages) { + return false; + } + + @Override + public boolean isAnonymousDestination(Collection> messages) { + return false; + } + + @Nullable + @Override + public String getConversationId(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getMessageBodySize(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(Collection> messages) { + return null; + } + + @Nullable + @Override + public String getMessageId(Collection> messages, @Nullable Void unused) { + return null; + } + + @Nullable + @Override + public String getClientId(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getBatchMessageCount( + Collection> messages, @Nullable Void unused) { + return (long) messages.size(); + } + + @Override + public List getMessageHeader(Collection> messages, String name) { + return emptyList(); + } + }; + + private static final Instrumenter>, Void> BATCH_INSTRUMENTER = + Instrumenter.>, Void>builder( + GlobalOpenTelemetry.get(), + "io.opentelemetry.spring-cloud-aws-3.0", + MessagingSpanNameExtractor.create(BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS) + .build()) + .addSpanLinksExtractor( + (spanLinks, parentContext, messages) -> { + for (Message message : messages) { + TracingContext tracingContext = tracingContextField.get(message); + if (tracingContext != null) { + SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); + Context extracted = + SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); + spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); + } + } + }) + .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + // put the TracingList into thread local, so we can use it in attachTracingState method public static void initialize(Collection messages) { if (messages instanceof TracingList tracingList) { @@ -72,23 +198,65 @@ public static MessageScope handleMessage(Message message) { return tracingContext.trace(); } - // restore context from the first message of the batch @Nullable - public static Scope handleBatch(Collection> messages) { + public static BatchMessageScope handleBatch(Collection> messages) { + if (messages.isEmpty()) { + return null; + } + + // Check if the batch has any tracing context before starting + Message firstMessage = messages.iterator().next(); + TracingContext tracingContext = tracingContextField.get(firstMessage); + if (tracingContext == null) { + return null; + } + + // Start a separate trace (NO parent from the queue) using Context.current() + // The instrumenter adds span links to all messages. + Context parentContext = Context.current(); + if (!BATCH_INSTRUMENTER.shouldStart(parentContext, messages)) { + return null; + } + Context context = BATCH_INSTRUMENTER.start(parentContext, messages); + + for (Message msg : messages) { + TracingContext tc = tracingContextField.get(msg); + if (tc != null) { + tc.batchProcessContext = context; + } + } + + return new BatchMessageScope(context, messages); + } + + @Nullable + public static Scope restoreBatchContext(Collection> messages) { if (messages.isEmpty()) { return null; } Message message = messages.iterator().next(); TracingContext tracingContext = tracingContextField.get(message); - if (tracingContext == null) { + if (tracingContext == null || tracingContext.batchProcessContext == null) { return null; } - SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); - Context parentContext = tracingContext.receiveContext; - if (parentContext == null) { - parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); + return tracingContext.batchProcessContext.makeCurrent(); + } + + public static class BatchMessageScope { + private final Context context; + private final Collection> request; + private final Scope scope; + + private BatchMessageScope(Context context, Collection> request) { + this.context = context; + this.request = request; + this.scope = context.makeCurrent(); + } + + public void close(@Nullable Throwable throwable) { + scope.close(); + BATCH_INSTRUMENTER.end(context, request, null, throwable); } - return parentContext.makeCurrent(); } public static class MessageScope { @@ -123,6 +291,7 @@ private static class TracingContext { private final TracingExecutionInterceptor config; @Nullable private final Context receiveContext; private final software.amazon.awssdk.services.sqs.model.Message sqsMessage; + @Nullable Context batchProcessContext; private TracingContext( TracingList tracingList, software.amazon.awssdk.services.sqs.model.Message sqsMessage) { @@ -146,6 +315,7 @@ MessageScope trace() { return null; } Context context = instrumenter.start(parentContext, processRequest); + this.batchProcessContext = context; return new MessageScope(instrumenter, context, processRequest, response); } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 67f86215e11a..19d766e273a4 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -15,6 +15,7 @@ import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_SQS_QUEUE_URL; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; @@ -23,24 +24,34 @@ import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pekko.http.scaladsl.Http; import org.assertj.core.api.AbstractStringAssert; import org.elasticmq.rest.sqs.SQSRestServer; import org.elasticmq.rest.sqs.SQSRestServerBuilder; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; +import org.springframework.messaging.support.MessageBuilder; @SuppressWarnings("deprecation") // using deprecated semconv @SpringBootTest( @@ -53,6 +64,50 @@ class AwsSqsTest { private static SQSRestServer sqs; @Autowired SqsTemplate sqsTemplate; + @Autowired MessageListenerContainerRegistry registry; + @Autowired ApplicationContext applicationContext; + + @BeforeEach + void waitForContainersAndClear() throws InterruptedException { + // Wait for the listener containers to start and resolve Queue URLs + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10000) { + long count = + testing.spans().stream().filter(s -> s.getName().equals("Sqs.GetQueueUrl")).count(); + if (count >= 2) { + break; + } + Thread.sleep(100); + } + + // Warm up the templates so that the HTTP client is initialized (prevents Connection refused) + sqsTemplate.send("test-queue", "warmup"); + sqsTemplate.sendMany( + "test-batch-queue", singletonList(MessageBuilder.withPayload("warmup1").build())); + + // Wait for the warmup message to be processed so it doesn't pollute the test + startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10000) { + long count = + testing.spans().stream().filter(s -> s.getName().equals("test-queue process")).count(); + long countBatch = + testing.spans().stream() + .filter(s -> s.getName().equals("test-batch-queue process")) + .count(); + long countDelete = + testing.spans().stream() + .filter(s -> s.getName().equals("Sqs.DeleteMessageBatch")) + .count(); + if (count >= 1 && countBatch >= 1 && countDelete >= 2) { + break; + } + Thread.sleep(100); + } + + testing.clearData(); + AwsSqsTestApplication.messageHandler = null; + AwsSqsTestApplication.batchMessageHandler = null; + } @BeforeAll static void setUp() { @@ -85,12 +140,12 @@ void sqsListener() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName("Sqs.GetQueueUrl") - .hasKind(SpanKind.CLIENT) + span.hasName("test-queue publish") + .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "GetQueueUrl"), + equalTo(RPC_METHOD, "SendMessage"), equalTo(RPC_SERVICE, "Sqs"), equalTo(HTTP_REQUEST_METHOD, POST), equalTo(HTTP_RESPONSE_STATUS_CODE, 200), @@ -101,14 +156,23 @@ void sqsListener() throws Exception { val -> val.startsWith( "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), + equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_DESTINATION_NAME, "test-queue"), + equalTo( + AWS_SQS_QUEUE_URL, + "http://localhost:" + + AwsSqsTestApplication.sqsPort + + "/000000000000/test-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> - span.hasName("test-queue publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) + span.hasName("test-queue process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "SendMessage"), + equalTo(RPC_METHOD, "ReceiveMessage"), equalTo(RPC_SERVICE, "Sqs"), equalTo(HTTP_REQUEST_METHOD, POST), equalTo(HTTP_RESPONSE_STATUS_CODE, 200), @@ -121,21 +185,111 @@ void sqsListener() throws Exception { "http://localhost:" + AwsSqsTestApplication.sqsPort)), equalTo(MESSAGING_SYSTEM, AWS_SQS), satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), - equalTo(MESSAGING_OPERATION, "publish"), - equalTo(MESSAGING_DESTINATION_NAME, "test-queue"), + equalTo(MESSAGING_OPERATION, "process"), + equalTo(MESSAGING_DESTINATION_NAME, "test-queue")), + span -> + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)), + span -> + span.hasName("Sqs.DeleteMessageBatch") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "DeleteMessageBatch"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), equalTo( AWS_SQS_QUEUE_URL, "http://localhost:" + AwsSqsTestApplication.sqsPort + "/000000000000/test-queue"), - satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))))); + } + + @Test + void sqsBatchListener() throws Exception { + registry.getContainerById("batchContainer").stop(); + + String messageContent1 = "hello"; + String messageContent2 = "hello2"; + List collectedMessages = new CopyOnWriteArrayList<>(); + CompletableFuture> messageFuture = new CompletableFuture<>(); + AwsSqsTestApplication.batchMessageHandler = + strings -> + testing.runWithSpan( + "callback", + () -> { + collectedMessages.addAll(strings); + if (collectedMessages.size() >= 2) { + messageFuture.complete(collectedMessages); + } + }); + + testing.runWithSpan( + "parent", + () -> + sqsTemplate.sendMany( + "test-batch-queue", + asList( + MessageBuilder.withPayload(messageContent1).build(), + MessageBuilder.withPayload(messageContent2).build()))); + + registry.getContainerById("batchContainer").start(); + + List result = messageFuture.get(10, SECONDS); + assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2); + + AtomicReference producer = new AtomicReference<>(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> { + span.hasName("test-batch-queue publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "SendMessageBatch"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), + equalTo( + AWS_SQS_QUEUE_URL, + "http://localhost:" + + AwsSqsTestApplication.sqsPort + + "/000000000000/test-batch-queue"), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))); + producer.set(trace.getSpan(1)); + }), + trace -> + trace.hasSpansSatisfyingExactly( span -> - span.hasName("test-queue process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(2)) + span.hasName("Sqs.GetQueueUrl") + .hasKind(SpanKind.CLIENT) + .hasNoParent() .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(RPC_METHOD, "GetQueueUrl"), equalTo(RPC_SERVICE, "Sqs"), equalTo(HTTP_REQUEST_METHOD, POST), equalTo(HTTP_RESPONSE_STATUS_CODE, 200), @@ -146,16 +300,42 @@ void sqsListener() throws Exception { val -> val.startsWith( "http://localhost:" + AwsSqsTestApplication.sqsPort)), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class)))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("test-batch-queue process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying( + links -> { + assertThat(links).hasSize(2); + assertThat(links) + .satisfiesExactly( + l -> { + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanId()); + }, + l -> { + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanId()); + }); + }) + .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), - satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), equalTo(MESSAGING_OPERATION, "process"), - equalTo(MESSAGING_DESTINATION_NAME, "test-queue")), + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 2L), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), span -> - span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(3)), + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)), span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(2)) + .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_METHOD, "DeleteMessageBatch"), @@ -173,7 +353,7 @@ void sqsListener() throws Exception { AWS_SQS_QUEUE_URL, "http://localhost:" + AwsSqsTestApplication.sqsPort - + "/000000000000/test-queue"), + + "/000000000000/test-batch-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))))); } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java index 133d59ed6e3d..348d4219bde5 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java @@ -6,8 +6,12 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.ListenerMode; import io.awspring.cloud.sqs.operations.SqsTemplate; import java.net.URI; +import java.time.Duration; +import java.util.List; import java.util.function.Consumer; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -20,6 +24,7 @@ class AwsSqsTestApplication { static int sqsPort; static volatile Consumer messageHandler; + static volatile Consumer> batchMessageHandler; @Bean SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient) { @@ -36,10 +41,36 @@ SqsAsyncClient sqsAsyncClient() { .build(); } - @SqsListener("test-queue") + @Bean + SqsMessageListenerContainerFactory batchFactory(SqsAsyncClient sqsAsyncClient) { + return SqsMessageListenerContainerFactory.builder() + .configure( + options -> + options + .listenerMode(ListenerMode.BATCH) + .maxMessagesPerPoll(10) + .pollTimeout(Duration.ofSeconds(2))) + .sqsAsyncClient(sqsAsyncClient) + .build(); + } + + @Bean + SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory( + SqsAsyncClient sqsAsyncClient) { + return SqsMessageListenerContainerFactory.builder().sqsAsyncClient(sqsAsyncClient).build(); + } + + @SqsListener(value = "test-queue", factory = "defaultSqsListenerContainerFactory") void receiveStringMessage(String message) { if (messageHandler != null) { messageHandler.accept(message); } } + + @SqsListener(value = "test-batch-queue", factory = "batchFactory", id = "batchContainer") + void receiveBatchMessages(List messages) { + if (batchMessageHandler != null) { + batchMessageHandler.accept(messages); + } + } }