From 1a51d042720e820f75415a04f5000f43ad14ba2d Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 20 Jun 2026 04:10:23 +0530 Subject: [PATCH 01/16] feat(spring-cloud-aws): instrument onMessage(Collection>) for batch consumption --- ...MessageListenerAdapterInstrumentation.java | 22 +++++++++- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 40 +++++++++++++++++++ .../cloud/aws/v3_0/AwsSqsTestApplication.java | 34 +++++++++++++++- 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java index 870e4deb38d2..a86dd2576484 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java @@ -8,8 +8,10 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.Collection; import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; @@ -28,7 +30,9 @@ public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( named("onMessage").and(takesArgument(0, named("org.springframework.messaging.Message"))), getClass().getName() + "$OnMessageAdvice"); - // TODO: onMessage(Collection> messages) not instrumented + transformer.applyAdviceToMethod( + named("onMessage").and(takesArgument(0, named("java.util.Collection"))), + getClass().getName() + "$OnMessagesAdvice"); } @SuppressWarnings("unused") @@ -48,4 +52,20 @@ public static void methodExit( } } } + + @SuppressWarnings("unused") + public static class OnMessagesAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + @Nullable + public static Scope methodEnter(@Advice.Argument(0) Collection> messages) { + return SpringAwsUtil.handleBatch(messages); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) + public static void methodExit(@Advice.Enter @Nullable Scope scope) { + if (scope != null) { + scope.close(); + } + } + } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 67f86215e11a..78a1e1d22db5 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -23,6 +23,7 @@ import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -30,6 +31,7 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pekko.http.scaladsl.Http; import org.assertj.core.api.AbstractStringAssert; @@ -41,6 +43,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.support.MessageBuilder; @SuppressWarnings("deprecation") // using deprecated semconv @SpringBootTest( @@ -176,4 +179,41 @@ void sqsListener() throws Exception { + "/000000000000/test-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))))); } + + @Test + void sqsBatchListener() throws Exception { + String messageContent = "hello"; + CompletableFuture> messageFuture = new CompletableFuture<>(); + AwsSqsTestApplication.batchMessageHandler = + strings -> testing.runWithSpan("callback", () -> messageFuture.complete(strings)); + + testing.runWithSpan( + "parent", + () -> + sqsTemplate.sendMany( + "test-batch-queue", + singletonList(MessageBuilder.withPayload(messageContent).build()))); + + List result = messageFuture.get(10, SECONDS); + assertThat(result).containsExactly(messageContent); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("Sqs.GetQueueUrl") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("test-batch-queue publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)), + span -> + span.hasName("Sqs.DeleteMessageBatch") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(2)))); + } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java index 133d59ed6e3d..02bd0bdcf9ce 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java @@ -6,8 +6,12 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.ListenerMode; import io.awspring.cloud.sqs.operations.SqsTemplate; import java.net.URI; +import java.time.Duration; +import java.util.List; import java.util.function.Consumer; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -36,10 +40,38 @@ SqsAsyncClient sqsAsyncClient() { .build(); } - @SqsListener("test-queue") + @Bean + SqsMessageListenerContainerFactory batchFactory(SqsAsyncClient sqsAsyncClient) { + return SqsMessageListenerContainerFactory.builder() + .configure( + options -> + options + .listenerMode(ListenerMode.BATCH) + .maxMessagesPerPoll(10) + .pollTimeout(Duration.ofSeconds(2))) + .sqsAsyncClient(sqsAsyncClient) + .build(); + } + + @Bean + SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory( + SqsAsyncClient sqsAsyncClient) { + return SqsMessageListenerContainerFactory.builder().sqsAsyncClient(sqsAsyncClient).build(); + } + + @SqsListener(value = "test-queue", factory = "defaultSqsListenerContainerFactory") void receiveStringMessage(String message) { if (messageHandler != null) { messageHandler.accept(message); } } + + static volatile Consumer> batchMessageHandler; + + @SqsListener(value = "test-batch-queue", factory = "batchFactory") + void receiveBatchMessages(List messages) { + if (batchMessageHandler != null) { + batchMessageHandler.accept(messages); + } + } } From 4c1dc27a1df9cffaf2c5b533c3964016f400c885 Mon Sep 17 00:00:00 2001 From: aryainguz Date: Tue, 23 Jun 2026 23:03:51 +0530 Subject: [PATCH 02/16] test: add span attribute assertions to AwsSqsTest for improved validation --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 61 ++++++++++++++++++- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 78a1e1d22db5..de207c12f640 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -204,16 +204,71 @@ void sqsBatchListener() throws Exception { span -> span.hasName("Sqs.GetQueueUrl") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)), + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "GetQueueUrl"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> span.hasName("test-batch-queue publish") .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)), + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "SendMessageBatch"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), + equalTo( + AWS_SQS_QUEUE_URL, + "http://localhost:" + + AwsSqsTestApplication.sqsPort + + "/000000000000/test-batch-queue"), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)), span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(2)))); + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "DeleteMessageBatch"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo( + AWS_SQS_QUEUE_URL, + "http://localhost:" + + AwsSqsTestApplication.sqsPort + + "/000000000000/test-batch-queue"), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))))); } } From 2056002c430b519f1b944aa91af7c1945519c29c Mon Sep 17 00:00:00 2001 From: aryainguz Date: Tue, 23 Jun 2026 23:33:50 +0530 Subject: [PATCH 03/16] fix: update Spring Cloud AWS SQS batch instrumentation to support full tracing and proper scope closing --- ...MessageListenerAdapterInstrumentation.java | 8 +++--- .../spring/cloud/aws/v3_0/SpringAwsUtil.java | 10 ++------ .../spring/cloud/aws/v3_0/AwsSqsTest.java | 25 +++++++++++++++++-- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java index a86dd2576484..d2bc89558dbe 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java @@ -57,14 +57,16 @@ public static void methodExit( public static class OnMessagesAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable - public static Scope methodEnter(@Advice.Argument(0) Collection> messages) { + public static SpringAwsUtil.MessageScope methodEnter(@Advice.Argument(0) Collection> messages) { return SpringAwsUtil.handleBatch(messages); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) - public static void methodExit(@Advice.Enter @Nullable Scope scope) { + public static void methodExit( + @Advice.Enter @Nullable SpringAwsUtil.MessageScope scope, + @Advice.Thrown @Nullable Throwable throwable) { if (scope != null) { - scope.close(); + scope.close(throwable); } } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index fe02d827b412..ba9e82bd9f09 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -72,9 +72,8 @@ public static MessageScope handleMessage(Message message) { return tracingContext.trace(); } - // restore context from the first message of the batch @Nullable - public static Scope handleBatch(Collection> messages) { + public static MessageScope handleBatch(Collection> messages) { if (messages.isEmpty()) { return null; } @@ -83,12 +82,7 @@ public static Scope handleBatch(Collection> messages) { if (tracingContext == null) { return null; } - SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); - Context parentContext = tracingContext.receiveContext; - if (parentContext == null) { - parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); - } - return parentContext.makeCurrent(); + return tracingContext.trace(); } public static class MessageScope { diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index de207c12f640..8c3d10ea0d5c 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -246,11 +246,32 @@ void sqsBatchListener() throws Exception { + "/000000000000/test-batch-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> - span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)), + span.hasName("test-batch-queue process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), + equalTo(MESSAGING_OPERATION, "process"), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), + span -> + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(3)), span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(2)) + .hasParent(trace.getSpan(3)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_METHOD, "DeleteMessageBatch"), From 444336341cab82f0e3ffdcbd4ad118d9334ecdc2 Mon Sep 17 00:00:00 2001 From: aryainguz Date: Tue, 23 Jun 2026 23:57:49 +0530 Subject: [PATCH 04/16] refactor: use SpringAwsUtil.MessageScope for batch handling and update scope closure to include throwable status --- ...AcknowledgementExecutionContextInstrumentation.java | 10 ++++++---- ...MessagingMessageListenerAdapterInstrumentation.java | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java index 35c30606d029..8f824db05bba 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java @@ -7,7 +7,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Collection; @@ -34,14 +33,17 @@ public void transform(TypeTransformer transformer) { public static class ExecuteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable - public static Scope methodEnter(@Advice.Argument(0) Collection> messages) { + public static SpringAwsUtil.MessageScope methodEnter( + @Advice.Argument(0) Collection> messages) { return SpringAwsUtil.handleBatch(messages); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) - public static void methodExit(@Advice.Enter @Nullable Scope scope) { + public static void methodExit( + @Advice.Enter @Nullable SpringAwsUtil.MessageScope scope, + @Advice.Thrown @Nullable Throwable throwable) { if (scope != null) { - scope.close(); + scope.close(throwable); } } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java index d2bc89558dbe..d13c049cd798 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java @@ -8,7 +8,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Collection; @@ -57,7 +56,8 @@ public static void methodExit( public static class OnMessagesAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable - public static SpringAwsUtil.MessageScope methodEnter(@Advice.Argument(0) Collection> messages) { + public static SpringAwsUtil.MessageScope methodEnter( + @Advice.Argument(0) Collection> messages) { return SpringAwsUtil.handleBatch(messages); } From 109298d3114a603abf35e1b04a7e929a02c12aaf Mon Sep 17 00:00:00 2001 From: aryainguz Date: Wed, 24 Jun 2026 00:27:32 +0530 Subject: [PATCH 05/16] fix: refactor Spring Cloud AWS SQS batch context restoration and correct parent span assertion in tests --- ...dgementExecutionContextInstrumentation.java | 12 +++++------- .../spring/cloud/aws/v3_0/SpringAwsUtil.java | 18 ++++++++++++++++++ .../spring/cloud/aws/v3_0/AwsSqsTest.java | 2 +- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java index 8f824db05bba..afbd429a5207 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AcknowledgementExecutionContextInstrumentation.java @@ -7,6 +7,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Collection; @@ -33,17 +34,14 @@ public void transform(TypeTransformer transformer) { public static class ExecuteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable - public static SpringAwsUtil.MessageScope methodEnter( - @Advice.Argument(0) Collection> messages) { - return SpringAwsUtil.handleBatch(messages); + public static Scope methodEnter(@Advice.Argument(0) Collection> messages) { + return SpringAwsUtil.restoreBatchContext(messages); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) - public static void methodExit( - @Advice.Enter @Nullable SpringAwsUtil.MessageScope scope, - @Advice.Thrown @Nullable Throwable throwable) { + public static void methodExit(@Advice.Enter @Nullable Scope scope) { if (scope != null) { - scope.close(throwable); + scope.close(); } } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index ba9e82bd9f09..98983b055174 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -85,6 +85,24 @@ public static MessageScope handleBatch(Collection> messages) { return tracingContext.trace(); } + @Nullable + public static Scope restoreBatchContext(Collection> messages) { + if (messages.isEmpty()) { + return null; + } + Message message = messages.iterator().next(); + TracingContext tracingContext = tracingContextField.get(message); + if (tracingContext == null) { + return null; + } + SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); + Context parentContext = tracingContext.receiveContext; + if (parentContext == null) { + parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); + } + return parentContext.makeCurrent(); + } + public static class MessageScope { private final Instrumenter instrumenter; private final Context context; diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 8c3d10ea0d5c..f0412d1f15e8 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -271,7 +271,7 @@ void sqsBatchListener() throws Exception { span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(3)) + .hasParent(trace.getSpan(2)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_METHOD, "DeleteMessageBatch"), From 9e6d71d960a3e98bce5d4d716b81c5501d7c6a76 Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 13:29:08 +0530 Subject: [PATCH 06/16] feat: enable span linking for SQS batch processing in Spring Cloud AWS instrumentation --- ...MessageListenerAdapterInstrumentation.java | 4 +- .../spring/cloud/aws/v3_0/SpringAwsUtil.java | 176 +++++++++++++++++- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 98 +++++----- .../cloud/aws/v3_0/AwsSqsTestApplication.java | 3 +- 4 files changed, 220 insertions(+), 61 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java index d13c049cd798..bf8fea81dffa 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java @@ -56,14 +56,14 @@ public static void methodExit( public static class OnMessagesAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Nullable - public static SpringAwsUtil.MessageScope methodEnter( + public static SpringAwsUtil.BatchMessageScope methodEnter( @Advice.Argument(0) Collection> messages) { return SpringAwsUtil.handleBatch(messages); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) public static void methodExit( - @Advice.Enter @Nullable SpringAwsUtil.MessageScope scope, + @Advice.Enter @Nullable SpringAwsUtil.BatchMessageScope scope, @Advice.Thrown @Nullable Throwable throwable) { if (scope != null) { scope.close(throwable); diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index 98983b055174..cde65610c728 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -5,9 +5,18 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; +import static java.util.Collections.emptyList; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.Response; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.SqsMessage; @@ -17,15 +26,131 @@ import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingExecutionInterceptor; import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingList; import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.springframework.messaging.Message; +import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; public class SpringAwsUtil { private static final ThreadLocal context = new ThreadLocal<>(); private static final VirtualField, TracingContext> tracingContextField = VirtualField.find(Message.class, TracingContext.class); + private static final MessagingAttributesGetter>, Void> + BATCH_ATTRIBUTES_GETTER = + new MessagingAttributesGetter>, Void>() { + @Override + public String getSystem(Collection> messages) { + return "aws_sqs"; + } + + @Override + public String getDestination(Collection> messages) { + if (!messages.isEmpty()) { + Message message = messages.iterator().next(); + TracingContext tracingContext = tracingContextField.get(message); + if (tracingContext != null) { + SdkRequest sdkRequest = + tracingContext.request.getAttribute( + TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); + if (sdkRequest instanceof ReceiveMessageRequest) { + String queueUrl = ((ReceiveMessageRequest) sdkRequest).queueUrl(); + if (queueUrl != null) { + int i = queueUrl.lastIndexOf('/'); + if (i > 0) { + return queueUrl.substring(i + 1); + } + } + } + } + } + return null; + } + + @Nullable + @Override + public String getDestinationTemplate(Collection> messages) { + return null; + } + + @Override + public boolean isTemporaryDestination(Collection> messages) { + return false; + } + + @Override + public boolean isAnonymousDestination(Collection> messages) { + return false; + } + + @Nullable + @Override + public String getConversationId(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getMessageBodySize(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(Collection> messages) { + return null; + } + + @Nullable + @Override + public String getMessageId(Collection> messages, @Nullable Void unused) { + return null; + } + + @Nullable + @Override + public String getClientId(Collection> messages) { + return null; + } + + @Nullable + @Override + public Long getBatchMessageCount( + Collection> messages, @Nullable Void unused) { + return (long) messages.size(); + } + + @Override + public List getMessageHeader(Collection> messages, String name) { + return emptyList(); + } + }; + + private static final Instrumenter>, Void> BATCH_INSTRUMENTER = + Instrumenter.>, Void>builder( + GlobalOpenTelemetry.get(), + "io.opentelemetry.spring-cloud-aws-3.0", + MessagingSpanNameExtractor.create(BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS) + .build()) + .addSpanLinksExtractor( + (spanLinks, parentContext, messages) -> { + for (Message message : messages) { + TracingContext tracingContext = tracingContextField.get(message); + if (tracingContext != null) { + SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); + Context extracted = + SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); + spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); + } + } + }) + .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + // put the TracingList into thread local, so we can use it in attachTracingState method public static void initialize(Collection messages) { if (messages instanceof TracingList tracingList) { @@ -73,16 +198,34 @@ public static MessageScope handleMessage(Message message) { } @Nullable - public static MessageScope handleBatch(Collection> messages) { + public static BatchMessageScope handleBatch(Collection> messages) { if (messages.isEmpty()) { return null; } - Message message = messages.iterator().next(); - TracingContext tracingContext = tracingContextField.get(message); + + // Check if the batch has any tracing context before starting + Message firstMessage = messages.iterator().next(); + TracingContext tracingContext = tracingContextField.get(firstMessage); if (tracingContext == null) { return null; } - return tracingContext.trace(); + + // Start a separate trace (NO parent from the queue) using Context.current() + // The instrumenter adds span links to all messages. + Context parentContext = Context.current(); + if (!BATCH_INSTRUMENTER.shouldStart(parentContext, messages)) { + return null; + } + Context context = BATCH_INSTRUMENTER.start(parentContext, messages); + + for (Message msg : messages) { + TracingContext tc = tracingContextField.get(msg); + if (tc != null) { + tc.batchProcessContext = context; + } + } + + return new BatchMessageScope(context, messages); } @Nullable @@ -92,15 +235,27 @@ public static Scope restoreBatchContext(Collection> messages) { } Message message = messages.iterator().next(); TracingContext tracingContext = tracingContextField.get(message); - if (tracingContext == null) { + if (tracingContext == null || tracingContext.batchProcessContext == null) { return null; } - SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); - Context parentContext = tracingContext.receiveContext; - if (parentContext == null) { - parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); + return tracingContext.batchProcessContext.makeCurrent(); + } + + public static class BatchMessageScope { + private final Context context; + private final Collection> request; + private final Scope scope; + + private BatchMessageScope(Context context, Collection> request) { + this.context = context; + this.request = request; + this.scope = context.makeCurrent(); + } + + public void close(@Nullable Throwable throwable) { + scope.close(); + BATCH_INSTRUMENTER.end(context, request, null, throwable); } - return parentContext.makeCurrent(); } public static class MessageScope { @@ -135,6 +290,7 @@ private static class TracingContext { private final TracingExecutionInterceptor config; @Nullable private final Context receiveContext; private final software.amazon.awssdk.services.sqs.model.Message sqsMessage; + @Nullable Context batchProcessContext; private TracingContext( TracingList tracingList, software.amazon.awssdk.services.sqs.model.Message sqsMessage) { diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index f0412d1f15e8..2b5e8c2ac2df 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.links; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; @@ -23,7 +24,7 @@ import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -182,7 +183,8 @@ void sqsListener() throws Exception { @Test void sqsBatchListener() throws Exception { - String messageContent = "hello"; + String messageContent1 = "hello"; + String messageContent2 = "hello2"; CompletableFuture> messageFuture = new CompletableFuture<>(); AwsSqsTestApplication.batchMessageHandler = strings -> testing.runWithSpan("callback", () -> messageFuture.complete(strings)); @@ -192,10 +194,15 @@ void sqsBatchListener() throws Exception { () -> sqsTemplate.sendMany( "test-batch-queue", - singletonList(MessageBuilder.withPayload(messageContent).build()))); + asList( + MessageBuilder.withPayload(messageContent1).build(), + MessageBuilder.withPayload(messageContent2).build()))); List result = messageFuture.get(10, SECONDS); - assertThat(result).containsExactly(messageContent); + assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2); + + java.util.concurrent.atomic.AtomicReference producer = + new java.util.concurrent.atomic.AtomicReference<>(); testing.waitAndAssertTraces( trace -> @@ -219,59 +226,56 @@ void sqsBatchListener() throws Exception { val.startsWith( "http://localhost:" + AwsSqsTestApplication.sqsPort)), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), - span -> - span.hasName("test-batch-queue publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "SendMessageBatch"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(HTTP_REQUEST_METHOD, POST), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), - satisfies( - URL_FULL, - val -> - val.startsWith( - "http://localhost:" + AwsSqsTestApplication.sqsPort)), - equalTo(MESSAGING_SYSTEM, AWS_SQS), - equalTo(MESSAGING_OPERATION, "publish"), - equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), - equalTo( - AWS_SQS_QUEUE_URL, - "http://localhost:" - + AwsSqsTestApplication.sqsPort - + "/000000000000/test-batch-queue"), - satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), + span -> { + span.hasName("test-batch-queue publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "SendMessageBatch"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), + equalTo( + AWS_SQS_QUEUE_URL, + "http://localhost:" + + AwsSqsTestApplication.sqsPort + + "/000000000000/test-batch-queue"), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))); + producer.set(trace.getSpan(2)); + }), + trace -> + trace.hasSpansSatisfyingExactly( span -> span.hasName("test-batch-queue process") .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(2)) + .hasNoParent() + .hasLinksSatisfying( + links(producer.get().getSpanContext(), producer.get().getSpanContext())) .hasAttributesSatisfyingExactly( - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "ReceiveMessage"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(HTTP_REQUEST_METHOD, POST), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), - satisfies( - URL_FULL, - val -> - val.startsWith( - "http://localhost:" + AwsSqsTestApplication.sqsPort)), equalTo(MESSAGING_SYSTEM, AWS_SQS), - satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), equalTo(MESSAGING_OPERATION, "process"), - equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), + equalTo( + io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes + .MESSAGING_BATCH_MESSAGE_COUNT, + 2L)), span -> - span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(3)), + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)), span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(2)) + .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_METHOD, "DeleteMessageBatch"), diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java index 02bd0bdcf9ce..8145265bbee5 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java @@ -24,6 +24,7 @@ class AwsSqsTestApplication { static int sqsPort; static volatile Consumer messageHandler; + static volatile Consumer> batchMessageHandler; @Bean SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient) { @@ -66,8 +67,6 @@ void receiveStringMessage(String message) { } } - static volatile Consumer> batchMessageHandler; - @SqsListener(value = "test-batch-queue", factory = "batchFactory") void receiveBatchMessages(List messages) { if (batchMessageHandler != null) { From ff8c9872ed45530265ceb5c437e0e33a34c5d918 Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 13:46:56 +0530 Subject: [PATCH 07/16] fix: change SDK_REQUEST_ATTRIBUTE visibility to public in TracingExecutionInterceptor --- .../awssdk/v2_2/internal/TracingExecutionInterceptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java index f4f4295a503f..2a8604d840da 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/internal/TracingExecutionInterceptor.java @@ -66,7 +66,7 @@ public final class TracingExecutionInterceptor implements ExecutionInterceptor { new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".AwsSdkRequest"); static final ExecutionAttribute SDK_HTTP_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkHttpRequest"); - static final ExecutionAttribute SDK_REQUEST_ATTRIBUTE = + public static final ExecutionAttribute SDK_REQUEST_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkRequest"); private static final ExecutionAttribute REQUEST_FINISHER_ATTRIBUTE = new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher"); From 50bef8c883a3ee10c06d95fd95bf3b11345212cc Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 13:56:04 +0530 Subject: [PATCH 08/16] refactor: use pattern matching for ReceiveMessageRequest and add @Nullable annotation to getDestination --- .../instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index cde65610c728..e5e4a2906c1b 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -46,6 +46,7 @@ public String getSystem(Collection> messages) { return "aws_sqs"; } + @Nullable @Override public String getDestination(Collection> messages) { if (!messages.isEmpty()) { @@ -55,8 +56,8 @@ public String getDestination(Collection> messages) { SdkRequest sdkRequest = tracingContext.request.getAttribute( TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); - if (sdkRequest instanceof ReceiveMessageRequest) { - String queueUrl = ((ReceiveMessageRequest) sdkRequest).queueUrl(); + if (sdkRequest instanceof ReceiveMessageRequest receiveMessageRequest) { + String queueUrl = receiveMessageRequest.queueUrl(); if (queueUrl != null) { int i = queueUrl.lastIndexOf('/'); if (i > 0) { From 08922c7919e5e569c00083417fc89c01fdf6288e Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 14:07:36 +0530 Subject: [PATCH 09/16] test: refactor SQS span link assertions to use explicit AssertJ lambdas --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 2b5e8c2ac2df..1eff82cf33be 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; -import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.links; + import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; @@ -261,7 +261,13 @@ void sqsBatchListener() throws Exception { .hasKind(SpanKind.CONSUMER) .hasNoParent() .hasLinksSatisfying( - links(producer.get().getSpanContext(), producer.get().getSpanContext())) + links -> { + assertThat(links).hasSize(2); + assertThat(links.get(0).getSpanContext()) + .isEqualTo(producer.get().getSpanContext()); + assertThat(links.get(1).getSpanContext()) + .isEqualTo(producer.get().getSpanContext()); + }) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_OPERATION, "process"), From d2836090d568e0e8c44ef148b7e03ef29a15d40a Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 14:16:05 +0530 Subject: [PATCH 10/16] test: update AwsSqsTest link assertions to use satisfiesExactly for improved validation --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 1eff82cf33be..f8ddd5a5718a 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -263,10 +263,14 @@ void sqsBatchListener() throws Exception { .hasLinksSatisfying( links -> { assertThat(links).hasSize(2); - assertThat(links.get(0).getSpanContext()) - .isEqualTo(producer.get().getSpanContext()); - assertThat(links.get(1).getSpanContext()) - .isEqualTo(producer.get().getSpanContext()); + assertThat(links) + .satisfiesExactly( + l -> + assertThat(l.getSpanContext()) + .isEqualTo(producer.get().getSpanContext()), + l -> + assertThat(l.getSpanContext()) + .isEqualTo(producer.get().getSpanContext())); }) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), From 9d9db2d4c782724fc9729a2beef6478c162beb8e Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 15:11:35 +0530 Subject: [PATCH 11/16] test: stabilize SQS batch tests by adding container lifecycle management and manual warmup --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 85 +++++++++---------- .../cloud/aws/v3_0/AwsSqsTestApplication.java | 2 +- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index f8ddd5a5718a..336b70c174ed 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -5,7 +5,6 @@ package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; - import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; @@ -28,6 +27,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; @@ -40,6 +40,7 @@ import org.elasticmq.rest.sqs.SQSRestServerBuilder; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.beans.factory.annotation.Autowired; @@ -57,6 +58,22 @@ class AwsSqsTest { private static SQSRestServer sqs; @Autowired SqsTemplate sqsTemplate; + @Autowired MessageListenerContainerRegistry registry; + + @BeforeEach + void waitForContainersAndClear() throws InterruptedException { + // Wait for containers to start and fetch Queue URLs + Thread.sleep(2000); + + // Warm up the templates so that GetQueueUrl is cached in them as well + sqsTemplate.send("test-queue", "warmup"); + sqsTemplate.sendMany("test-batch-queue", java.util.Arrays.asList( + MessageBuilder.withPayload("warmup1").build(), + MessageBuilder.withPayload("warmup2").build())); + + Thread.sleep(1000); + testing.clearData(); + } @BeforeAll static void setUp() { @@ -88,24 +105,6 @@ void sqsListener() throws Exception { trace -> trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName("Sqs.GetQueueUrl") - .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "GetQueueUrl"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(HTTP_REQUEST_METHOD, POST), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), - satisfies( - URL_FULL, - val -> - val.startsWith( - "http://localhost:" + AwsSqsTestApplication.sqsPort)), - satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> span.hasName("test-queue publish") .hasKind(SpanKind.PRODUCER) @@ -133,10 +132,11 @@ void sqsListener() throws Exception { + AwsSqsTestApplication.sqsPort + "/000000000000/test-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), + span -> span.hasName("test-queue process") .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(2)) + .hasParent(trace.getSpan(1)) .hasAttributesSatisfyingExactly( equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_METHOD, "ReceiveMessage"), @@ -155,7 +155,7 @@ void sqsListener() throws Exception { equalTo(MESSAGING_OPERATION, "process"), equalTo(MESSAGING_DESTINATION_NAME, "test-queue")), span -> - span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(3)), + span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)), span -> span.hasName("Sqs.DeleteMessageBatch") .hasKind(SpanKind.CLIENT) @@ -183,12 +183,14 @@ void sqsListener() throws Exception { @Test void sqsBatchListener() throws Exception { + registry.getContainerById("batchContainer").stop(); String messageContent1 = "hello"; String messageContent2 = "hello2"; CompletableFuture> messageFuture = new CompletableFuture<>(); AwsSqsTestApplication.batchMessageHandler = strings -> testing.runWithSpan("callback", () -> messageFuture.complete(strings)); + registry.getContainerById("batchContainer").start(); testing.runWithSpan( "parent", () -> @@ -198,6 +200,9 @@ void sqsBatchListener() throws Exception { MessageBuilder.withPayload(messageContent1).build(), MessageBuilder.withPayload(messageContent2).build()))); + Thread.sleep(1000); + registry.getContainerById("batchContainer").start(); + List result = messageFuture.get(10, SECONDS); assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2); @@ -208,24 +213,6 @@ void sqsBatchListener() throws Exception { trace -> trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName("Sqs.GetQueueUrl") - .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_METHOD, "GetQueueUrl"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(HTTP_REQUEST_METHOD, POST), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), - satisfies( - URL_FULL, - val -> - val.startsWith( - "http://localhost:" + AwsSqsTestApplication.sqsPort)), - satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), span -> { span.hasName("test-batch-queue publish") .hasKind(SpanKind.PRODUCER) @@ -252,7 +239,7 @@ void sqsBatchListener() throws Exception { + AwsSqsTestApplication.sqsPort + "/000000000000/test-batch-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))); - producer.set(trace.getSpan(2)); + producer.set(trace.getSpan(1)); }), trace -> trace.hasSpansSatisfyingExactly( @@ -265,12 +252,18 @@ void sqsBatchListener() throws Exception { assertThat(links).hasSize(2); assertThat(links) .satisfiesExactly( - l -> - assertThat(l.getSpanContext()) - .isEqualTo(producer.get().getSpanContext()), - l -> - assertThat(l.getSpanContext()) - .isEqualTo(producer.get().getSpanContext())); + l -> { + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getSpanContext().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanContext().getSpanId()); + }, + l -> { + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getSpanContext().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanContext().getSpanId()); + }); }) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java index 8145265bbee5..348d4219bde5 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java @@ -67,7 +67,7 @@ void receiveStringMessage(String message) { } } - @SqsListener(value = "test-batch-queue", factory = "batchFactory") + @SqsListener(value = "test-batch-queue", factory = "batchFactory", id = "batchContainer") void receiveBatchMessages(List messages) { if (batchMessageHandler != null) { batchMessageHandler.accept(messages); From ca3a57e355e3b27febfba2fa608f725d88b2bebd Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 15:11:47 +0530 Subject: [PATCH 12/16] fix: store batch process context in MessageScope to prevent loss during SQS message processing --- .../instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index e5e4a2906c1b..e3e5fb41c669 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -315,6 +315,7 @@ MessageScope trace() { return null; } Context context = instrumenter.start(parentContext, processRequest); + this.batchProcessContext = context; return new MessageScope(instrumenter, context, processRequest, response); } } From 89ed8bd3c11db5a3bb8a17f23ec06dc1d3f93bec Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 16:59:58 +0530 Subject: [PATCH 13/16] fix: remove redundant batch context assignment and improve reliability of SQS batch listener tests --- .../spring/cloud/aws/v3_0/SpringAwsUtil.java | 1 - .../spring/cloud/aws/v3_0/AwsSqsTest.java | 60 +++++++++++++------ 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index e3e5fb41c669..e5e4a2906c1b 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -315,7 +315,6 @@ MessageScope trace() { return null; } Context context = instrumenter.start(parentContext, processRequest); - this.batchProcessContext = context; return new MessageScope(instrumenter, context, processRequest, response); } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 336b70c174ed..2d8e57021b5f 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -59,20 +59,41 @@ class AwsSqsTest { @Autowired SqsTemplate sqsTemplate; @Autowired MessageListenerContainerRegistry registry; + @Autowired org.springframework.context.ApplicationContext applicationContext; @BeforeEach void waitForContainersAndClear() throws InterruptedException { - // Wait for containers to start and fetch Queue URLs - Thread.sleep(2000); + // Wait for the listener containers to start and resolve Queue URLs + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10000) { + long count = testing.spans().stream().filter(s -> s.getName().equals("Sqs.GetQueueUrl")).count(); + if (count >= 2) { + break; + } + Thread.sleep(100); + } - // Warm up the templates so that GetQueueUrl is cached in them as well + // Warm up the templates so that the HTTP client is initialized (prevents Connection refused) sqsTemplate.send("test-queue", "warmup"); - sqsTemplate.sendMany("test-batch-queue", java.util.Arrays.asList( - MessageBuilder.withPayload("warmup1").build(), - MessageBuilder.withPayload("warmup2").build())); - - Thread.sleep(1000); + sqsTemplate.sendMany( + "test-batch-queue", + java.util.Collections.singletonList( + org.springframework.messaging.support.MessageBuilder.withPayload("warmup1").build())); + + // Wait for the warmup message to be processed so it doesn't pollute the test + startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10000) { + long count = testing.spans().stream().filter(s -> s.getName().equals("test-queue process")).count(); + long countBatch = testing.spans().stream().filter(s -> s.getName().equals("test-batch-queue process")).count(); + if (count >= 1 && countBatch >= 1) { + break; + } + Thread.sleep(100); + } + testing.clearData(); + AwsSqsTestApplication.messageHandler = null; + AwsSqsTestApplication.batchMessageHandler = null; } @BeforeAll @@ -184,13 +205,19 @@ void sqsListener() throws Exception { @Test void sqsBatchListener() throws Exception { registry.getContainerById("batchContainer").stop(); + String messageContent1 = "hello"; String messageContent2 = "hello2"; + java.util.List collectedMessages = new java.util.concurrent.CopyOnWriteArrayList<>(); CompletableFuture> messageFuture = new CompletableFuture<>(); AwsSqsTestApplication.batchMessageHandler = - strings -> testing.runWithSpan("callback", () -> messageFuture.complete(strings)); + strings -> testing.runWithSpan("callback", () -> { + collectedMessages.addAll(strings); + if (collectedMessages.size() >= 2) { + messageFuture.complete(collectedMessages); + } + }); - registry.getContainerById("batchContainer").start(); testing.runWithSpan( "parent", () -> @@ -200,7 +227,6 @@ void sqsBatchListener() throws Exception { MessageBuilder.withPayload(messageContent1).build(), MessageBuilder.withPayload(messageContent2).build()))); - Thread.sleep(1000); registry.getContainerById("batchContainer").start(); List result = messageFuture.get(10, SECONDS); @@ -254,25 +280,25 @@ void sqsBatchListener() throws Exception { .satisfiesExactly( l -> { assertThat(l.getSpanContext().getTraceId()) - .isEqualTo(producer.get().getSpanContext().getTraceId()); + .isEqualTo(producer.get().getTraceId()); assertThat(l.getSpanContext().getSpanId()) - .isEqualTo(producer.get().getSpanContext().getSpanId()); + .isEqualTo(producer.get().getSpanId()); }, l -> { assertThat(l.getSpanContext().getTraceId()) - .isEqualTo(producer.get().getSpanContext().getTraceId()); + .isEqualTo(producer.get().getTraceId()); assertThat(l.getSpanContext().getSpanId()) - .isEqualTo(producer.get().getSpanContext().getSpanId()); + .isEqualTo(producer.get().getSpanId()); }); }) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_OPERATION, "process"), - equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue"), equalTo( io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes .MESSAGING_BATCH_MESSAGE_COUNT, - 2L)), + 2L), + equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)), span -> From af530ccf57a59c9c533046864bfae38aaa2f63ad Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 20:11:04 +0530 Subject: [PATCH 14/16] refactor: simplify ApplicationContext import in AwsSqsTest --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 2d8e57021b5f..1893c2ebe743 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -45,7 +45,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; import org.springframework.messaging.support.MessageBuilder; +import java.util.Collections; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; @SuppressWarnings("deprecation") // using deprecated semconv @SpringBootTest( @@ -59,7 +65,7 @@ class AwsSqsTest { @Autowired SqsTemplate sqsTemplate; @Autowired MessageListenerContainerRegistry registry; - @Autowired org.springframework.context.ApplicationContext applicationContext; + @Autowired ApplicationContext applicationContext; @BeforeEach void waitForContainersAndClear() throws InterruptedException { @@ -77,8 +83,8 @@ void waitForContainersAndClear() throws InterruptedException { sqsTemplate.send("test-queue", "warmup"); sqsTemplate.sendMany( "test-batch-queue", - java.util.Collections.singletonList( - org.springframework.messaging.support.MessageBuilder.withPayload("warmup1").build())); + Collections.singletonList( + MessageBuilder.withPayload("warmup1").build())); // Wait for the warmup message to be processed so it doesn't pollute the test startTime = System.currentTimeMillis(); @@ -208,7 +214,7 @@ void sqsBatchListener() throws Exception { String messageContent1 = "hello"; String messageContent2 = "hello2"; - java.util.List collectedMessages = new java.util.concurrent.CopyOnWriteArrayList<>(); + List collectedMessages = new CopyOnWriteArrayList<>(); CompletableFuture> messageFuture = new CompletableFuture<>(); AwsSqsTestApplication.batchMessageHandler = strings -> testing.runWithSpan("callback", () -> { @@ -232,8 +238,8 @@ void sqsBatchListener() throws Exception { List result = messageFuture.get(10, SECONDS); assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2); - java.util.concurrent.atomic.AtomicReference producer = - new java.util.concurrent.atomic.AtomicReference<>(); + AtomicReference producer = + new AtomicReference<>(); testing.waitAndAssertTraces( trace -> @@ -295,7 +301,7 @@ void sqsBatchListener() throws Exception { equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_OPERATION, "process"), equalTo( - io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes + MessagingIncubatingAttributes .MESSAGING_BATCH_MESSAGE_COUNT, 2L), equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), From 861a78069a7445ea826cd9c5fd609efddd2a19da Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 20:24:24 +0530 Subject: [PATCH 15/16] test: refactor imports and formatting in AwsSqsTest to use MESSAGING_BATCH_MESSAGE_COUNT attribute --- .../spring/cloud/aws/v3_0/AwsSqsTest.java | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 1893c2ebe743..894ab972b8d2 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -15,6 +15,7 @@ import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_SQS_QUEUE_URL; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; @@ -24,6 +25,7 @@ import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -32,8 +34,11 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pekko.http.scaladsl.Http; import org.assertj.core.api.AbstractStringAssert; import org.elasticmq.rest.sqs.SQSRestServer; @@ -47,11 +52,6 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ApplicationContext; import org.springframework.messaging.support.MessageBuilder; -import java.util.Collections; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; @SuppressWarnings("deprecation") // using deprecated semconv @SpringBootTest( @@ -72,25 +72,28 @@ void waitForContainersAndClear() throws InterruptedException { // Wait for the listener containers to start and resolve Queue URLs long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < 10000) { - long count = testing.spans().stream().filter(s -> s.getName().equals("Sqs.GetQueueUrl")).count(); + long count = + testing.spans().stream().filter(s -> s.getName().equals("Sqs.GetQueueUrl")).count(); if (count >= 2) { break; } Thread.sleep(100); } - + // Warm up the templates so that the HTTP client is initialized (prevents Connection refused) sqsTemplate.send("test-queue", "warmup"); sqsTemplate.sendMany( - "test-batch-queue", - Collections.singletonList( - MessageBuilder.withPayload("warmup1").build())); - + "test-batch-queue", singletonList(MessageBuilder.withPayload("warmup1").build())); + // Wait for the warmup message to be processed so it doesn't pollute the test startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < 10000) { - long count = testing.spans().stream().filter(s -> s.getName().equals("test-queue process")).count(); - long countBatch = testing.spans().stream().filter(s -> s.getName().equals("test-batch-queue process")).count(); + long count = + testing.spans().stream().filter(s -> s.getName().equals("test-queue process")).count(); + long countBatch = + testing.spans().stream() + .filter(s -> s.getName().equals("test-batch-queue process")) + .count(); if (count >= 1 && countBatch >= 1) { break; } @@ -159,7 +162,6 @@ void sqsListener() throws Exception { + AwsSqsTestApplication.sqsPort + "/000000000000/test-queue"), satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))), - span -> span.hasName("test-queue process") .hasKind(SpanKind.CONSUMER) @@ -211,18 +213,21 @@ void sqsListener() throws Exception { @Test void sqsBatchListener() throws Exception { registry.getContainerById("batchContainer").stop(); - + String messageContent1 = "hello"; String messageContent2 = "hello2"; List collectedMessages = new CopyOnWriteArrayList<>(); CompletableFuture> messageFuture = new CompletableFuture<>(); AwsSqsTestApplication.batchMessageHandler = - strings -> testing.runWithSpan("callback", () -> { - collectedMessages.addAll(strings); - if (collectedMessages.size() >= 2) { - messageFuture.complete(collectedMessages); - } - }); + strings -> + testing.runWithSpan( + "callback", + () -> { + collectedMessages.addAll(strings); + if (collectedMessages.size() >= 2) { + messageFuture.complete(collectedMessages); + } + }); testing.runWithSpan( "parent", @@ -238,8 +243,7 @@ void sqsBatchListener() throws Exception { List result = messageFuture.get(10, SECONDS); assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2); - AtomicReference producer = - new AtomicReference<>(); + AtomicReference producer = new AtomicReference<>(); testing.waitAndAssertTraces( trace -> @@ -285,25 +289,22 @@ void sqsBatchListener() throws Exception { assertThat(links) .satisfiesExactly( l -> { - assertThat(l.getSpanContext().getTraceId()) - .isEqualTo(producer.get().getTraceId()); - assertThat(l.getSpanContext().getSpanId()) - .isEqualTo(producer.get().getSpanId()); + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanId()); }, l -> { - assertThat(l.getSpanContext().getTraceId()) - .isEqualTo(producer.get().getTraceId()); - assertThat(l.getSpanContext().getSpanId()) - .isEqualTo(producer.get().getSpanId()); + assertThat(l.getSpanContext().getTraceId()) + .isEqualTo(producer.get().getTraceId()); + assertThat(l.getSpanContext().getSpanId()) + .isEqualTo(producer.get().getSpanId()); }); }) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_OPERATION, "process"), - equalTo( - MessagingIncubatingAttributes - .MESSAGING_BATCH_MESSAGE_COUNT, - 2L), + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 2L), equalTo(MESSAGING_DESTINATION_NAME, "test-batch-queue")), span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)), From 5c649caba3853bfe46ad01a56d00d131bd88f6c1 Mon Sep 17 00:00:00 2001 From: aryainguz Date: Sat, 27 Jun 2026 20:55:32 +0530 Subject: [PATCH 16/16] test: update SQS test assertions and track batch process context in SpringAwsUtil --- .../spring/cloud/aws/v3_0/SpringAwsUtil.java | 1 + .../spring/cloud/aws/v3_0/AwsSqsTest.java | 26 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java index e5e4a2906c1b..e3e5fb41c669 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/SpringAwsUtil.java @@ -315,6 +315,7 @@ MessageScope trace() { return null; } Context context = instrumenter.start(parentContext, processRequest); + this.batchProcessContext = context; return new MessageScope(instrumenter, context, processRequest, response); } } diff --git a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java index 894ab972b8d2..19d766e273a4 100644 --- a/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java +++ b/instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java @@ -94,7 +94,11 @@ void waitForContainersAndClear() throws InterruptedException { testing.spans().stream() .filter(s -> s.getName().equals("test-batch-queue process")) .count(); - if (count >= 1 && countBatch >= 1) { + long countDelete = + testing.spans().stream() + .filter(s -> s.getName().equals("Sqs.DeleteMessageBatch")) + .count(); + if (count >= 1 && countBatch >= 1 && countDelete >= 2) { break; } Thread.sleep(100); @@ -277,6 +281,26 @@ void sqsBatchListener() throws Exception { satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class))); producer.set(trace.getSpan(1)); }), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("Sqs.GetQueueUrl") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_METHOD, "GetQueueUrl"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(HTTP_REQUEST_METHOD, POST), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, AwsSqsTestApplication.sqsPort), + satisfies( + URL_FULL, + val -> + val.startsWith( + "http://localhost:" + AwsSqsTestApplication.sqsPort)), + satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class)))), trace -> trace.hasSpansSatisfyingExactly( span ->