Skip to content

Commit cde8f35

Browse files
authored
fix: SNS v2 batch injection bug (#11190)
# What Does This Do Fixes SNS v2 batch injection bug that causes a batch entry to be dropped when the number of attributes exceeded limits. + adds a test that reproduces the bug + migrates one existing test from Spock to JUnit as it fits more the purpose + cleans up some dups # Motivation Fix the bug affecting the application's logic due to an instrumentation flaw. # Additional Notes Ref: #7150 # Contributor Checklist - Format the title according to [the contribution guidelines](https://github.com/DataDog/dd-trace-java/blob/master/CONTRIBUTING.md#title-format) - Assign the `type:` and (`comp:` or `inst:`) labels in addition to [any other useful labels](https://github.com/DataDog/dd-trace-java/blob/master/CONTRIBUTING.md#labels) - Avoid using `close`, `fix`, or [any linking keywords](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword) when referencing an issue Use `solves` instead, and assign the PR [milestone](https://github.com/DataDog/dd-trace-java/milestones) to the issue - Update the [CODEOWNERS](https://github.com/DataDog/dd-trace-java/blob/master/.github/CODEOWNERS) file on source file addition, migration, or deletion - Update [public documentation](https://docs.datadoghq.com/tracing/trace_collection/library_config/java/) with any new configuration flags or behaviors Jira ticket: [APMS-19177] ***Note:*** **Once your PR is ready to merge, add it to the merge queue by commenting `/merge`.** `/merge -c` cancels the queue request. `/merge -f --reason "reason"` skips all merge queue checks; please use this judiciously, as some checks do not run at the PR-level. For more information, see [this doc](https://datadoghq.atlassian.net/wiki/spaces/DEVX/pages/3121612126/MergeQueue). <!-- # Opening vs Drafting a PR: When opening a pull request, please open it as a draft to not auto assign reviewers before you feel the pull request is in a reviewable state. # Linking a JIRA ticket: Please link your JIRA ticket by adding its identifier between brackets (ex [PROJ-IDENT]) in the PR description, not the title. This requirement only applies to Datadog employees. --> [APMS-19177]: https://datadoghq.atlassian.net/browse/APMS-19177?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Co-authored-by: yury.gribkov <yury.gribkov@datadoghq.com>
1 parent 346672f commit cde8f35

3 files changed

Lines changed: 132 additions & 65 deletions

File tree

dd-java-agent/instrumentation/aws-java/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package datadog.trace.instrumentation.aws.v2.sns;
22

33
import static datadog.context.propagation.Propagators.defaultPropagator;
4-
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
5-
import static datadog.trace.api.datastreams.DataStreamsTags.create;
64
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
7-
import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER;
85

96
import datadog.context.Context;
107
import datadog.trace.api.Config;
@@ -28,6 +25,9 @@
2825

2926
public class SnsInterceptor implements ExecutionInterceptor {
3027

28+
// SQS subscriber limit; SNS inherits it when SQS is used as a subscriber
29+
private static final int MAX_MESSAGE_ATTRIBUTES = 10;
30+
3131
public static final ExecutionAttribute<Context> CONTEXT_ATTRIBUTE =
3232
InstanceStore.of(ExecutionAttribute.class)
3333
.putIfAbsent("DatadogContext", () -> new ExecutionAttribute<>("DatadogContext"));
@@ -38,10 +38,12 @@ private SdkBytes getMessageAttributeValueToInject(
3838
StringBuilder jsonBuilder = new StringBuilder();
3939
jsonBuilder.append('{');
4040
if (traceConfig().isDataStreamsEnabled()) {
41-
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(snsTopicName));
41+
DataStreamsTags tags =
42+
DataStreamsTags.create("sns", DataStreamsTags.Direction.OUTBOUND, snsTopicName);
43+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
4244
context = context.with(dsmContext);
4345
}
44-
defaultPropagator().inject(context, jsonBuilder, SETTER);
46+
defaultPropagator().inject(context, jsonBuilder, TextMapInjectAdapter.SETTER);
4547
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
4648
jsonBuilder.append('}');
4749
return SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8);
@@ -57,9 +59,7 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
5759
// Injecting the trace context into SNS messageAttributes.
5860
if (context.request() instanceof PublishRequest) {
5961
PublishRequest request = (PublishRequest) context.request();
60-
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
61-
// the limit still applies here
62-
if (request.messageAttributes().size() < 10) {
62+
if (request.messageAttributes().size() < MAX_MESSAGE_ATTRIBUTES) {
6363
// Get topic name for DSM
6464
String snsTopicArn = request.topicArn();
6565
if (null == snsTopicArn) {
@@ -70,17 +70,11 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
7070
}
7171

7272
String snsTopicName = snsTopicArn.substring(snsTopicArn.lastIndexOf(':') + 1);
73-
Map<String, MessageAttributeValue> modifiedMessageAttributes =
74-
new HashMap<>(request.messageAttributes());
75-
modifiedMessageAttributes.put(
76-
"_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON
77-
// strings https://github.com/DataDog/datadog-lambda-js/pull/269
78-
MessageAttributeValue.builder()
79-
.dataType("Binary")
80-
.binaryValue(
81-
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName))
82-
.build());
83-
return request.toBuilder().messageAttributes(modifiedMessageAttributes).build();
73+
Map<String, MessageAttributeValue> messageAttributes =
74+
withDatadogAttribute(
75+
request.messageAttributes(),
76+
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName));
77+
return request.toBuilder().messageAttributes(messageAttributes).build();
8478
}
8579
return request;
8680
} else if (context.request() instanceof PublishBatchRequest) {
@@ -89,24 +83,28 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
8983
String snsTopicArn = request.topicArn();
9084
String snsTopicName = snsTopicArn.substring(snsTopicArn.lastIndexOf(':') + 1);
9185
ArrayList<PublishBatchRequestEntry> entries = new ArrayList<>();
92-
final SdkBytes sdkBytes =
93-
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName);
86+
SdkBytes value = this.getMessageAttributeValueToInject(executionAttributes, snsTopicName);
9487
for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) {
95-
if (entry.messageAttributes().size() < 10) {
96-
Map<String, MessageAttributeValue> modifiedMessageAttributes =
97-
new HashMap<>(entry.messageAttributes());
98-
modifiedMessageAttributes.put(
99-
"_datadog",
100-
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
101-
entries.add(entry.toBuilder().messageAttributes(modifiedMessageAttributes).build());
88+
if (entry.messageAttributes().size() < MAX_MESSAGE_ATTRIBUTES) {
89+
Map<String, MessageAttributeValue> messageAttributes =
90+
withDatadogAttribute(entry.messageAttributes(), value);
91+
entry = entry.toBuilder().messageAttributes(messageAttributes).build();
10292
}
93+
entries.add(entry);
10394
}
10495
return request.toBuilder().publishBatchRequestEntries(entries).build();
10596
}
10697
return context.request();
10798
}
10899

109-
private DataStreamsTags getTags(String snsTopicName) {
110-
return create("sns", OUTBOUND, snsTopicName);
100+
private static Map<String, MessageAttributeValue> withDatadogAttribute(
101+
Map<String, MessageAttributeValue> attributes, SdkBytes value) {
102+
// copy since the original map may be unmodifiable
103+
Map<String, MessageAttributeValue> modified = new HashMap<>(attributes);
104+
// Use Binary since SNS subscription filter policies fail silently with JSON strings
105+
// https://github.com/DataDog/datadog-lambda-js/pull/269
106+
modified.put(
107+
"_datadog", MessageAttributeValue.builder().dataType("Binary").binaryValue(value).build());
108+
return modified;
111109
}
112110
}

dd-java-agent/instrumentation/aws-java/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
1313
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
1414
import software.amazon.awssdk.regions.Region
1515
import software.amazon.awssdk.services.sns.SnsClient
16-
import software.amazon.awssdk.services.sns.model.MessageAttributeValue
1716
import software.amazon.awssdk.services.sns.model.PublishResponse
1817
import software.amazon.awssdk.services.sqs.SqsClient
1918
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
@@ -41,15 +40,15 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
4140
LOCALSTACK.start()
4241
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566)
4342
snsClient = SnsClient.builder()
44-
.endpointOverride(URI.create(endPoint))
45-
.region(Region.of("us-east-1"))
46-
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
47-
.build()
43+
.endpointOverride(URI.create(endPoint))
44+
.region(Region.of("us-east-1"))
45+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
46+
.build()
4847
sqsClient = SqsClient.builder()
49-
.endpointOverride(URI.create(endPoint))
50-
.region(Region.of("us-east-1"))
51-
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
52-
.build()
48+
.endpointOverride(URI.create(endPoint))
49+
.region(Region.of("us-east-1"))
50+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
51+
.build()
5352
testQueueURL = sqsClient.createQueue { it.queueName("testqueue") }.queueUrl()
5453
testQueueARN = sqsClient.getQueueAttributes {it.queueUrl(testQueueURL).attributeNames(QueueAttributeName.QUEUE_ARN)}.attributes().get(QueueAttributeName.QUEUE_ARN)
5554
testTopicARN = snsClient.createTopic { it.name("testtopic") }.topicArn()
@@ -82,30 +81,6 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
8281
abstract String expectedOperation(String awsService, String awsOperation)
8382
abstract String expectedService(String awsService, String awsOperation)
8483

85-
def "trace details propagated when message attributes are readonly"() {
86-
when:
87-
TEST_WRITER.clear()
88-
89-
def headers = new HashMap<String, MessageAttributeValue>()
90-
headers.put("mykey", MessageAttributeValue.builder().stringValue("myvalue").dataType("String").build())
91-
def readonlyHeaders = Collections.unmodifiableMap(headers)
92-
snsClient.publish(b -> b.message("sometext").topicArn(testTopicARN).messageAttributes(readonlyHeaders))
93-
94-
def message = sqsClient.receiveMessage {
95-
it.queueUrl(testQueueURL).waitTimeSeconds(3)
96-
}.messages().get(0)
97-
98-
def messageBody = new JsonSlurper().parseText(message.body())
99-
100-
then:
101-
// injected value is here
102-
String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"]
103-
injectedValue.length() > 0
104-
105-
// original header value is still present
106-
messageBody["MessageAttributes"]["mykey"] != null
107-
}
108-
10984
def "trace details propagated via SNS system message attributes"() {
11085
when:
11186
TEST_WRITER.clear()
@@ -214,7 +189,7 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
214189
TEST_WRITER.clear()
215190
snsClient.publish { req ->
216191
req.message("test message")
217-
.topicArn(testTopicARN)
192+
.topicArn(testTopicARN)
218193
}
219194

220195
def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)
@@ -339,4 +314,3 @@ class SnsClientV1DataStreamsForkedTest extends SnsClientTest {
339314
1
340315
}
341316
}
342-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package datadog.trace.instrumentation.aws.v2.sns;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotSame;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import datadog.context.Context;
9+
import java.util.Arrays;
10+
import java.util.Collections;
11+
import java.util.HashMap;
12+
import java.util.LinkedHashMap;
13+
import java.util.Map;
14+
import java.util.stream.Collectors;
15+
import org.junit.jupiter.api.Test;
16+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
17+
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
18+
import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
19+
import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
20+
import software.amazon.awssdk.services.sns.model.PublishRequest;
21+
22+
public class SnsInterceptorTest {
23+
24+
@Test
25+
void publishBatchPreservesEntriesAndOnlyInjectsBelowTheMessageAttributeLimit() {
26+
PublishBatchRequest batchRequest =
27+
PublishBatchRequest.builder()
28+
.topicArn("arn:aws:sns:us-east-1:123456789012:test-topic")
29+
.publishBatchRequestEntries(
30+
PublishBatchRequestEntry.builder()
31+
.id("at-limit")
32+
.message("first")
33+
.messageAttributes(stringAttributes(10))
34+
.build(),
35+
PublishBatchRequestEntry.builder()
36+
.id("under-limit")
37+
.message("second")
38+
.messageAttributes(stringAttributes(9))
39+
.build())
40+
.build();
41+
42+
PublishBatchRequest modified =
43+
(PublishBatchRequest)
44+
new SnsInterceptor().modifyRequest(() -> batchRequest, executionAttributes());
45+
46+
assertEquals(
47+
Arrays.asList("at-limit", "under-limit"),
48+
modified.publishBatchRequestEntries().stream()
49+
.map(PublishBatchRequestEntry::id)
50+
.collect(Collectors.toList()));
51+
assertFalse(
52+
modified.publishBatchRequestEntries().get(0).messageAttributes().containsKey("_datadog"));
53+
assertTrue(
54+
modified.publishBatchRequestEntries().get(1).messageAttributes().containsKey("_datadog"));
55+
}
56+
57+
@Test
58+
void publishPreservesReadonlyAttributesWhileAddingDatadogContext() {
59+
Map<String, MessageAttributeValue> headers = new HashMap<>();
60+
headers.put(
61+
"mykey", MessageAttributeValue.builder().dataType("String").stringValue("myvalue").build());
62+
Map<String, MessageAttributeValue> readonlyHeaders = Collections.unmodifiableMap(headers);
63+
64+
PublishRequest request =
65+
PublishRequest.builder()
66+
.topicArn("arn:aws:sns:us-east-1:123456789012:test-topic")
67+
.message("sometext")
68+
.messageAttributes(readonlyHeaders)
69+
.build();
70+
71+
PublishRequest modified =
72+
(PublishRequest) new SnsInterceptor().modifyRequest(() -> request, executionAttributes());
73+
74+
assertNotSame(readonlyHeaders, modified.messageAttributes());
75+
assertEquals("myvalue", modified.messageAttributes().get("mykey").stringValue());
76+
assertTrue(modified.messageAttributes().containsKey("_datadog"));
77+
assertFalse(readonlyHeaders.containsKey("_datadog"));
78+
}
79+
80+
private static ExecutionAttributes executionAttributes() {
81+
ExecutionAttributes executionAttributes = new ExecutionAttributes();
82+
executionAttributes.putAttribute(SnsInterceptor.CONTEXT_ATTRIBUTE, Context.root());
83+
return executionAttributes;
84+
}
85+
86+
private static Map<String, MessageAttributeValue> stringAttributes(int count) {
87+
Map<String, MessageAttributeValue> attributes = new LinkedHashMap<>();
88+
for (int index = 1; index <= count; index++) {
89+
attributes.put(
90+
"key" + index,
91+
MessageAttributeValue.builder().dataType("String").stringValue("value" + index).build());
92+
}
93+
return attributes;
94+
}
95+
}

0 commit comments

Comments
 (0)