Skip to content

Commit 6f62474

Browse files
committed
feat(spring-cloud-aws): instrument onMessage(Collection<Message<T>>) for batch consumption
1 parent 2618b31 commit 6f62474

3 files changed

Lines changed: 73 additions & 1 deletion

File tree

instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/MessagingMessageListenerAdapterInstrumentation.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import static net.bytebuddy.matcher.ElementMatchers.named;
99
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1010

11+
import io.opentelemetry.context.Scope;
1112
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1213
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
14+
import java.util.Collection;
1315
import javax.annotation.Nullable;
1416
import net.bytebuddy.asm.Advice;
1517
import net.bytebuddy.description.type.TypeDescription;
@@ -28,7 +30,9 @@ public void transform(TypeTransformer transformer) {
2830
transformer.applyAdviceToMethod(
2931
named("onMessage").and(takesArgument(0, named("org.springframework.messaging.Message"))),
3032
getClass().getName() + "$OnMessageAdvice");
31-
// TODO: onMessage(Collection<Message<T>> messages) not instrumented
33+
transformer.applyAdviceToMethod(
34+
named("onMessage").and(takesArgument(0, named("java.util.Collection"))),
35+
getClass().getName() + "$OnMessagesAdvice");
3236
}
3337

3438
@SuppressWarnings("unused")
@@ -48,4 +52,20 @@ public static void methodExit(
4852
}
4953
}
5054
}
55+
56+
@SuppressWarnings("unused")
57+
public static class OnMessagesAdvice {
58+
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
59+
@Nullable
60+
public static Scope methodEnter(@Advice.Argument(0) Collection<Message<?>> messages) {
61+
return SpringAwsUtil.handleBatch(messages);
62+
}
63+
64+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
65+
public static void methodExit(@Advice.Enter @Nullable Scope scope) {
66+
if (scope != null) {
67+
scope.close();
68+
}
69+
}
70+
}
5171
}

instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static java.util.concurrent.TimeUnit.SECONDS;
2727
import static org.assertj.core.api.Assertions.assertThat;
2828

29+
import java.util.Arrays;
30+
2931
import io.awspring.cloud.sqs.operations.SqsTemplate;
3032
import io.opentelemetry.api.trace.SpanKind;
3133
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
@@ -176,4 +178,45 @@ void sqsListener() throws Exception {
176178
+ "/000000000000/test-queue"),
177179
satisfies(AWS_REQUEST_ID, val -> val.isInstanceOf(String.class)))));
178180
}
181+
182+
@Test
183+
void sqsBatchListener() throws Exception {
184+
String messageContent1 = "hello";
185+
String messageContent2 = "world";
186+
CompletableFuture<java.util.List<String>> messageFuture = new CompletableFuture<>();
187+
AwsSqsTestApplication.batchMessageHandler =
188+
strings -> testing.runWithSpan("callback", () -> messageFuture.complete(strings));
189+
190+
testing.runWithSpan(
191+
"parent",
192+
() ->
193+
sqsTemplate.sendMany(
194+
"test-batch-queue", Arrays.asList(messageContent1, messageContent2)));
195+
196+
java.util.List<String> result = messageFuture.get(10, SECONDS);
197+
assertThat(result).containsExactlyInAnyOrder(messageContent1, messageContent2);
198+
199+
testing.waitAndAssertTraces(
200+
trace ->
201+
trace.hasSpansSatisfyingExactly(
202+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
203+
span ->
204+
span.hasName("Sqs.GetQueueUrl")
205+
.hasKind(SpanKind.CLIENT)
206+
.hasParent(trace.getSpan(0)),
207+
span ->
208+
span.hasName("test-batch-queue publish")
209+
.hasKind(SpanKind.PRODUCER)
210+
.hasParent(trace.getSpan(0)),
211+
span ->
212+
span.hasName("test-batch-queue process")
213+
.hasKind(SpanKind.CONSUMER)
214+
.hasParent(trace.getSpan(2)),
215+
span ->
216+
span.hasName("callback").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(3)),
217+
span ->
218+
span.hasName("Sqs.DeleteMessageBatch")
219+
.hasKind(SpanKind.CLIENT)
220+
.hasParent(trace.getSpan(2))));
221+
}
179222
}

instrumentation/spring/spring-cloud-aws-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/cloud/aws/v3_0/AwsSqsTestApplication.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,13 @@ void receiveStringMessage(String message) {
4242
messageHandler.accept(message);
4343
}
4444
}
45+
46+
static volatile Consumer<java.util.List<String>> batchMessageHandler;
47+
48+
@SqsListener("test-batch-queue")
49+
void receiveBatchMessages(java.util.List<String> messages) {
50+
if (batchMessageHandler != null) {
51+
batchMessageHandler.accept(messages);
52+
}
53+
}
4554
}

0 commit comments

Comments
 (0)