Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1a51d04
feat(spring-cloud-aws): instrument onMessage(Collection<Message<T>>) …
Aryainguz Jun 19, 2026
1edd8df
Merge branch 'main' into fix-aws-sqs-batch
Aryainguz Jun 20, 2026
82049e9
Merge branch 'main' into fix-aws-sqs-batch
Aryainguz Jun 23, 2026
4c1dc27
test: add span attribute assertions to AwsSqsTest for improved valida…
Aryainguz Jun 23, 2026
2056002
fix: update Spring Cloud AWS SQS batch instrumentation to support ful…
Aryainguz Jun 23, 2026
4443363
refactor: use SpringAwsUtil.MessageScope for batch handling and updat…
Aryainguz Jun 23, 2026
109298d
fix: refactor Spring Cloud AWS SQS batch context restoration and corr…
Aryainguz Jun 23, 2026
9e6d71d
feat: enable span linking for SQS batch processing in Spring Cloud AW…
Aryainguz Jun 27, 2026
799426b
Merge branch 'main' into fix-aws-sqs-batch
Aryainguz Jun 27, 2026
ff8c987
fix: change SDK_REQUEST_ATTRIBUTE visibility to public in TracingExec…
Aryainguz Jun 27, 2026
50bef8c
refactor: use pattern matching for ReceiveMessageRequest and add @Nul…
Aryainguz Jun 27, 2026
08922c7
test: refactor SQS span link assertions to use explicit AssertJ lambdas
Aryainguz Jun 27, 2026
d283609
test: update AwsSqsTest link assertions to use satisfiesExactly for i…
Aryainguz Jun 27, 2026
9d9db2d
test: stabilize SQS batch tests by adding container lifecycle managem…
Aryainguz Jun 27, 2026
ca3a57e
fix: store batch process context in MessageScope to prevent loss duri…
Aryainguz Jun 27, 2026
89ed8bd
fix: remove redundant batch context assignment and improve reliabilit…
Aryainguz Jun 27, 2026
af530cc
refactor: simplify ApplicationContext import in AwsSqsTest
Aryainguz Jun 27, 2026
861a780
test: refactor imports and formatting in AwsSqsTest to use MESSAGING_…
Aryainguz Jun 27, 2026
5c649ca
test: update SQS test assertions and track batch process context in S…
Aryainguz Jun 27, 2026
f98f6e9
Merge branch 'main' into fix-aws-sqs-batch
Aryainguz Jun 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class TracingExecutionInterceptor implements ExecutionInterceptor {
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".AwsSdkRequest");
static final ExecutionAttribute<SdkHttpRequest> SDK_HTTP_REQUEST_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkHttpRequest");
static final ExecutionAttribute<SdkRequest> SDK_REQUEST_ATTRIBUTE =
public static final ExecutionAttribute<SdkRequest> SDK_REQUEST_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".SdkRequest");
private static final ExecutionAttribute<RequestSpanFinisher> REQUEST_FINISHER_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static class ExecuteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
@Nullable
public static Scope methodEnter(@Advice.Argument(0) Collection<Message<?>> messages) {
return SpringAwsUtil.handleBatch(messages);
return SpringAwsUtil.restoreBatchContext(messages);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Collection;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
Expand All @@ -28,7 +29,9 @@ public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("onMessage").and(takesArgument(0, named("org.springframework.messaging.Message"))),
getClass().getName() + "$OnMessageAdvice");
// TODO: onMessage(Collection<Message<T>> messages) not instrumented
transformer.applyAdviceToMethod(
named("onMessage").and(takesArgument(0, named("java.util.Collection"))),
getClass().getName() + "$OnMessagesAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -48,4 +51,23 @@ public static void methodExit(
}
}
}

@SuppressWarnings("unused")
public static class OnMessagesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
@Nullable
public static SpringAwsUtil.BatchMessageScope methodEnter(
@Advice.Argument(0) Collection<Message<?>> messages) {
return SpringAwsUtil.handleBatch(messages);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void methodExit(
@Advice.Enter @Nullable SpringAwsUtil.BatchMessageScope scope,
@Advice.Thrown @Nullable Throwable throwable) {
if (scope != null) {
scope.close(throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@

package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0;

import static java.util.Collections.emptyList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.awssdk.v2_2.internal.Response;
import io.opentelemetry.instrumentation.awssdk.v2_2.internal.SqsMessage;
Expand All @@ -17,15 +26,132 @@
import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingExecutionInterceptor;
import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.springframework.messaging.Message;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class SpringAwsUtil {
private static final ThreadLocal<TracingList> context = new ThreadLocal<>();
private static final VirtualField<Message<?>, TracingContext> tracingContextField =
VirtualField.find(Message.class, TracingContext.class);

private static final MessagingAttributesGetter<Collection<Message<?>>, Void>
BATCH_ATTRIBUTES_GETTER =
new MessagingAttributesGetter<Collection<Message<?>>, Void>() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in other instrumentations we have classes like this as top level classes

@Override
public String getSystem(Collection<Message<?>> messages) {
return "aws_sqs";
}

@Nullable
@Override
public String getDestination(Collection<Message<?>> messages) {
if (!messages.isEmpty()) {
Message<?> message = messages.iterator().next();
TracingContext tracingContext = tracingContextField.get(message);
if (tracingContext != null) {
SdkRequest sdkRequest =
tracingContext.request.getAttribute(
TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
if (sdkRequest instanceof ReceiveMessageRequest receiveMessageRequest) {
String queueUrl = receiveMessageRequest.queueUrl();
if (queueUrl != null) {
int i = queueUrl.lastIndexOf('/');
if (i > 0) {
return queueUrl.substring(i + 1);
}
}
}
}
}
return null;
}

@Nullable
@Override
public String getDestinationTemplate(Collection<Message<?>> messages) {
return null;
}

@Override
public boolean isTemporaryDestination(Collection<Message<?>> messages) {
return false;
}

@Override
public boolean isAnonymousDestination(Collection<Message<?>> messages) {
return false;
}

@Nullable
@Override
public String getConversationId(Collection<Message<?>> messages) {
return null;
}

@Nullable
@Override
public Long getMessageBodySize(Collection<Message<?>> messages) {
return null;
}

@Nullable
@Override
public Long getMessageEnvelopeSize(Collection<Message<?>> messages) {
return null;
}

@Nullable
@Override
public String getMessageId(Collection<Message<?>> messages, @Nullable Void unused) {
return null;
}

@Nullable
@Override
public String getClientId(Collection<Message<?>> messages) {
return null;
}

@Nullable
@Override
public Long getBatchMessageCount(
Collection<Message<?>> messages, @Nullable Void unused) {
return (long) messages.size();
}

@Override
public List<String> getMessageHeader(Collection<Message<?>> messages, String name) {
return emptyList();
}
};

private static final Instrumenter<Collection<Message<?>>, Void> BATCH_INSTRUMENTER =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instrumenter.<Collection<Message<?>>, Void>builder(
GlobalOpenTelemetry.get(),
"io.opentelemetry.spring-cloud-aws-3.0",
MessagingSpanNameExtractor.create(BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(
BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS)
.build())
.addSpanLinksExtractor(
(spanLinks, parentContext, messages) -> {
for (Message<?> message : messages) {
TracingContext tracingContext = tracingContextField.get(message);
if (tracingContext != null) {
SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage);
Context extracted =
SqsParentContext.ofMessage(wrappedMessage, tracingContext.config);
spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
}
}
})
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());

// put the TracingList into thread local, so we can use it in attachTracingState method
public static void initialize(Collection<?> messages) {
if (messages instanceof TracingList tracingList) {
Expand Down Expand Up @@ -72,23 +198,65 @@ public static MessageScope handleMessage(Message<?> message) {
return tracingContext.trace();
}

// restore context from the first message of the batch
@Nullable
public static Scope handleBatch(Collection<Message<?>> messages) {
public static BatchMessageScope handleBatch(Collection<Message<?>> messages) {
if (messages.isEmpty()) {
return null;
}

// Check if the batch has any tracing context before starting
Message<?> firstMessage = messages.iterator().next();
TracingContext tracingContext = tracingContextField.get(firstMessage);
if (tracingContext == null) {
return null;
}

// Start a separate trace (NO parent from the queue) using Context.current()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really correct. Context.current does not create a trace but rather joins an existing trace, which is fine here (sort of). This is complicated by the otel.instrumentation.messaging.experimental.receive-telemetry.enabled=true flag which creates an extra receive span. When that flag is used the receive span should be parent of the process span used here so you'd need to use the context of the receive span instead of Context.current() when that flag is set.

// The instrumenter adds span links to all messages.
Context parentContext = Context.current();
if (!BATCH_INSTRUMENTER.shouldStart(parentContext, messages)) {
return null;
}
Context context = BATCH_INSTRUMENTER.start(parentContext, messages);

for (Message<?> msg : messages) {
TracingContext tc = tracingContextField.get(msg);
if (tc != null) {
tc.batchProcessContext = context;
}
}

return new BatchMessageScope(context, messages);
}

@Nullable
public static Scope restoreBatchContext(Collection<Message<?>> messages) {
if (messages.isEmpty()) {
return null;
}
Message<?> message = messages.iterator().next();
TracingContext tracingContext = tracingContextField.get(message);
if (tracingContext == null) {
if (tracingContext == null || tracingContext.batchProcessContext == null) {
return null;
}
SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage);
Context parentContext = tracingContext.receiveContext;
if (parentContext == null) {
parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config);
return tracingContext.batchProcessContext.makeCurrent();
}

public static class BatchMessageScope {
private final Context context;
private final Collection<Message<?>> request;
private final Scope scope;

private BatchMessageScope(Context context, Collection<Message<?>> request) {
this.context = context;
this.request = request;
this.scope = context.makeCurrent();
}

public void close(@Nullable Throwable throwable) {
scope.close();
BATCH_INSTRUMENTER.end(context, request, null, throwable);
}
return parentContext.makeCurrent();
}

public static class MessageScope {
Expand Down Expand Up @@ -123,6 +291,7 @@ private static class TracingContext {
private final TracingExecutionInterceptor config;
@Nullable private final Context receiveContext;
private final software.amazon.awssdk.services.sqs.model.Message sqsMessage;
@Nullable Context batchProcessContext;

private TracingContext(
TracingList tracingList, software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
Expand All @@ -146,6 +315,7 @@ MessageScope trace() {
return null;
}
Context context = instrumenter.start(parentContext, processRequest);
this.batchProcessContext = context;
return new MessageScope(instrumenter, context, processRequest, response);
}
}
Expand Down
Loading
Loading