Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ out/
# Visual Studio Code #
######################
.vscode
.cursor

# Others #
##########
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected void startNewMessageSpan(Message message) {

DataStreamsTags tags = create("sqs", INBOUND, urlFileName(queueUrl));
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(tags, 0, 0));
System.out.println("Setting a checkpoint in thread" + Thread.currentThread().getId());

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -44,8 +45,13 @@ public String[] helperClassNames() {

@Override
public Map<String, String> contextStore() {
return singletonMap(
Map<String, String> contextStore = new java.util.HashMap<>(2);
contextStore.put(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
contextStore.put(
"software.amazon.awssdk.services.sqs.model.Message",
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
return contextStore;
}

@Override
Expand All @@ -63,7 +69,11 @@ public static void onExit(
String queueUrl =
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
if (queueUrl != null) {
messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId());
ContextStore<Message, State> messageStateStore =
InstrumentationContext.get(Message.class, State.class);
messages =
new TracingList(
messageStateStore, messages, queueUrl, result.responseMetadata().requestId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,12 +31,18 @@
public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);

private final ContextStore<Message, State> messageStateStore;
protected final L delegate;
private final String queueUrl;
private final String requestId;
private AgentSpanContext batchContext;

public TracingIterator(L delegate, String queueUrl, String requestId) {
public TracingIterator(
ContextStore<Message, State> messageStateStore,
L delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -99,6 +107,22 @@ protected void startNewMessageSpan(Message message) {
BROKER_DECORATE.beforeFinish(queueSpan);
queueSpan.finish();
}

// Capture state after data streams checkpoint is set
try {
State state = State.FACTORY.create();
state.captureAndSetContinuation(span);
messageStateStore.put(message, state);
System.out.println(
"[TracingIterator] Captured state for SQS message: "
+ message.messageId()
+ " with span: "
+ span.getSpanId()
+ " on thread: "
+ Thread.currentThread().getId());
} catch (Exception stateException) {
log.debug("Problem capturing state for SQS message", stateException);
}
}
} catch (Exception e) {
log.debug("Problem tracing new SQS message span", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingList implements List<Message> {
private final ContextStore<Message, State> messageStateStore;
private final List<Message> delegate;
private final String queueUrl;
private final String requestId;

public TracingList(List<Message> delegate, String queueUrl, String requestId) {
public TracingList(
ContextStore<Message, State> messageStateStore,
List<Message> delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -125,12 +133,14 @@ public ListIterator<Message> listIterator() {
@Override
public ListIterator<Message> listIterator(int index) {
// every iteration will add spans. Not only the very first one
return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId);
return new TracingListIterator(
messageStateStore, delegate.listIterator(index), queueUrl, requestId);
}

@Override
public List<Message> subList(int fromIndex, int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId);
return new TracingList(
messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingListIterator extends TracingIterator<ListIterator<Message>>
implements ListIterator<Message> {

public TracingListIterator(ListIterator<Message> delegate, String queueUrl, String requestId) {
super(delegate, queueUrl, requestId);
public TracingListIterator(
ContextStore<Message, State> messageStateStore,
ListIterator<Message> delegate,
String queueUrl,
String requestId) {
super(messageStateStore, delegate, queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*;
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND;
import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
Expand Down Expand Up @@ -53,22 +55,100 @@ public String[] helperClassNames() {
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap("org.springframework.messaging.Message", State.class.getName());
}

public static class HandleMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
AgentSpanContext parentContext;
AgentSpan parent = activeSpan();

// First try to get context from continuation (preferred method)
State state = InstrumentationContext.get(Message.class, State.class).get(message);
if (null != state) {
System.out.println(
"[Spring] Found state in Spring message, attempting to activate continuation on thread: "
+ Thread.currentThread().getId());
AgentScope.Continuation continuation = state.getAndResetContinuation();
if (null != continuation) {
try (AgentScope scope = continuation.activate()) {
AgentSpan span = startSpan(SPRING_INBOUND);
DECORATE.afterStart(span);
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
System.out.println(
"[Spring] Successfully activated continuation from Spring Message with span: "
+ span.getSpanId()
+ " on thread: "
+ Thread.currentThread().getId());
return activateSpan(span);
}
} else {
System.out.println(
"[Spring] No continuation found in state on thread: "
+ Thread.currentThread().getId());
}
} else {
System.out.println(
"[Spring] No state found in Spring message 2, falling back to header extraction on thread: "
+ Thread.currentThread().getId());
}

// Fallback to existing context or header extraction
if (null != parent) {
// prefer existing context, assume it was already extracted from this message
parentContext = parent.context();
System.out.println(
"[Spring] Using existing active span context on thread: "
+ Thread.currentThread().getId());
} else {
// otherwise try to re-extract the message context to avoid disconnected trace
parentContext = extractContextAndGetSpanContext(message, GETTER);
System.out.println(
"[Spring] Extracted context from message headers on thread: "
+ Thread.currentThread().getId());
}

AgentSpan span = startSpan(SPRING_INBOUND, parentContext);
DECORATE.afterStart(span);
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));

// Extract SQS queue information - try different header patterns
Object queueUrl = message.getHeaders().get("Sqs_QueueUrl");
Object queueName = message.getHeaders().get("Sqs_QueueName");

// If not found in Sqs_ prefixed headers, try aws. prefixed headers
if (queueUrl == null) {
queueUrl = message.getHeaders().get("aws.queue.url");
}
if (queueName == null) {
queueName = message.getHeaders().get("aws.queue.name");
}

// If still not found, try to extract from QueueAttributes
if (queueUrl == null || queueName == null) {
Object queueAttributes = message.getHeaders().get("Sqs_QueueAttributes");
if (queueAttributes != null) {
String attributesStr = queueAttributes.toString();
// Extract queue name from attributes if available
if (queueName == null && attributesStr.contains("queueName=")) {
queueName =
attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0];
}
}
}

// Add SQS queue tags to the span
if (queueUrl != null) {
span.setTag("aws.sqs.queue_url", queueUrl.toString());
}
if (queueName != null) {
span.setTag("aws.sqs.queue_name", queueName.toString());
}

return activateSpan(span);
}

Expand Down
25 changes: 25 additions & 0 deletions dd-java-agent/instrumentation/spring/spring-sqs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
muzzle {
pass {
group = 'io.awspring.cloud'
module = 'spring-cloud-aws-sqs'
versions = "[3.0.0,)"
assertInverse = true
}
}

ext {
minJavaVersionForTests = JavaVersion.VERSION_17
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

[compileTestGroovy, compileLatestDepTestGroovy].each {
it.javaLauncher = getJavaLauncherFor(17)
}

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23'
}
Loading
Loading