Skip to content

Commit a34d01a

Browse files
committed
defer consume span closing to after async method ends
1 parent f5a4e71 commit a34d01a

3 files changed

Lines changed: 181 additions & 9 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package datadog.trace.instrumentation.springmessaging;
2+
3+
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
4+
5+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
import java.lang.reflect.Method;
7+
import java.util.concurrent.CompletionException;
8+
import java.util.concurrent.CompletionStage;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.function.BiConsumer;
12+
import java.util.function.Consumer;
13+
14+
public final class SpringMessageAsyncHelper {
15+
private SpringMessageAsyncHelper() {}
16+
17+
private static final ClassValue<ReactorCallbackMethods> REACTOR_CALLBACK_METHODS =
18+
new ReactorCallbacksClassValue();
19+
20+
public static Object wrapAsyncResult(Object result, AgentSpan span) {
21+
if (result == null) {
22+
return null;
23+
}
24+
SpanFinisher finisher = new SpanFinisher(span);
25+
if (result instanceof CompletionStage<?>) {
26+
return ((CompletionStage<?>) result)
27+
.whenComplete(new CompletionStageFinishCallback(finisher));
28+
}
29+
ReactorCallbackMethods callbackMethods = REACTOR_CALLBACK_METHODS.get(result.getClass());
30+
if (!callbackMethods.supported()) {
31+
return null;
32+
}
33+
try {
34+
Object wrapped = callbackMethods.doOnError.invoke(result, new ErrorCallback(finisher));
35+
wrapped = callbackMethods.doOnTerminate.invoke(wrapped, new FinishCallback(finisher));
36+
return callbackMethods.doOnCancel.invoke(wrapped, new FinishCallback(finisher));
37+
} catch (Throwable ignored) {
38+
return null;
39+
}
40+
}
41+
42+
static final class ReactorCallbacksClassValue extends ClassValue<ReactorCallbackMethods> {
43+
@Override
44+
protected ReactorCallbackMethods computeValue(Class<?> type) {
45+
try {
46+
Method doOnError = type.getMethod("doOnError", Consumer.class);
47+
Method doOnTerminate = type.getMethod("doOnTerminate", Runnable.class);
48+
Method doOnCancel = type.getMethod("doOnCancel", Runnable.class);
49+
return new ReactorCallbackMethods(doOnError, doOnTerminate, doOnCancel);
50+
} catch (Throwable ignored) {
51+
return ReactorCallbackMethods.UNSUPPORTED;
52+
}
53+
}
54+
}
55+
56+
static final class ReactorCallbackMethods {
57+
static final ReactorCallbackMethods UNSUPPORTED = new ReactorCallbackMethods(null, null, null);
58+
59+
final Method doOnError;
60+
final Method doOnTerminate;
61+
final Method doOnCancel;
62+
63+
ReactorCallbackMethods(Method doOnError, Method doOnTerminate, Method doOnCancel) {
64+
this.doOnError = doOnError;
65+
this.doOnTerminate = doOnTerminate;
66+
this.doOnCancel = doOnCancel;
67+
}
68+
69+
boolean supported() {
70+
return doOnError != null && doOnTerminate != null && doOnCancel != null;
71+
}
72+
}
73+
74+
static final class SpanFinisher {
75+
private final AgentSpan span;
76+
private final AtomicBoolean finished = new AtomicBoolean(false);
77+
78+
SpanFinisher(AgentSpan span) {
79+
this.span = span;
80+
}
81+
82+
void onError(Throwable throwable) {
83+
DECORATE.onError(span, throwable);
84+
}
85+
86+
void finish() {
87+
if (finished.compareAndSet(false, true)) {
88+
DECORATE.beforeFinish(span);
89+
span.finish();
90+
}
91+
}
92+
}
93+
94+
static final class CompletionStageFinishCallback implements BiConsumer<Object, Throwable> {
95+
private final SpanFinisher finisher;
96+
97+
CompletionStageFinishCallback(SpanFinisher finisher) {
98+
this.finisher = finisher;
99+
}
100+
101+
@Override
102+
public void accept(Object ignored, Throwable throwable) {
103+
if (throwable != null) {
104+
finisher.onError(unwrap(throwable));
105+
}
106+
finisher.finish();
107+
}
108+
}
109+
110+
static final class ErrorCallback implements Consumer<Throwable> {
111+
private final SpanFinisher finisher;
112+
113+
ErrorCallback(SpanFinisher finisher) {
114+
this.finisher = finisher;
115+
}
116+
117+
@Override
118+
public void accept(Throwable throwable) {
119+
finisher.onError(throwable);
120+
}
121+
}
122+
123+
static final class FinishCallback implements Runnable {
124+
private final SpanFinisher finisher;
125+
126+
FinishCallback(SpanFinisher finisher) {
127+
this.finisher = finisher;
128+
}
129+
130+
@Override
131+
public void run() {
132+
finisher.finish();
133+
}
134+
}
135+
136+
private static Throwable unwrap(Throwable throwable) {
137+
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
138+
Throwable cause = throwable.getCause();
139+
if (cause != null) {
140+
return cause;
141+
}
142+
}
143+
return throwable;
144+
}
145+
}

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1515

1616
import com.google.auto.service.AutoService;
17+
import datadog.context.Context;
1718
import datadog.context.ContextScope;
1819
import datadog.trace.agent.tooling.Instrumenter;
1920
import datadog.trace.agent.tooling.InstrumenterModule;
2021
import datadog.trace.agent.tooling.annotation.AppliesOn;
22+
import datadog.trace.bootstrap.InstrumentationContext;
2123
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2224
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
23-
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
24-
import java.util.concurrent.CompletionStage;
25+
import java.util.Collections;
26+
import java.util.Map;
2527
import net.bytebuddy.asm.Advice;
28+
import org.reactivestreams.Publisher;
2629
import org.springframework.messaging.Message;
2730
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2831

@@ -50,12 +53,24 @@ public void methodAdvice(MethodTransformer transformer) {
5053
SpringMessageHandlerInstrumentation.class.getName() + "$HandleMessageAdvice");
5154
}
5255

56+
@Override
57+
public Map<String, String> contextStore() {
58+
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
59+
}
60+
5361
@Override
5462
public String[] helperClassNames() {
5563
return new String[] {
5664
packageName + ".SpringMessageDecorator",
5765
packageName + ".SpringMessageExtractAdapter",
58-
packageName + ".SpringMessageExtractAdapter$1"
66+
packageName + ".SpringMessageExtractAdapter$1",
67+
packageName + ".SpringMessageAsyncHelper",
68+
packageName + ".SpringMessageAsyncHelper$ReactorCallbacksClassValue",
69+
packageName + ".SpringMessageAsyncHelper$ReactorCallbackMethods",
70+
packageName + ".SpringMessageAsyncHelper$SpanFinisher",
71+
packageName + ".SpringMessageAsyncHelper$CompletionStageFinishCallback",
72+
packageName + ".SpringMessageAsyncHelper$ErrorCallback",
73+
packageName + ".SpringMessageAsyncHelper$FinishCallback",
5974
};
6075
}
6176

@@ -95,8 +110,20 @@ public static void onExit(
95110
}
96111
AgentSpan span = scope.span();
97112
scope.close();
98-
if (result instanceof CompletionStage) {
99-
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
113+
Object asyncResult = SpringMessageAsyncHelper.wrapAsyncResult(result, span);
114+
if (asyncResult != null) {
115+
if (result != asyncResult
116+
&& result instanceof Publisher<?>
117+
&& asyncResult instanceof Publisher<?>) {
118+
Context publisherContext =
119+
InstrumentationContext.get(Publisher.class, Context.class)
120+
.remove((Publisher<?>) result);
121+
if (publisherContext != null) {
122+
InstrumentationContext.get(Publisher.class, Context.class)
123+
.put((Publisher<?>) asyncResult, publisherContext);
124+
}
125+
}
126+
result = asyncResult;
100127
} else {
101128
if (null != error) {
102129
DECORATE.onError(span, error);

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification {
4848

4949
and: "child.work is a child of spring.consume"
5050
DDSpan produce1Span, produce2Span, springConsumeParent
51-
assertTraces(10, SORT_TRACES_BY_ID) {
51+
assertTraces(9, SORT_TRACES_BY_ID) {
5252
trace(1) {
5353
produceSpan(it)
5454
produce1Span = span(0)
@@ -63,13 +63,13 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification {
6363
trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
6464
trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
6565

66-
trace(1) {
66+
trace(2) {
6767
// consume messages in one batch
6868
springConsumeSpan(it)
6969
springConsumeParent = span(0)
70+
// child work span connected to the spring consume span
71+
childWorkSpan(it, springConsumeParent)
7072
}
71-
// child work span connected to the spring consume span
72-
trace(1) { childWorkSpan(it, springConsumeParent) }
7373

7474
trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
7575
trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }

0 commit comments

Comments
 (0)