-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(spring-cloud-aws): instrument onMessage(Collection<Message<T>>) … #19053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1a51d04
1edd8df
82049e9
4c1dc27
2056002
4443363
109298d
9e6d71d
799426b
ff8c987
50bef8c
08922c7
d283609
9d9db2d
ca3a57e
89ed8bd
af530cc
861a780
5c649ca
f98f6e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,9 +5,18 @@ | |
|
|
||
| package io.opentelemetry.javaagent.instrumentation.spring.cloud.aws.v3_0; | ||
|
|
||
| import static java.util.Collections.emptyList; | ||
|
|
||
| import io.opentelemetry.api.GlobalOpenTelemetry; | ||
| import io.opentelemetry.api.trace.Span; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; | ||
| import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
| import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; | ||
| import io.opentelemetry.instrumentation.api.util.VirtualField; | ||
| import io.opentelemetry.instrumentation.awssdk.v2_2.internal.Response; | ||
| import io.opentelemetry.instrumentation.awssdk.v2_2.internal.SqsMessage; | ||
|
|
@@ -17,15 +26,132 @@ | |
| import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingExecutionInterceptor; | ||
| import io.opentelemetry.instrumentation.awssdk.v2_2.internal.TracingList; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import org.springframework.messaging.Message; | ||
| import software.amazon.awssdk.core.SdkRequest; | ||
| import software.amazon.awssdk.core.interceptor.ExecutionAttributes; | ||
| import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; | ||
|
|
||
| public class SpringAwsUtil { | ||
| private static final ThreadLocal<TracingList> context = new ThreadLocal<>(); | ||
| private static final VirtualField<Message<?>, TracingContext> tracingContextField = | ||
| VirtualField.find(Message.class, TracingContext.class); | ||
|
|
||
| private static final MessagingAttributesGetter<Collection<Message<?>>, Void> | ||
| BATCH_ATTRIBUTES_GETTER = | ||
| new MessagingAttributesGetter<Collection<Message<?>>, Void>() { | ||
| @Override | ||
| public String getSystem(Collection<Message<?>> messages) { | ||
| return "aws_sqs"; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getDestination(Collection<Message<?>> messages) { | ||
| if (!messages.isEmpty()) { | ||
| Message<?> message = messages.iterator().next(); | ||
| TracingContext tracingContext = tracingContextField.get(message); | ||
| if (tracingContext != null) { | ||
| SdkRequest sdkRequest = | ||
| tracingContext.request.getAttribute( | ||
| TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); | ||
| if (sdkRequest instanceof ReceiveMessageRequest receiveMessageRequest) { | ||
| String queueUrl = receiveMessageRequest.queueUrl(); | ||
| if (queueUrl != null) { | ||
| int i = queueUrl.lastIndexOf('/'); | ||
| if (i > 0) { | ||
| return queueUrl.substring(i + 1); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getDestinationTemplate(Collection<Message<?>> messages) { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isTemporaryDestination(Collection<Message<?>> messages) { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isAnonymousDestination(Collection<Message<?>> messages) { | ||
| return false; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getConversationId(Collection<Message<?>> messages) { | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public Long getMessageBodySize(Collection<Message<?>> messages) { | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public Long getMessageEnvelopeSize(Collection<Message<?>> messages) { | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getMessageId(Collection<Message<?>> messages, @Nullable Void unused) { | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String getClientId(Collection<Message<?>> messages) { | ||
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public Long getBatchMessageCount( | ||
| Collection<Message<?>> messages, @Nullable Void unused) { | ||
| return (long) messages.size(); | ||
| } | ||
|
|
||
| @Override | ||
| public List<String> getMessageHeader(Collection<Message<?>> messages, String name) { | ||
| return emptyList(); | ||
| } | ||
| }; | ||
|
|
||
| private static final Instrumenter<Collection<Message<?>>, Void> BATCH_INSTRUMENTER = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typically the code that creates the instrumenter for agent instrumentations is in a class whose name ends with Singletons. For example see https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaSingletons.java |
||
| Instrumenter.<Collection<Message<?>>, Void>builder( | ||
| GlobalOpenTelemetry.get(), | ||
| "io.opentelemetry.spring-cloud-aws-3.0", | ||
| MessagingSpanNameExtractor.create(BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS)) | ||
| .addAttributesExtractor( | ||
| MessagingAttributesExtractor.builder( | ||
| BATCH_ATTRIBUTES_GETTER, MessageOperation.PROCESS) | ||
| .build()) | ||
| .addSpanLinksExtractor( | ||
| (spanLinks, parentContext, messages) -> { | ||
| for (Message<?> message : messages) { | ||
| TracingContext tracingContext = tracingContextField.get(message); | ||
| if (tracingContext != null) { | ||
| SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); | ||
| Context extracted = | ||
| SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); | ||
| spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); | ||
| } | ||
| } | ||
| }) | ||
| .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); | ||
|
|
||
| // put the TracingList into thread local, so we can use it in attachTracingState method | ||
| public static void initialize(Collection<?> messages) { | ||
| if (messages instanceof TracingList tracingList) { | ||
|
|
@@ -72,23 +198,65 @@ public static MessageScope handleMessage(Message<?> message) { | |
| return tracingContext.trace(); | ||
| } | ||
|
|
||
| // restore context from the first message of the batch | ||
| @Nullable | ||
| public static Scope handleBatch(Collection<Message<?>> messages) { | ||
| public static BatchMessageScope handleBatch(Collection<Message<?>> messages) { | ||
| if (messages.isEmpty()) { | ||
| return null; | ||
| } | ||
|
|
||
| // Check if the batch has any tracing context before starting | ||
| Message<?> firstMessage = messages.iterator().next(); | ||
| TracingContext tracingContext = tracingContextField.get(firstMessage); | ||
| if (tracingContext == null) { | ||
| return null; | ||
| } | ||
|
|
||
| // Start a separate trace (NO parent from the queue) using Context.current() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't really correct. Context.current does not create a trace but rather joins an existing trace, which is fine here (sort of). This is complicated by the |
||
| // The instrumenter adds span links to all messages. | ||
| Context parentContext = Context.current(); | ||
| if (!BATCH_INSTRUMENTER.shouldStart(parentContext, messages)) { | ||
| return null; | ||
| } | ||
| Context context = BATCH_INSTRUMENTER.start(parentContext, messages); | ||
|
|
||
| for (Message<?> msg : messages) { | ||
| TracingContext tc = tracingContextField.get(msg); | ||
| if (tc != null) { | ||
| tc.batchProcessContext = context; | ||
| } | ||
| } | ||
|
|
||
| return new BatchMessageScope(context, messages); | ||
| } | ||
|
|
||
| @Nullable | ||
| public static Scope restoreBatchContext(Collection<Message<?>> messages) { | ||
| if (messages.isEmpty()) { | ||
| return null; | ||
| } | ||
| Message<?> message = messages.iterator().next(); | ||
| TracingContext tracingContext = tracingContextField.get(message); | ||
| if (tracingContext == null) { | ||
| if (tracingContext == null || tracingContext.batchProcessContext == null) { | ||
| return null; | ||
| } | ||
| SqsMessage wrappedMessage = SqsMessageImpl.wrap(tracingContext.sqsMessage); | ||
| Context parentContext = tracingContext.receiveContext; | ||
| if (parentContext == null) { | ||
| parentContext = SqsParentContext.ofMessage(wrappedMessage, tracingContext.config); | ||
| return tracingContext.batchProcessContext.makeCurrent(); | ||
| } | ||
|
|
||
| public static class BatchMessageScope { | ||
| private final Context context; | ||
| private final Collection<Message<?>> request; | ||
| private final Scope scope; | ||
|
|
||
| private BatchMessageScope(Context context, Collection<Message<?>> request) { | ||
| this.context = context; | ||
| this.request = request; | ||
| this.scope = context.makeCurrent(); | ||
| } | ||
|
|
||
| public void close(@Nullable Throwable throwable) { | ||
| scope.close(); | ||
| BATCH_INSTRUMENTER.end(context, request, null, throwable); | ||
| } | ||
| return parentContext.makeCurrent(); | ||
| } | ||
|
|
||
| public static class MessageScope { | ||
|
|
@@ -123,6 +291,7 @@ private static class TracingContext { | |
| private final TracingExecutionInterceptor config; | ||
| @Nullable private final Context receiveContext; | ||
| private final software.amazon.awssdk.services.sqs.model.Message sqsMessage; | ||
| @Nullable Context batchProcessContext; | ||
|
|
||
| private TracingContext( | ||
| TracingList tracingList, software.amazon.awssdk.services.sqs.model.Message sqsMessage) { | ||
|
|
@@ -146,6 +315,7 @@ MessageScope trace() { | |
| return null; | ||
| } | ||
| Context context = instrumenter.start(parentContext, processRequest); | ||
| this.batchProcessContext = context; | ||
| return new MessageScope(instrumenter, context, processRequest, response); | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in other instrumentations we have classes like this as top level classes